1 /* 2 * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /** 25 * @test id=default 26 * @bug 8284161 27 * @summary Test virtual threads doing blocking I/O on java.net Sockets 28 * @library /test/lib 29 * @run junit BlockingSocketOps 30 */ 31 32 /** 33 * @test id=poller-modes 34 * @requires (os.family == "linux") | (os.family == "mac") 35 * @library /test/lib 36 * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps 37 * @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps 38 */ 39 40 /** 41 * @test id=no-vmcontinuations 42 * @requires vm.continuations 43 * @library /test/lib 44 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps 45 */ 46 47 import java.io.Closeable; 48 import java.io.IOException; 49 import java.io.InputStream; 50 import java.io.OutputStream; 51 import java.net.DatagramPacket; 52 import java.net.DatagramSocket; 53 import java.net.InetAddress; 54 import java.net.InetSocketAddress; 55 import java.net.ServerSocket; 56 import java.net.Socket; 57 import java.net.SocketAddress; 58 import java.net.SocketException; 59 import java.net.SocketTimeoutException; 60 61 import jdk.test.lib.thread.VThreadRunner; 62 import org.junit.jupiter.api.Test; 63 import static org.junit.jupiter.api.Assertions.*; 64 65 class BlockingSocketOps { 66 67 /** 68 * Socket read/write, no blocking. 69 */ 70 @Test 71 void testSocketReadWrite1() throws Exception { 72 VThreadRunner.run(() -> { 73 try (var connection = new Connection()) { 74 Socket s1 = connection.socket1(); 75 Socket s2 = connection.socket2(); 76 77 // write should not block 78 byte[] ba = "XXX".getBytes("UTF-8"); 79 s1.getOutputStream().write(ba); 80 81 // read should not block 82 ba = new byte[10]; 83 int n = s2.getInputStream().read(ba); 84 assertTrue(n > 0); 85 assertTrue(ba[0] == 'X'); 86 } 87 }); 88 } 89 90 /** 91 * Virtual thread blocks in read. 92 */ 93 @Test 94 void testSocketRead1() throws Exception { 95 testSocketRead(0); 96 } 97 98 /** 99 * Virtual thread blocks in timed read. 100 */ 101 @Test 102 void testSocketRead2() throws Exception { 103 testSocketRead(60_000); 104 } 105 106 void testSocketRead(int timeout) throws Exception { 107 VThreadRunner.run(() -> { 108 try (var connection = new Connection()) { 109 Socket s1 = connection.socket1(); 110 Socket s2 = connection.socket2(); 111 112 // delayed write from sc1 113 byte[] ba1 = "XXX".getBytes("UTF-8"); 114 runAfterParkedAsync(() -> s1.getOutputStream().write(ba1)); 115 116 // read from sc2 should block 117 if (timeout > 0) { 118 s2.setSoTimeout(timeout); 119 } 120 byte[] ba2 = new byte[10]; 121 int n = s2.getInputStream().read(ba2); 122 assertTrue(n > 0); 123 assertTrue(ba2[0] == 'X'); 124 } 125 }); 126 } 127 128 /** 129 * Virtual thread blocks in write. 130 */ 131 @Test 132 void testSocketWrite1() throws Exception { 133 VThreadRunner.run(() -> { 134 try (var connection = new Connection()) { 135 Socket s1 = connection.socket1(); 136 Socket s2 = connection.socket2(); 137 138 // delayed read from s2 to EOF 139 InputStream in = s2.getInputStream(); 140 Thread reader = runAfterParkedAsync(() -> 141 in.transferTo(OutputStream.nullOutputStream())); 142 143 // write should block 144 byte[] ba = new byte[100*1024]; 145 try (OutputStream out = s1.getOutputStream()) { 146 for (int i = 0; i < 1000; i++) { 147 out.write(ba); 148 } 149 } 150 151 // wait for reader to finish 152 reader.join(); 153 } 154 }); 155 } 156 157 /** 158 * Virtual thread blocks in read, peer closes connection gracefully. 159 */ 160 @Test 161 void testSocketReadPeerClose1() throws Exception { 162 VThreadRunner.run(() -> { 163 try (var connection = new Connection()) { 164 Socket s1 = connection.socket1(); 165 Socket s2 = connection.socket2(); 166 167 // delayed close of s2 168 runAfterParkedAsync(s2::close); 169 170 // read from s1 should block, then read -1 171 int n = s1.getInputStream().read(); 172 assertTrue(n == -1); 173 } 174 }); 175 } 176 177 /** 178 * Virtual thread blocks in read, peer closes connection abruptly. 179 */ 180 @Test 181 void testSocketReadPeerClose2() throws Exception { 182 VThreadRunner.run(() -> { 183 try (var connection = new Connection()) { 184 Socket s1 = connection.socket1(); 185 Socket s2 = connection.socket2(); 186 187 // delayed abrupt close of s2 188 s2.setSoLinger(true, 0); 189 runAfterParkedAsync(s2::close); 190 191 // read from s1 should block, then throw 192 try { 193 int n = s1.getInputStream().read(); 194 fail("read " + n); 195 } catch (IOException ioe) { 196 // expected 197 } 198 } 199 }); 200 } 201 202 /** 203 * Socket close while virtual thread blocked in read. 204 */ 205 @Test 206 void testSocketReadAsyncClose1() throws Exception { 207 testSocketReadAsyncClose(0); 208 } 209 210 /** 211 * Socket close while virtual thread blocked in timed read. 212 */ 213 @Test 214 void testSocketReadAsyncClose2() throws Exception { 215 testSocketReadAsyncClose(0); 216 } 217 218 void testSocketReadAsyncClose(int timeout) throws Exception { 219 VThreadRunner.run(() -> { 220 try (var connection = new Connection()) { 221 Socket s = connection.socket1(); 222 223 // delayed close of s 224 runAfterParkedAsync(s::close); 225 226 // read from s should block, then throw 227 if (timeout > 0) { 228 s.setSoTimeout(timeout); 229 } 230 try { 231 int n = s.getInputStream().read(); 232 fail("read " + n); 233 } catch (SocketException expected) { } 234 } 235 }); 236 } 237 238 /** 239 * Virtual thread interrupted while blocked in Socket read. 240 */ 241 @Test 242 void testSocketReadInterrupt1() throws Exception { 243 testSocketReadInterrupt(0); 244 } 245 246 /** 247 * Virtual thread interrupted while blocked in Socket read with timeout 248 */ 249 @Test 250 void testSocketReadInterrupt2() throws Exception { 251 testSocketReadInterrupt(60_000); 252 } 253 254 void testSocketReadInterrupt(int timeout) throws Exception { 255 VThreadRunner.run(() -> { 256 try (var connection = new Connection()) { 257 Socket s = connection.socket1(); 258 259 260 // delayed interrupt of current thread 261 Thread thisThread = Thread.currentThread(); 262 runAfterParkedAsync(thisThread::interrupt); 263 264 // read from s should block, then throw 265 if (timeout > 0) { 266 s.setSoTimeout(timeout); 267 } 268 try { 269 int n = s.getInputStream().read(); 270 fail("read " + n); 271 } catch (SocketException expected) { 272 assertTrue(Thread.interrupted()); 273 assertTrue(s.isClosed()); 274 } 275 } 276 }); 277 } 278 279 /** 280 * Socket close while virtual thread blocked in write. 281 */ 282 @Test 283 void testSocketWriteAsyncClose() throws Exception { 284 VThreadRunner.run(() -> { 285 try (var connection = new Connection()) { 286 Socket s = connection.socket1(); 287 288 // delayedclose of s 289 runAfterParkedAsync(s::close); 290 291 // write to s should block, then throw 292 try { 293 byte[] ba = new byte[100*1024]; 294 OutputStream out = s.getOutputStream(); 295 for (;;) { 296 out.write(ba); 297 } 298 } catch (SocketException expected) { } 299 } 300 }); 301 } 302 303 /** 304 * Virtual thread interrupted while blocked in Socket write. 305 */ 306 @Test 307 void testSocketWriteInterrupt() throws Exception { 308 VThreadRunner.run(() -> { 309 try (var connection = new Connection()) { 310 Socket s = connection.socket1(); 311 312 // delayed interrupt of current thread 313 Thread thisThread = Thread.currentThread(); 314 runAfterParkedAsync(thisThread::interrupt); 315 316 // write to s should block, then throw 317 try { 318 byte[] ba = new byte[100*1024]; 319 OutputStream out = s.getOutputStream(); 320 for (;;) { 321 out.write(ba); 322 } 323 } catch (SocketException expected) { 324 assertTrue(Thread.interrupted()); 325 assertTrue(s.isClosed()); 326 } 327 } 328 }); 329 } 330 331 /** 332 * Virtual thread reading urgent data when SO_OOBINLINE is enabled. 333 */ 334 @Test 335 void testSocketReadUrgentData() throws Exception { 336 VThreadRunner.run(() -> { 337 try (var connection = new Connection()) { 338 Socket s1 = connection.socket1(); 339 Socket s2 = connection.socket2(); 340 341 // urgent data should be received 342 runAfterParkedAsync(() -> s2.sendUrgentData('X')); 343 344 // read should block, then read the OOB byte 345 s1.setOOBInline(true); 346 byte[] ba = new byte[10]; 347 int n = s1.getInputStream().read(ba); 348 assertTrue(n == 1); 349 assertTrue(ba[0] == 'X'); 350 351 // urgent data should not be received 352 s1.setOOBInline(false); 353 s1.setSoTimeout(500); 354 s2.sendUrgentData('X'); 355 try { 356 s1.getInputStream().read(ba); 357 fail(); 358 } catch (SocketTimeoutException expected) { } 359 } 360 }); 361 } 362 363 /** 364 * ServerSocket accept, no blocking. 365 */ 366 @Test 367 void testServerSocketAccept1() throws Exception { 368 VThreadRunner.run(() -> { 369 try (var listener = new ServerSocket()) { 370 InetAddress loopback = InetAddress.getLoopbackAddress(); 371 listener.bind(new InetSocketAddress(loopback, 0)); 372 373 // establish connection 374 var socket1 = new Socket(loopback, listener.getLocalPort()); 375 376 // accept should not block 377 var socket2 = listener.accept(); 378 socket1.close(); 379 socket2.close(); 380 } 381 }); 382 } 383 384 /** 385 * Virtual thread blocks in accept. 386 */ 387 @Test 388 void testServerSocketAccept2() throws Exception { 389 testServerSocketAccept(0); 390 } 391 392 /** 393 * Virtual thread blocks in timed accept. 394 */ 395 @Test 396 void testServerSocketAccept3() throws Exception { 397 testServerSocketAccept(60_000); 398 } 399 400 void testServerSocketAccept(int timeout) throws Exception { 401 VThreadRunner.run(() -> { 402 try (var listener = new ServerSocket()) { 403 InetAddress loopback = InetAddress.getLoopbackAddress(); 404 listener.bind(new InetSocketAddress(loopback, 0)); 405 406 // schedule connect 407 var socket1 = new Socket(); 408 SocketAddress remote = listener.getLocalSocketAddress(); 409 runAfterParkedAsync(() -> socket1.connect(remote)); 410 411 // accept should block 412 if (timeout > 0) { 413 listener.setSoTimeout(timeout); 414 } 415 var socket2 = listener.accept(); 416 socket1.close(); 417 socket2.close(); 418 } 419 }); 420 } 421 422 /** 423 * ServerSocket close while virtual thread blocked in accept. 424 */ 425 @Test 426 void testServerSocketAcceptAsyncClose1() throws Exception { 427 testServerSocketAcceptAsyncClose(0); 428 } 429 430 /** 431 * ServerSocket close while virtual thread blocked in timed accept. 432 */ 433 @Test 434 void testServerSocketAcceptAsyncClose2() throws Exception { 435 testServerSocketAcceptAsyncClose(60_000); 436 } 437 438 void testServerSocketAcceptAsyncClose(int timeout) throws Exception { 439 VThreadRunner.run(() -> { 440 try (var listener = new ServerSocket()) { 441 InetAddress loopback = InetAddress.getLoopbackAddress(); 442 listener.bind(new InetSocketAddress(loopback, 0)); 443 444 // delayed close of listener 445 runAfterParkedAsync(listener::close); 446 447 // accept should block, then throw 448 if (timeout > 0) { 449 listener.setSoTimeout(timeout); 450 } 451 try { 452 listener.accept().close(); 453 fail("connection accepted???"); 454 } catch (SocketException expected) { } 455 } 456 }); 457 } 458 459 /** 460 * Virtual thread interrupted while blocked in ServerSocket accept. 461 */ 462 @Test 463 void testServerSocketAcceptInterrupt1() throws Exception { 464 testServerSocketAcceptInterrupt(0); 465 } 466 467 /** 468 * Virtual thread interrupted while blocked in ServerSocket accept with timeout. 469 */ 470 @Test 471 void testServerSocketAcceptInterrupt2() throws Exception { 472 testServerSocketAcceptInterrupt(60_000); 473 } 474 475 void testServerSocketAcceptInterrupt(int timeout) throws Exception { 476 VThreadRunner.run(() -> { 477 try (var listener = new ServerSocket()) { 478 InetAddress loopback = InetAddress.getLoopbackAddress(); 479 listener.bind(new InetSocketAddress(loopback, 0)); 480 481 // delayed interrupt of current thread 482 Thread thisThread = Thread.currentThread(); 483 runAfterParkedAsync(thisThread::interrupt); 484 485 // accept should block, then throw 486 if (timeout > 0) { 487 listener.setSoTimeout(timeout); 488 } 489 try { 490 listener.accept().close(); 491 fail("connection accepted???"); 492 } catch (SocketException expected) { 493 assertTrue(Thread.interrupted()); 494 assertTrue(listener.isClosed()); 495 } 496 } 497 }); 498 } 499 500 /** 501 * DatagramSocket receive/send, no blocking. 502 */ 503 @Test 504 void testDatagramSocketSendReceive1() throws Exception { 505 VThreadRunner.run(() -> { 506 try (DatagramSocket s1 = new DatagramSocket(null); 507 DatagramSocket s2 = new DatagramSocket(null)) { 508 509 InetAddress lh = InetAddress.getLoopbackAddress(); 510 s1.bind(new InetSocketAddress(lh, 0)); 511 s2.bind(new InetSocketAddress(lh, 0)); 512 513 // send should not block 514 byte[] bytes = "XXX".getBytes("UTF-8"); 515 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length); 516 p1.setSocketAddress(s2.getLocalSocketAddress()); 517 s1.send(p1); 518 519 // receive should not block 520 byte[] ba = new byte[100]; 521 DatagramPacket p2 = new DatagramPacket(ba, ba.length); 522 s2.receive(p2); 523 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress()); 524 assertTrue(ba[0] == 'X'); 525 } 526 }); 527 } 528 529 /** 530 * Virtual thread blocks in DatagramSocket receive. 531 */ 532 @Test 533 void testDatagramSocketSendReceive2() throws Exception { 534 testDatagramSocketSendReceive(0); 535 } 536 537 /** 538 * Virtual thread blocks in DatagramSocket receive with timeout. 539 */ 540 @Test 541 void testDatagramSocketSendReceive3() throws Exception { 542 testDatagramSocketSendReceive(60_000); 543 } 544 545 private void testDatagramSocketSendReceive(int timeout) throws Exception { 546 VThreadRunner.run(() -> { 547 try (DatagramSocket s1 = new DatagramSocket(null); 548 DatagramSocket s2 = new DatagramSocket(null)) { 549 550 InetAddress lh = InetAddress.getLoopbackAddress(); 551 s1.bind(new InetSocketAddress(lh, 0)); 552 s2.bind(new InetSocketAddress(lh, 0)); 553 554 // delayed send 555 byte[] bytes = "XXX".getBytes("UTF-8"); 556 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length); 557 p1.setSocketAddress(s2.getLocalSocketAddress()); 558 runAfterParkedAsync(() -> s1.send(p1)); 559 560 // receive should block 561 if (timeout > 0) { 562 s2.setSoTimeout(timeout); 563 } 564 byte[] ba = new byte[100]; 565 DatagramPacket p2 = new DatagramPacket(ba, ba.length); 566 s2.receive(p2); 567 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress()); 568 assertTrue(ba[0] == 'X'); 569 } 570 }); 571 } 572 573 /** 574 * Virtual thread blocks in DatagramSocket receive that times out. 575 */ 576 @Test 577 void testDatagramSocketReceiveTimeout() throws Exception { 578 VThreadRunner.run(() -> { 579 try (DatagramSocket s = new DatagramSocket(null)) { 580 InetAddress lh = InetAddress.getLoopbackAddress(); 581 s.bind(new InetSocketAddress(lh, 0)); 582 s.setSoTimeout(500); 583 byte[] ba = new byte[100]; 584 DatagramPacket p = new DatagramPacket(ba, ba.length); 585 try { 586 s.receive(p); 587 fail(); 588 } catch (SocketTimeoutException expected) { } 589 } 590 }); 591 } 592 593 /** 594 * DatagramSocket close while virtual thread blocked in receive. 595 */ 596 @Test 597 void testDatagramSocketReceiveAsyncClose1() throws Exception { 598 testDatagramSocketReceiveAsyncClose(0); 599 } 600 601 /** 602 * DatagramSocket close while virtual thread blocked with timeout. 603 */ 604 @Test 605 void testDatagramSocketReceiveAsyncClose2() throws Exception { 606 testDatagramSocketReceiveAsyncClose(60_000); 607 } 608 609 private void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception { 610 VThreadRunner.run(() -> { 611 try (DatagramSocket s = new DatagramSocket(null)) { 612 InetAddress lh = InetAddress.getLoopbackAddress(); 613 s.bind(new InetSocketAddress(lh, 0)); 614 615 // delayed close of s 616 runAfterParkedAsync(s::close); 617 618 // receive should block, then throw 619 if (timeout > 0) { 620 s.setSoTimeout(timeout); 621 } 622 try { 623 byte[] ba = new byte[100]; 624 DatagramPacket p = new DatagramPacket(ba, ba.length); 625 s.receive(p); 626 fail(); 627 } catch (SocketException expected) { } 628 } 629 }); 630 } 631 632 /** 633 * Virtual thread interrupted while blocked in DatagramSocket receive. 634 */ 635 @Test 636 void testDatagramSocketReceiveInterrupt1() throws Exception { 637 testDatagramSocketReceiveInterrupt(0); 638 } 639 640 /** 641 * Virtual thread interrupted while blocked in DatagramSocket receive with timeout. 642 */ 643 @Test 644 void testDatagramSocketReceiveInterrupt2() throws Exception { 645 testDatagramSocketReceiveInterrupt(60_000); 646 } 647 648 private void testDatagramSocketReceiveInterrupt(int timeout) throws Exception { 649 VThreadRunner.run(() -> { 650 try (DatagramSocket s = new DatagramSocket(null)) { 651 InetAddress lh = InetAddress.getLoopbackAddress(); 652 s.bind(new InetSocketAddress(lh, 0)); 653 654 // delayed interrupt of current thread 655 Thread thisThread = Thread.currentThread(); 656 runAfterParkedAsync(thisThread::interrupt); 657 658 // receive should block, then throw 659 if (timeout > 0) { 660 s.setSoTimeout(timeout); 661 } 662 try { 663 byte[] ba = new byte[100]; 664 DatagramPacket p = new DatagramPacket(ba, ba.length); 665 s.receive(p); 666 fail(); 667 } catch (SocketException expected) { 668 assertTrue(Thread.interrupted()); 669 assertTrue(s.isClosed()); 670 } 671 } 672 }); 673 } 674 675 /** 676 * Creates a loopback connection 677 */ 678 static class Connection implements Closeable { 679 private final Socket s1; 680 private final Socket s2; 681 Connection() throws IOException { 682 var lh = InetAddress.getLoopbackAddress(); 683 try (var listener = new ServerSocket()) { 684 listener.bind(new InetSocketAddress(lh, 0)); 685 Socket s1 = new Socket(); 686 Socket s2; 687 try { 688 s1.connect(listener.getLocalSocketAddress()); 689 s2 = listener.accept(); 690 } catch (IOException ioe) { 691 s1.close(); 692 throw ioe; 693 } 694 this.s1 = s1; 695 this.s2 = s2; 696 } 697 698 } 699 Socket socket1() { 700 return s1; 701 } 702 Socket socket2() { 703 return s2; 704 } 705 @Override 706 public void close() throws IOException { 707 s1.close(); 708 s2.close(); 709 } 710 } 711 712 @FunctionalInterface 713 interface ThrowingRunnable { 714 void run() throws Exception; 715 } 716 717 /** 718 * Runs the given task asynchronously after the current virtual thread has parked. 719 * @return the thread started to run the task 720 */ 721 static Thread runAfterParkedAsync(ThrowingRunnable task) { 722 Thread target = Thread.currentThread(); 723 if (!target.isVirtual()) 724 throw new WrongThreadException(); 725 return Thread.ofPlatform().daemon().start(() -> { 726 try { 727 Thread.State state = target.getState(); 728 while (state != Thread.State.WAITING 729 && state != Thread.State.TIMED_WAITING) { 730 Thread.sleep(20); 731 state = target.getState(); 732 } 733 Thread.sleep(20); // give a bit more time to release carrier 734 task.run(); 735 } catch (Exception e) { 736 e.printStackTrace(); 737 } 738 }); 739 } 740 }