1 /* 2 * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /** 25 * @test id=default 26 * @bug 8284161 27 * @summary Test virtual threads doing blocking I/O on java.net Sockets 28 * @library /test/lib 29 * @run junit BlockingSocketOps 30 */ 31 32 /** 33 * @test id=poller-modes 34 * @requires (os.family == "linux") | (os.family == "mac") 35 * @library /test/lib 36 * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps 37 * @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps 38 */ 39 40 /** 41 * @test id=no-vmcontinuations 42 * @requires vm.continuations 43 * @library /test/lib 44 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps 45 */ 46 47 import java.io.Closeable; 48 import java.io.IOException; 49 import java.io.InputStream; 50 import java.io.OutputStream; 51 import java.net.DatagramPacket; 52 import java.net.DatagramSocket; 53 import java.net.InetAddress; 54 import java.net.InetSocketAddress; 55 import java.net.ServerSocket; 56 import java.net.Socket; 57 import java.net.SocketAddress; 58 import java.net.SocketException; 59 import java.net.SocketTimeoutException; 60 61 import jdk.test.lib.thread.VThreadRunner; 62 import org.junit.jupiter.api.Test; 63 import static org.junit.jupiter.api.Assertions.*; 64 65 class BlockingSocketOps { 66 67 /** 68 * Socket read/write, no blocking. 69 */ 70 @Test 71 void testSocketReadWrite1() throws Exception { 72 VThreadRunner.run(() -> { 73 try (var connection = new Connection()) { 74 Socket s1 = connection.socket1(); 75 Socket s2 = connection.socket2(); 76 77 // write should not block 78 byte[] ba = "XXX".getBytes("UTF-8"); 79 s1.getOutputStream().write(ba); 80 81 // read should not block 82 ba = new byte[10]; 83 int n = s2.getInputStream().read(ba); 84 assertTrue(n > 0); 85 assertTrue(ba[0] == 'X'); 86 } 87 }); 88 } 89 90 /** 91 * Virtual thread blocks in read. 92 */ 93 @Test 94 void testSocketRead1() throws Exception { 95 testSocketRead(0); 96 } 97 98 /** 99 * Virtual thread blocks in timed read. 100 */ 101 @Test 102 void testSocketRead2() throws Exception { 103 testSocketRead(60_000); 104 } 105 106 void testSocketRead(int timeout) throws Exception { 107 VThreadRunner.run(() -> { 108 try (var connection = new Connection()) { 109 Socket s1 = connection.socket1(); 110 Socket s2 = connection.socket2(); 111 112 // delayed write from sc1 113 byte[] ba1 = "XXX".getBytes("UTF-8"); 114 runAfterParkedAsync(() -> s1.getOutputStream().write(ba1)); 115 116 // read from sc2 should block 117 if (timeout > 0) { 118 s2.setSoTimeout(timeout); 119 } 120 byte[] ba2 = new byte[10]; 121 int n = s2.getInputStream().read(ba2); 122 assertTrue(n > 0); 123 assertTrue(ba2[0] == 'X'); 124 } 125 }); 126 } 127 128 /** 129 * Virtual thread blocks in write. 130 */ 131 @Test 132 void testSocketWrite1() throws Exception { 133 VThreadRunner.run(() -> { 134 try (var connection = new Connection()) { 135 Socket s1 = connection.socket1(); 136 Socket s2 = connection.socket2(); 137 138 // delayed read from s2 to EOF 139 InputStream in = s2.getInputStream(); 140 Thread reader = runAfterParkedAsync(() -> 141 in.transferTo(OutputStream.nullOutputStream())); 142 143 // write should block 144 byte[] ba = new byte[100*1024]; 145 try (OutputStream out = s1.getOutputStream()) { 146 for (int i = 0; i < 1000; i++) { 147 out.write(ba); 148 } 149 } 150 151 // wait for reader to finish 152 reader.join(); 153 } 154 }); 155 } 156 157 /** 158 * Virtual thread blocks in read, peer closes connection gracefully. 159 */ 160 @Test 161 void testSocketReadPeerClose1() throws Exception { 162 VThreadRunner.run(() -> { 163 try (var connection = new Connection()) { 164 Socket s1 = connection.socket1(); 165 Socket s2 = connection.socket2(); 166 167 // delayed close of s2 168 runAfterParkedAsync(s2::close); 169 170 // read from s1 should block, then read -1 171 int n = s1.getInputStream().read(); 172 assertTrue(n == -1); 173 } 174 }); 175 } 176 177 /** 178 * Virtual thread blocks in read, peer closes connection abruptly. 179 */ 180 @Test 181 void testSocketReadPeerClose2() throws Exception { 182 VThreadRunner.run(() -> { 183 try (var connection = new Connection()) { 184 Socket s1 = connection.socket1(); 185 Socket s2 = connection.socket2(); 186 187 // delayed abrupt close of s2 188 s2.setSoLinger(true, 0); 189 runAfterParkedAsync(s2::close); 190 191 // read from s1 should block, then throw 192 try { 193 int n = s1.getInputStream().read(); 194 fail("read " + n); 195 } catch (IOException ioe) { 196 // expected 197 } 198 } 199 }); 200 } 201 202 /** 203 * Socket close while virtual thread blocked in read. 204 */ 205 @Test 206 void testSocketReadAsyncClose1() throws Exception { 207 testSocketReadAsyncClose(0); 208 } 209 210 /** 211 * Socket close while virtual thread blocked in timed read. 212 */ 213 @Test 214 void testSocketReadAsyncClose2() throws Exception { 215 testSocketReadAsyncClose(60_000); 216 } 217 218 void testSocketReadAsyncClose(int timeout) throws Exception { 219 VThreadRunner.run(() -> { 220 try (var connection = new Connection()) { 221 Socket s = connection.socket1(); 222 223 // delayed close of s 224 runAfterParkedAsync(s::close); 225 226 // read from s should block, then throw 227 if (timeout > 0) { 228 s.setSoTimeout(timeout); 229 } 230 try { 231 int n = s.getInputStream().read(); 232 fail("read " + n); 233 } catch (SocketException expected) { } 234 } 235 }); 236 } 237 238 /** 239 * Socket shutdownInput while virtual thread blocked in read. 240 */ 241 @Test 242 void testSocketReadAsyncShutdownInput1() throws Exception { 243 testSocketReadAsyncShutdownInput(0); 244 } 245 246 /** 247 * Socket shutdownInput while virtual thread blocked in timed read. 248 */ 249 @Test 250 void testSocketReadAsyncShutdownInput2() throws Exception { 251 testSocketReadAsyncShutdownInput(60_000); 252 } 253 254 void testSocketReadAsyncShutdownInput(int timeout) throws Exception { 255 VThreadRunner.run(() -> { 256 try (var connection = new Connection()) { 257 Socket s = connection.socket1(); 258 259 // delayed shutdown of s 260 runAfterParkedAsync(s::shutdownInput); 261 262 // read from s should block, then throw 263 if (timeout > 0) { 264 s.setSoTimeout(timeout); 265 } 266 267 // -1 or SocketException 268 try { 269 int n = s.getInputStream().read(); 270 assertEquals(-1, n); 271 } catch (SocketException e) { } 272 assertFalse(s.isClosed()); 273 } 274 }); 275 } 276 277 /** 278 * Virtual thread interrupted while blocked in Socket read. 279 */ 280 @Test 281 void testSocketReadInterrupt1() throws Exception { 282 testSocketReadInterrupt(0); 283 } 284 285 /** 286 * Virtual thread interrupted while blocked in Socket read with timeout 287 */ 288 @Test 289 void testSocketReadInterrupt2() throws Exception { 290 testSocketReadInterrupt(60_000); 291 } 292 293 void testSocketReadInterrupt(int timeout) throws Exception { 294 VThreadRunner.run(() -> { 295 try (var connection = new Connection()) { 296 Socket s = connection.socket1(); 297 298 299 // delayed interrupt of current thread 300 Thread thisThread = Thread.currentThread(); 301 runAfterParkedAsync(thisThread::interrupt); 302 303 // read from s should block, then throw 304 if (timeout > 0) { 305 s.setSoTimeout(timeout); 306 } 307 try { 308 int n = s.getInputStream().read(); 309 fail("read " + n); 310 } catch (SocketException expected) { 311 assertTrue(Thread.interrupted()); 312 assertTrue(s.isClosed()); 313 } 314 } 315 }); 316 } 317 318 /** 319 * Socket close while virtual thread blocked in write. 320 */ 321 @Test 322 void testSocketWriteAsyncClose() throws Exception { 323 VThreadRunner.run(() -> { 324 try (var connection = new Connection()) { 325 Socket s = connection.socket1(); 326 327 // delayed close of s 328 runAfterParkedAsync(s::close); 329 330 // write to s should block, then throw 331 try { 332 byte[] ba = new byte[100*1024]; 333 OutputStream out = s.getOutputStream(); 334 for (;;) { 335 out.write(ba); 336 } 337 } catch (SocketException expected) { } 338 } 339 }); 340 } 341 342 /** 343 * Socket shutdownOutput while virtual thread blocked in write. 344 */ 345 @Test 346 void testSocketWriteAsyncShutdownOutput() throws Exception { 347 VThreadRunner.run(() -> { 348 try (var connection = new Connection()) { 349 Socket s = connection.socket1(); 350 351 // delayed shutdown of s 352 runAfterParkedAsync(s::shutdownOutput); 353 354 // write to s should block, then throw 355 try { 356 byte[] ba = new byte[100*1024]; 357 OutputStream out = s.getOutputStream(); 358 for (;;) { 359 out.write(ba); 360 } 361 } catch (SocketException expected) { } 362 assertFalse(s.isClosed()); 363 } 364 }); 365 } 366 367 /** 368 * Virtual thread interrupted while blocked in Socket write. 369 */ 370 @Test 371 void testSocketWriteInterrupt() throws Exception { 372 VThreadRunner.run(() -> { 373 try (var connection = new Connection()) { 374 Socket s = connection.socket1(); 375 376 // delayed interrupt of current thread 377 Thread thisThread = Thread.currentThread(); 378 runAfterParkedAsync(thisThread::interrupt); 379 380 // write to s should block, then throw 381 try { 382 byte[] ba = new byte[100*1024]; 383 OutputStream out = s.getOutputStream(); 384 for (;;) { 385 out.write(ba); 386 } 387 } catch (SocketException expected) { 388 assertTrue(Thread.interrupted()); 389 assertTrue(s.isClosed()); 390 } 391 } 392 }); 393 } 394 395 /** 396 * Virtual thread reading urgent data when SO_OOBINLINE is enabled. 397 */ 398 @Test 399 void testSocketReadUrgentData() throws Exception { 400 VThreadRunner.run(() -> { 401 try (var connection = new Connection()) { 402 Socket s1 = connection.socket1(); 403 Socket s2 = connection.socket2(); 404 405 // urgent data should be received 406 runAfterParkedAsync(() -> s2.sendUrgentData('X')); 407 408 // read should block, then read the OOB byte 409 s1.setOOBInline(true); 410 byte[] ba = new byte[10]; 411 int n = s1.getInputStream().read(ba); 412 assertTrue(n == 1); 413 assertTrue(ba[0] == 'X'); 414 415 // urgent data should not be received 416 s1.setOOBInline(false); 417 s1.setSoTimeout(500); 418 s2.sendUrgentData('X'); 419 try { 420 s1.getInputStream().read(ba); 421 fail(); 422 } catch (SocketTimeoutException expected) { } 423 } 424 }); 425 } 426 427 /** 428 * ServerSocket accept, no blocking. 429 */ 430 @Test 431 void testServerSocketAccept1() throws Exception { 432 VThreadRunner.run(() -> { 433 try (var listener = new ServerSocket()) { 434 InetAddress loopback = InetAddress.getLoopbackAddress(); 435 listener.bind(new InetSocketAddress(loopback, 0)); 436 437 // establish connection 438 var socket1 = new Socket(loopback, listener.getLocalPort()); 439 440 // accept should not block 441 var socket2 = listener.accept(); 442 socket1.close(); 443 socket2.close(); 444 } 445 }); 446 } 447 448 /** 449 * Virtual thread blocks in accept. 450 */ 451 @Test 452 void testServerSocketAccept2() throws Exception { 453 testServerSocketAccept(0); 454 } 455 456 /** 457 * Virtual thread blocks in timed accept. 458 */ 459 @Test 460 void testServerSocketAccept3() throws Exception { 461 testServerSocketAccept(60_000); 462 } 463 464 void testServerSocketAccept(int timeout) throws Exception { 465 VThreadRunner.run(() -> { 466 try (var listener = new ServerSocket()) { 467 InetAddress loopback = InetAddress.getLoopbackAddress(); 468 listener.bind(new InetSocketAddress(loopback, 0)); 469 470 // schedule connect 471 var socket1 = new Socket(); 472 SocketAddress remote = listener.getLocalSocketAddress(); 473 runAfterParkedAsync(() -> socket1.connect(remote)); 474 475 // accept should block 476 if (timeout > 0) { 477 listener.setSoTimeout(timeout); 478 } 479 var socket2 = listener.accept(); 480 socket1.close(); 481 socket2.close(); 482 } 483 }); 484 } 485 486 /** 487 * ServerSocket close while virtual thread blocked in accept. 488 */ 489 @Test 490 void testServerSocketAcceptAsyncClose1() throws Exception { 491 testServerSocketAcceptAsyncClose(0); 492 } 493 494 /** 495 * ServerSocket close while virtual thread blocked in timed accept. 496 */ 497 @Test 498 void testServerSocketAcceptAsyncClose2() throws Exception { 499 testServerSocketAcceptAsyncClose(60_000); 500 } 501 502 void testServerSocketAcceptAsyncClose(int timeout) throws Exception { 503 VThreadRunner.run(() -> { 504 try (var listener = new ServerSocket()) { 505 InetAddress loopback = InetAddress.getLoopbackAddress(); 506 listener.bind(new InetSocketAddress(loopback, 0)); 507 508 // delayed close of listener 509 runAfterParkedAsync(listener::close); 510 511 // accept should block, then throw 512 if (timeout > 0) { 513 listener.setSoTimeout(timeout); 514 } 515 try { 516 listener.accept().close(); 517 fail("connection accepted???"); 518 } catch (SocketException expected) { } 519 } 520 }); 521 } 522 523 /** 524 * Virtual thread interrupted while blocked in ServerSocket accept. 525 */ 526 @Test 527 void testServerSocketAcceptInterrupt1() throws Exception { 528 testServerSocketAcceptInterrupt(0); 529 } 530 531 /** 532 * Virtual thread interrupted while blocked in ServerSocket accept with timeout. 533 */ 534 @Test 535 void testServerSocketAcceptInterrupt2() throws Exception { 536 testServerSocketAcceptInterrupt(60_000); 537 } 538 539 void testServerSocketAcceptInterrupt(int timeout) throws Exception { 540 VThreadRunner.run(() -> { 541 try (var listener = new ServerSocket()) { 542 InetAddress loopback = InetAddress.getLoopbackAddress(); 543 listener.bind(new InetSocketAddress(loopback, 0)); 544 545 // delayed interrupt of current thread 546 Thread thisThread = Thread.currentThread(); 547 runAfterParkedAsync(thisThread::interrupt); 548 549 // accept should block, then throw 550 if (timeout > 0) { 551 listener.setSoTimeout(timeout); 552 } 553 try { 554 listener.accept().close(); 555 fail("connection accepted???"); 556 } catch (SocketException expected) { 557 assertTrue(Thread.interrupted()); 558 assertTrue(listener.isClosed()); 559 } 560 } 561 }); 562 } 563 564 /** 565 * DatagramSocket receive/send, no blocking. 566 */ 567 @Test 568 void testDatagramSocketSendReceive1() throws Exception { 569 VThreadRunner.run(() -> { 570 try (DatagramSocket s1 = new DatagramSocket(null); 571 DatagramSocket s2 = new DatagramSocket(null)) { 572 573 InetAddress lh = InetAddress.getLoopbackAddress(); 574 s1.bind(new InetSocketAddress(lh, 0)); 575 s2.bind(new InetSocketAddress(lh, 0)); 576 577 // send should not block 578 byte[] bytes = "XXX".getBytes("UTF-8"); 579 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length); 580 p1.setSocketAddress(s2.getLocalSocketAddress()); 581 s1.send(p1); 582 583 // receive should not block 584 byte[] ba = new byte[100]; 585 DatagramPacket p2 = new DatagramPacket(ba, ba.length); 586 s2.receive(p2); 587 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress()); 588 assertTrue(ba[0] == 'X'); 589 } 590 }); 591 } 592 593 /** 594 * Virtual thread blocks in DatagramSocket receive. 595 */ 596 @Test 597 void testDatagramSocketSendReceive2() throws Exception { 598 testDatagramSocketSendReceive(0); 599 } 600 601 /** 602 * Virtual thread blocks in DatagramSocket receive with timeout. 603 */ 604 @Test 605 void testDatagramSocketSendReceive3() throws Exception { 606 testDatagramSocketSendReceive(60_000); 607 } 608 609 private void testDatagramSocketSendReceive(int timeout) throws Exception { 610 VThreadRunner.run(() -> { 611 try (DatagramSocket s1 = new DatagramSocket(null); 612 DatagramSocket s2 = new DatagramSocket(null)) { 613 614 InetAddress lh = InetAddress.getLoopbackAddress(); 615 s1.bind(new InetSocketAddress(lh, 0)); 616 s2.bind(new InetSocketAddress(lh, 0)); 617 618 // delayed send 619 byte[] bytes = "XXX".getBytes("UTF-8"); 620 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length); 621 p1.setSocketAddress(s2.getLocalSocketAddress()); 622 runAfterParkedAsync(() -> s1.send(p1)); 623 624 // receive should block 625 if (timeout > 0) { 626 s2.setSoTimeout(timeout); 627 } 628 byte[] ba = new byte[100]; 629 DatagramPacket p2 = new DatagramPacket(ba, ba.length); 630 s2.receive(p2); 631 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress()); 632 assertTrue(ba[0] == 'X'); 633 } 634 }); 635 } 636 637 /** 638 * Virtual thread blocks in DatagramSocket receive that times out. 639 */ 640 @Test 641 void testDatagramSocketReceiveTimeout() throws Exception { 642 VThreadRunner.run(() -> { 643 try (DatagramSocket s = new DatagramSocket(null)) { 644 InetAddress lh = InetAddress.getLoopbackAddress(); 645 s.bind(new InetSocketAddress(lh, 0)); 646 s.setSoTimeout(500); 647 byte[] ba = new byte[100]; 648 DatagramPacket p = new DatagramPacket(ba, ba.length); 649 try { 650 s.receive(p); 651 fail(); 652 } catch (SocketTimeoutException expected) { } 653 } 654 }); 655 } 656 657 /** 658 * DatagramSocket close while virtual thread blocked in receive. 659 */ 660 @Test 661 void testDatagramSocketReceiveAsyncClose1() throws Exception { 662 testDatagramSocketReceiveAsyncClose(0); 663 } 664 665 /** 666 * DatagramSocket close while virtual thread blocked with timeout. 667 */ 668 @Test 669 void testDatagramSocketReceiveAsyncClose2() throws Exception { 670 testDatagramSocketReceiveAsyncClose(60_000); 671 } 672 673 private void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception { 674 VThreadRunner.run(() -> { 675 try (DatagramSocket s = new DatagramSocket(null)) { 676 InetAddress lh = InetAddress.getLoopbackAddress(); 677 s.bind(new InetSocketAddress(lh, 0)); 678 679 // delayed close of s 680 runAfterParkedAsync(s::close); 681 682 // receive should block, then throw 683 if (timeout > 0) { 684 s.setSoTimeout(timeout); 685 } 686 try { 687 byte[] ba = new byte[100]; 688 DatagramPacket p = new DatagramPacket(ba, ba.length); 689 s.receive(p); 690 fail(); 691 } catch (SocketException expected) { } 692 } 693 }); 694 } 695 696 /** 697 * Virtual thread interrupted while blocked in DatagramSocket receive. 698 */ 699 @Test 700 void testDatagramSocketReceiveInterrupt1() throws Exception { 701 testDatagramSocketReceiveInterrupt(0); 702 } 703 704 /** 705 * Virtual thread interrupted while blocked in DatagramSocket receive with timeout. 706 */ 707 @Test 708 void testDatagramSocketReceiveInterrupt2() throws Exception { 709 testDatagramSocketReceiveInterrupt(60_000); 710 } 711 712 private void testDatagramSocketReceiveInterrupt(int timeout) throws Exception { 713 VThreadRunner.run(() -> { 714 try (DatagramSocket s = new DatagramSocket(null)) { 715 InetAddress lh = InetAddress.getLoopbackAddress(); 716 s.bind(new InetSocketAddress(lh, 0)); 717 718 // delayed interrupt of current thread 719 Thread thisThread = Thread.currentThread(); 720 runAfterParkedAsync(thisThread::interrupt); 721 722 // receive should block, then throw 723 if (timeout > 0) { 724 s.setSoTimeout(timeout); 725 } 726 try { 727 byte[] ba = new byte[100]; 728 DatagramPacket p = new DatagramPacket(ba, ba.length); 729 s.receive(p); 730 fail(); 731 } catch (SocketException expected) { 732 assertTrue(Thread.interrupted()); 733 assertTrue(s.isClosed()); 734 } 735 } 736 }); 737 } 738 739 /** 740 * Creates a loopback connection 741 */ 742 static class Connection implements Closeable { 743 private final Socket s1; 744 private final Socket s2; 745 Connection() throws IOException { 746 var lh = InetAddress.getLoopbackAddress(); 747 try (var listener = new ServerSocket()) { 748 listener.bind(new InetSocketAddress(lh, 0)); 749 Socket s1 = new Socket(); 750 Socket s2; 751 try { 752 s1.connect(listener.getLocalSocketAddress()); 753 s2 = listener.accept(); 754 } catch (IOException ioe) { 755 s1.close(); 756 throw ioe; 757 } 758 this.s1 = s1; 759 this.s2 = s2; 760 } 761 762 } 763 Socket socket1() { 764 return s1; 765 } 766 Socket socket2() { 767 return s2; 768 } 769 @Override 770 public void close() throws IOException { 771 s1.close(); 772 s2.close(); 773 } 774 } 775 776 @FunctionalInterface 777 interface ThrowingRunnable { 778 void run() throws Exception; 779 } 780 781 /** 782 * Runs the given task asynchronously after the current virtual thread has parked. 783 * @return the thread started to run the task 784 */ 785 static Thread runAfterParkedAsync(ThrowingRunnable task) { 786 Thread target = Thread.currentThread(); 787 if (!target.isVirtual()) 788 throw new WrongThreadException(); 789 return Thread.ofPlatform().daemon().start(() -> { 790 try { 791 Thread.State state = target.getState(); 792 while (state != Thread.State.WAITING 793 && state != Thread.State.TIMED_WAITING) { 794 Thread.sleep(20); 795 state = target.getState(); 796 } 797 Thread.sleep(20); // give a bit more time to release carrier 798 task.run(); 799 } catch (Exception e) { 800 e.printStackTrace(); 801 } 802 }); 803 } 804 }