1 /*
2 * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /**
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on java.net Sockets
28 * @library /test/lib
29 * @run junit BlockingSocketOps
30 */
31
32 /**
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps
37 * @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps
38 */
39
40 /**
41 * @test id=no-vmcontinuations
42 * @requires vm.continuations
43 * @library /test/lib
44 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps
45 */
46
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.io.InputStream;
50 import java.io.OutputStream;
51 import java.net.DatagramPacket;
52 import java.net.DatagramSocket;
53 import java.net.InetAddress;
54 import java.net.InetSocketAddress;
55 import java.net.ServerSocket;
56 import java.net.Socket;
57 import java.net.SocketAddress;
58 import java.net.SocketException;
59 import java.net.SocketTimeoutException;
60
61 import jdk.test.lib.thread.VThreadRunner;
62 import org.junit.jupiter.api.Test;
63 import static org.junit.jupiter.api.Assertions.*;
64
65 class BlockingSocketOps {
66
67 /**
68 * Socket read/write, no blocking.
69 */
70 @Test
71 void testSocketReadWrite1() throws Exception {
72 VThreadRunner.run(() -> {
73 try (var connection = new Connection()) {
74 Socket s1 = connection.socket1();
75 Socket s2 = connection.socket2();
76
77 // write should not block
78 byte[] ba = "XXX".getBytes("UTF-8");
79 s1.getOutputStream().write(ba);
80
81 // read should not block
82 ba = new byte[10];
83 int n = s2.getInputStream().read(ba);
84 assertTrue(n > 0);
85 assertTrue(ba[0] == 'X');
86 }
87 });
88 }
89
90 /**
91 * Virtual thread blocks in read.
92 */
93 @Test
94 void testSocketRead1() throws Exception {
95 testSocketRead(0);
96 }
97
98 /**
99 * Virtual thread blocks in timed read.
100 */
101 @Test
102 void testSocketRead2() throws Exception {
103 testSocketRead(60_000);
104 }
105
106 void testSocketRead(int timeout) throws Exception {
107 VThreadRunner.run(() -> {
108 try (var connection = new Connection()) {
109 Socket s1 = connection.socket1();
110 Socket s2 = connection.socket2();
111
112 // delayed write from sc1
113 byte[] ba1 = "XXX".getBytes("UTF-8");
114 runAfterParkedAsync(() -> s1.getOutputStream().write(ba1));
115
116 // read from sc2 should block
117 if (timeout > 0) {
118 s2.setSoTimeout(timeout);
119 }
120 byte[] ba2 = new byte[10];
121 int n = s2.getInputStream().read(ba2);
122 assertTrue(n > 0);
123 assertTrue(ba2[0] == 'X');
124 }
125 });
126 }
127
128 /**
129 * Virtual thread blocks in write.
130 */
131 @Test
132 void testSocketWrite1() throws Exception {
133 VThreadRunner.run(() -> {
134 try (var connection = new Connection()) {
135 Socket s1 = connection.socket1();
136 Socket s2 = connection.socket2();
137
138 // delayed read from s2 to EOF
139 InputStream in = s2.getInputStream();
140 Thread reader = runAfterParkedAsync(() ->
141 in.transferTo(OutputStream.nullOutputStream()));
142
143 // write should block
144 byte[] ba = new byte[100*1024];
145 try (OutputStream out = s1.getOutputStream()) {
146 for (int i = 0; i < 1000; i++) {
147 out.write(ba);
148 }
149 }
150
151 // wait for reader to finish
152 reader.join();
153 }
154 });
155 }
156
157 /**
158 * Virtual thread blocks in read, peer closes connection gracefully.
159 */
160 @Test
161 void testSocketReadPeerClose1() throws Exception {
162 VThreadRunner.run(() -> {
163 try (var connection = new Connection()) {
164 Socket s1 = connection.socket1();
165 Socket s2 = connection.socket2();
166
167 // delayed close of s2
168 runAfterParkedAsync(s2::close);
169
170 // read from s1 should block, then read -1
171 int n = s1.getInputStream().read();
172 assertTrue(n == -1);
173 }
174 });
175 }
176
177 /**
178 * Virtual thread blocks in read, peer closes connection abruptly.
179 */
180 @Test
181 void testSocketReadPeerClose2() throws Exception {
182 VThreadRunner.run(() -> {
183 try (var connection = new Connection()) {
184 Socket s1 = connection.socket1();
185 Socket s2 = connection.socket2();
186
187 // delayed abrupt close of s2
188 s2.setSoLinger(true, 0);
189 runAfterParkedAsync(s2::close);
190
191 // read from s1 should block, then throw
192 try {
193 int n = s1.getInputStream().read();
194 fail("read " + n);
195 } catch (IOException ioe) {
196 // expected
197 }
198 }
199 });
200 }
201
202 /**
203 * Socket close while virtual thread blocked in read.
204 */
205 @Test
206 void testSocketReadAsyncClose1() throws Exception {
207 testSocketReadAsyncClose(0);
208 }
209
210 /**
211 * Socket close while virtual thread blocked in timed read.
212 */
213 @Test
214 void testSocketReadAsyncClose2() throws Exception {
215 testSocketReadAsyncClose(0);
216 }
217
218 void testSocketReadAsyncClose(int timeout) throws Exception {
219 VThreadRunner.run(() -> {
220 try (var connection = new Connection()) {
221 Socket s = connection.socket1();
222
223 // delayed close of s
224 runAfterParkedAsync(s::close);
225
226 // read from s should block, then throw
227 if (timeout > 0) {
228 s.setSoTimeout(timeout);
229 }
230 try {
231 int n = s.getInputStream().read();
232 fail("read " + n);
233 } catch (SocketException expected) { }
234 }
235 });
236 }
237
238 /**
239 * Virtual thread interrupted while blocked in Socket read.
240 */
241 @Test
242 void testSocketReadInterrupt1() throws Exception {
243 testSocketReadInterrupt(0);
244 }
245
246 /**
247 * Virtual thread interrupted while blocked in Socket read with timeout
248 */
249 @Test
250 void testSocketReadInterrupt2() throws Exception {
251 testSocketReadInterrupt(60_000);
252 }
253
254 void testSocketReadInterrupt(int timeout) throws Exception {
255 VThreadRunner.run(() -> {
256 try (var connection = new Connection()) {
257 Socket s = connection.socket1();
258
259
260 // delayed interrupt of current thread
261 Thread thisThread = Thread.currentThread();
262 runAfterParkedAsync(thisThread::interrupt);
263
264 // read from s should block, then throw
265 if (timeout > 0) {
266 s.setSoTimeout(timeout);
267 }
268 try {
269 int n = s.getInputStream().read();
270 fail("read " + n);
271 } catch (SocketException expected) {
272 assertTrue(Thread.interrupted());
273 assertTrue(s.isClosed());
274 }
275 }
276 });
277 }
278
279 /**
280 * Socket close while virtual thread blocked in write.
281 */
282 @Test
283 void testSocketWriteAsyncClose() throws Exception {
284 VThreadRunner.run(() -> {
285 try (var connection = new Connection()) {
286 Socket s = connection.socket1();
287
288 // delayedclose of s
289 runAfterParkedAsync(s::close);
290
291 // write to s should block, then throw
292 try {
293 byte[] ba = new byte[100*1024];
294 OutputStream out = s.getOutputStream();
295 for (;;) {
296 out.write(ba);
297 }
298 } catch (SocketException expected) { }
299 }
300 });
301 }
302
303 /**
304 * Virtual thread interrupted while blocked in Socket write.
305 */
306 @Test
307 void testSocketWriteInterrupt() throws Exception {
308 VThreadRunner.run(() -> {
309 try (var connection = new Connection()) {
310 Socket s = connection.socket1();
311
312 // delayed interrupt of current thread
313 Thread thisThread = Thread.currentThread();
314 runAfterParkedAsync(thisThread::interrupt);
315
316 // write to s should block, then throw
317 try {
318 byte[] ba = new byte[100*1024];
319 OutputStream out = s.getOutputStream();
320 for (;;) {
321 out.write(ba);
322 }
323 } catch (SocketException expected) {
324 assertTrue(Thread.interrupted());
325 assertTrue(s.isClosed());
326 }
327 }
328 });
329 }
330
331 /**
332 * Virtual thread reading urgent data when SO_OOBINLINE is enabled.
333 */
334 @Test
335 void testSocketReadUrgentData() throws Exception {
336 VThreadRunner.run(() -> {
337 try (var connection = new Connection()) {
338 Socket s1 = connection.socket1();
339 Socket s2 = connection.socket2();
340
341 // urgent data should be received
342 runAfterParkedAsync(() -> s2.sendUrgentData('X'));
343
344 // read should block, then read the OOB byte
345 s1.setOOBInline(true);
346 byte[] ba = new byte[10];
347 int n = s1.getInputStream().read(ba);
348 assertTrue(n == 1);
349 assertTrue(ba[0] == 'X');
350
351 // urgent data should not be received
352 s1.setOOBInline(false);
353 s1.setSoTimeout(500);
354 s2.sendUrgentData('X');
355 try {
356 s1.getInputStream().read(ba);
357 fail();
358 } catch (SocketTimeoutException expected) { }
359 }
360 });
361 }
362
363 /**
364 * ServerSocket accept, no blocking.
365 */
366 @Test
367 void testServerSocketAccept1() throws Exception {
368 VThreadRunner.run(() -> {
369 try (var listener = new ServerSocket()) {
370 InetAddress loopback = InetAddress.getLoopbackAddress();
371 listener.bind(new InetSocketAddress(loopback, 0));
372
373 // establish connection
374 var socket1 = new Socket(loopback, listener.getLocalPort());
375
376 // accept should not block
377 var socket2 = listener.accept();
378 socket1.close();
379 socket2.close();
380 }
381 });
382 }
383
384 /**
385 * Virtual thread blocks in accept.
386 */
387 @Test
388 void testServerSocketAccept2() throws Exception {
389 testServerSocketAccept(0);
390 }
391
392 /**
393 * Virtual thread blocks in timed accept.
394 */
395 @Test
396 void testServerSocketAccept3() throws Exception {
397 testServerSocketAccept(60_000);
398 }
399
400 void testServerSocketAccept(int timeout) throws Exception {
401 VThreadRunner.run(() -> {
402 try (var listener = new ServerSocket()) {
403 InetAddress loopback = InetAddress.getLoopbackAddress();
404 listener.bind(new InetSocketAddress(loopback, 0));
405
406 // schedule connect
407 var socket1 = new Socket();
408 SocketAddress remote = listener.getLocalSocketAddress();
409 runAfterParkedAsync(() -> socket1.connect(remote));
410
411 // accept should block
412 if (timeout > 0) {
413 listener.setSoTimeout(timeout);
414 }
415 var socket2 = listener.accept();
416 socket1.close();
417 socket2.close();
418 }
419 });
420 }
421
422 /**
423 * ServerSocket close while virtual thread blocked in accept.
424 */
425 @Test
426 void testServerSocketAcceptAsyncClose1() throws Exception {
427 testServerSocketAcceptAsyncClose(0);
428 }
429
430 /**
431 * ServerSocket close while virtual thread blocked in timed accept.
432 */
433 @Test
434 void testServerSocketAcceptAsyncClose2() throws Exception {
435 testServerSocketAcceptAsyncClose(60_000);
436 }
437
438 void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
439 VThreadRunner.run(() -> {
440 try (var listener = new ServerSocket()) {
441 InetAddress loopback = InetAddress.getLoopbackAddress();
442 listener.bind(new InetSocketAddress(loopback, 0));
443
444 // delayed close of listener
445 runAfterParkedAsync(listener::close);
446
447 // accept should block, then throw
448 if (timeout > 0) {
449 listener.setSoTimeout(timeout);
450 }
451 try {
452 listener.accept().close();
453 fail("connection accepted???");
454 } catch (SocketException expected) { }
455 }
456 });
457 }
458
459 /**
460 * Virtual thread interrupted while blocked in ServerSocket accept.
461 */
462 @Test
463 void testServerSocketAcceptInterrupt1() throws Exception {
464 testServerSocketAcceptInterrupt(0);
465 }
466
467 /**
468 * Virtual thread interrupted while blocked in ServerSocket accept with timeout.
469 */
470 @Test
471 void testServerSocketAcceptInterrupt2() throws Exception {
472 testServerSocketAcceptInterrupt(60_000);
473 }
474
475 void testServerSocketAcceptInterrupt(int timeout) throws Exception {
476 VThreadRunner.run(() -> {
477 try (var listener = new ServerSocket()) {
478 InetAddress loopback = InetAddress.getLoopbackAddress();
479 listener.bind(new InetSocketAddress(loopback, 0));
480
481 // delayed interrupt of current thread
482 Thread thisThread = Thread.currentThread();
483 runAfterParkedAsync(thisThread::interrupt);
484
485 // accept should block, then throw
486 if (timeout > 0) {
487 listener.setSoTimeout(timeout);
488 }
489 try {
490 listener.accept().close();
491 fail("connection accepted???");
492 } catch (SocketException expected) {
493 assertTrue(Thread.interrupted());
494 assertTrue(listener.isClosed());
495 }
496 }
497 });
498 }
499
500 /**
501 * DatagramSocket receive/send, no blocking.
502 */
503 @Test
504 void testDatagramSocketSendReceive1() throws Exception {
505 VThreadRunner.run(() -> {
506 try (DatagramSocket s1 = new DatagramSocket(null);
507 DatagramSocket s2 = new DatagramSocket(null)) {
508
509 InetAddress lh = InetAddress.getLoopbackAddress();
510 s1.bind(new InetSocketAddress(lh, 0));
511 s2.bind(new InetSocketAddress(lh, 0));
512
513 // send should not block
514 byte[] bytes = "XXX".getBytes("UTF-8");
515 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
516 p1.setSocketAddress(s2.getLocalSocketAddress());
517 s1.send(p1);
518
519 // receive should not block
520 byte[] ba = new byte[100];
521 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
522 s2.receive(p2);
523 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
524 assertTrue(ba[0] == 'X');
525 }
526 });
527 }
528
529 /**
530 * Virtual thread blocks in DatagramSocket receive.
531 */
532 @Test
533 void testDatagramSocketSendReceive2() throws Exception {
534 testDatagramSocketSendReceive(0);
535 }
536
537 /**
538 * Virtual thread blocks in DatagramSocket receive with timeout.
539 */
540 @Test
541 void testDatagramSocketSendReceive3() throws Exception {
542 testDatagramSocketSendReceive(60_000);
543 }
544
545 private void testDatagramSocketSendReceive(int timeout) throws Exception {
546 VThreadRunner.run(() -> {
547 try (DatagramSocket s1 = new DatagramSocket(null);
548 DatagramSocket s2 = new DatagramSocket(null)) {
549
550 InetAddress lh = InetAddress.getLoopbackAddress();
551 s1.bind(new InetSocketAddress(lh, 0));
552 s2.bind(new InetSocketAddress(lh, 0));
553
554 // delayed send
555 byte[] bytes = "XXX".getBytes("UTF-8");
556 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
557 p1.setSocketAddress(s2.getLocalSocketAddress());
558 runAfterParkedAsync(() -> s1.send(p1));
559
560 // receive should block
561 if (timeout > 0) {
562 s2.setSoTimeout(timeout);
563 }
564 byte[] ba = new byte[100];
565 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
566 s2.receive(p2);
567 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
568 assertTrue(ba[0] == 'X');
569 }
570 });
571 }
572
573 /**
574 * Virtual thread blocks in DatagramSocket receive that times out.
575 */
576 @Test
577 void testDatagramSocketReceiveTimeout() throws Exception {
578 VThreadRunner.run(() -> {
579 try (DatagramSocket s = new DatagramSocket(null)) {
580 InetAddress lh = InetAddress.getLoopbackAddress();
581 s.bind(new InetSocketAddress(lh, 0));
582 s.setSoTimeout(500);
583 byte[] ba = new byte[100];
584 DatagramPacket p = new DatagramPacket(ba, ba.length);
585 try {
586 s.receive(p);
587 fail();
588 } catch (SocketTimeoutException expected) { }
589 }
590 });
591 }
592
593 /**
594 * DatagramSocket close while virtual thread blocked in receive.
595 */
596 @Test
597 void testDatagramSocketReceiveAsyncClose1() throws Exception {
598 testDatagramSocketReceiveAsyncClose(0);
599 }
600
601 /**
602 * DatagramSocket close while virtual thread blocked with timeout.
603 */
604 @Test
605 void testDatagramSocketReceiveAsyncClose2() throws Exception {
606 testDatagramSocketReceiveAsyncClose(60_000);
607 }
608
609 private void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
610 VThreadRunner.run(() -> {
611 try (DatagramSocket s = new DatagramSocket(null)) {
612 InetAddress lh = InetAddress.getLoopbackAddress();
613 s.bind(new InetSocketAddress(lh, 0));
614
615 // delayed close of s
616 runAfterParkedAsync(s::close);
617
618 // receive should block, then throw
619 if (timeout > 0) {
620 s.setSoTimeout(timeout);
621 }
622 try {
623 byte[] ba = new byte[100];
624 DatagramPacket p = new DatagramPacket(ba, ba.length);
625 s.receive(p);
626 fail();
627 } catch (SocketException expected) { }
628 }
629 });
630 }
631
632 /**
633 * Virtual thread interrupted while blocked in DatagramSocket receive.
634 */
635 @Test
636 void testDatagramSocketReceiveInterrupt1() throws Exception {
637 testDatagramSocketReceiveInterrupt(0);
638 }
639
640 /**
641 * Virtual thread interrupted while blocked in DatagramSocket receive with timeout.
642 */
643 @Test
644 void testDatagramSocketReceiveInterrupt2() throws Exception {
645 testDatagramSocketReceiveInterrupt(60_000);
646 }
647
648 private void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
649 VThreadRunner.run(() -> {
650 try (DatagramSocket s = new DatagramSocket(null)) {
651 InetAddress lh = InetAddress.getLoopbackAddress();
652 s.bind(new InetSocketAddress(lh, 0));
653
654 // delayed interrupt of current thread
655 Thread thisThread = Thread.currentThread();
656 runAfterParkedAsync(thisThread::interrupt);
657
658 // receive should block, then throw
659 if (timeout > 0) {
660 s.setSoTimeout(timeout);
661 }
662 try {
663 byte[] ba = new byte[100];
664 DatagramPacket p = new DatagramPacket(ba, ba.length);
665 s.receive(p);
666 fail();
667 } catch (SocketException expected) {
668 assertTrue(Thread.interrupted());
669 assertTrue(s.isClosed());
670 }
671 }
672 });
673 }
674
675 /**
676 * Creates a loopback connection
677 */
678 static class Connection implements Closeable {
679 private final Socket s1;
680 private final Socket s2;
681 Connection() throws IOException {
682 var lh = InetAddress.getLoopbackAddress();
683 try (var listener = new ServerSocket()) {
684 listener.bind(new InetSocketAddress(lh, 0));
685 Socket s1 = new Socket();
686 Socket s2;
687 try {
688 s1.connect(listener.getLocalSocketAddress());
689 s2 = listener.accept();
690 } catch (IOException ioe) {
691 s1.close();
692 throw ioe;
693 }
694 this.s1 = s1;
695 this.s2 = s2;
696 }
697
698 }
699 Socket socket1() {
700 return s1;
701 }
702 Socket socket2() {
703 return s2;
704 }
705 @Override
706 public void close() throws IOException {
707 s1.close();
708 s2.close();
709 }
710 }
711
712 @FunctionalInterface
713 interface ThrowingRunnable {
714 void run() throws Exception;
715 }
716
717 /**
718 * Runs the given task asynchronously after the current virtual thread has parked.
719 * @return the thread started to run the task
720 */
721 static Thread runAfterParkedAsync(ThrowingRunnable task) {
722 Thread target = Thread.currentThread();
723 if (!target.isVirtual())
724 throw new WrongThreadException();
725 return Thread.ofPlatform().daemon().start(() -> {
726 try {
727 Thread.State state = target.getState();
728 while (state != Thread.State.WAITING
729 && state != Thread.State.TIMED_WAITING) {
730 Thread.sleep(20);
731 state = target.getState();
732 }
733 Thread.sleep(20); // give a bit more time to release carrier
734 task.run();
735 } catch (Exception e) {
736 e.printStackTrace();
737 }
738 });
739 }
740 }