1 /* 2 * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=default 26 * @bug 8284161 27 * @summary Test virtual threads doing blocking I/O on java.net Sockets 28 * @library /test/lib 29 * @run junit BlockingSocketOps 30 */ 31 32 /* 33 * @test id=poller-modes 34 * @requires (os.family == "linux") | (os.family == "mac") 35 * @library /test/lib 36 * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps 37 * @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps 38 * @run junit/othervm -Djdk.pollerMode=3 BlockingSocketOps 39 */ 40 41 /* 42 * @test id=io_uring 43 * @requires os.family == "linux" 44 * @library /test/lib 45 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingSocketOps 46 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingSocketOps 47 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingSocketOps 48 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps 49 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps 50 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps 51 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true BlockingSocketOps 52 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true BlockingSocketOps 53 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true BlockingSocketOps 54 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps 55 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps 56 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps 57 */ 58 59 /* 60 * @test id=no-vmcontinuations 61 * @requires vm.continuations 62 * @library /test/lib 63 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps 64 */ 65 66 import java.io.Closeable; 67 import java.io.IOException; 68 import java.io.InputStream; 69 import java.io.OutputStream; 70 import java.net.DatagramPacket; 71 import java.net.DatagramSocket; 72 import java.net.InetAddress; 73 import java.net.InetSocketAddress; 74 import java.net.ServerSocket; 75 import java.net.Socket; 76 import java.net.SocketAddress; 77 import java.net.SocketException; 78 import java.net.SocketTimeoutException; 79 80 import jdk.test.lib.thread.VThreadRunner; 81 import org.junit.jupiter.api.Test; 82 import static org.junit.jupiter.api.Assertions.*; 83 84 class BlockingSocketOps { 85 86 /** 87 * Socket read/write, no blocking. 88 */ 89 @Test 90 void testSocketReadWrite1() throws Exception { 91 VThreadRunner.run(() -> { 92 try (var connection = new Connection()) { 93 Socket s1 = connection.socket1(); 94 Socket s2 = connection.socket2(); 95 96 // write should not block 97 byte[] ba = "XXX".getBytes("UTF-8"); 98 s1.getOutputStream().write(ba); 99 100 // read should not block 101 ba = new byte[10]; 102 int n = s2.getInputStream().read(ba); 103 assertTrue(n > 0); 104 assertTrue(ba[0] == 'X'); 105 } 106 }); 107 } 108 109 /** 110 * Virtual thread blocks in read. 111 */ 112 @Test 113 void testSocketRead1() throws Exception { 114 testSocketRead(0); 115 } 116 117 /** 118 * Virtual thread blocks in timed read. 119 */ 120 @Test 121 void testSocketRead2() throws Exception { 122 testSocketRead(60_000); 123 } 124 125 void testSocketRead(int timeout) throws Exception { 126 VThreadRunner.run(() -> { 127 try (var connection = new Connection()) { 128 Socket s1 = connection.socket1(); 129 Socket s2 = connection.socket2(); 130 131 // delayed write from sc1 132 byte[] ba1 = "XXX".getBytes("UTF-8"); 133 runAfterParkedAsync(() -> s1.getOutputStream().write(ba1)); 134 135 // read from sc2 should block 136 if (timeout > 0) { 137 s2.setSoTimeout(timeout); 138 } 139 byte[] ba2 = new byte[10]; 140 int n = s2.getInputStream().read(ba2); 141 assertTrue(n > 0); 142 assertTrue(ba2[0] == 'X'); 143 } 144 }); 145 } 146 147 /** 148 * Virtual thread blocks in write. 149 */ 150 @Test 151 void testSocketWrite1() throws Exception { 152 VThreadRunner.run(() -> { 153 try (var connection = new Connection()) { 154 Socket s1 = connection.socket1(); 155 Socket s2 = connection.socket2(); 156 157 // delayed read from s2 to EOF 158 InputStream in = s2.getInputStream(); 159 Thread reader = runAfterParkedAsync(() -> 160 in.transferTo(OutputStream.nullOutputStream())); 161 162 // write should block 163 byte[] ba = new byte[100*1024]; 164 try (OutputStream out = s1.getOutputStream()) { 165 for (int i = 0; i < 1000; i++) { 166 out.write(ba); 167 } 168 } 169 170 // wait for reader to finish 171 reader.join(); 172 } 173 }); 174 } 175 176 /** 177 * Virtual thread blocks in read, peer closes connection gracefully. 178 */ 179 @Test 180 void testSocketReadPeerClose1() throws Exception { 181 VThreadRunner.run(() -> { 182 try (var connection = new Connection()) { 183 Socket s1 = connection.socket1(); 184 Socket s2 = connection.socket2(); 185 186 // delayed close of s2 187 runAfterParkedAsync(s2::close); 188 189 // read from s1 should block, then read -1 190 int n = s1.getInputStream().read(); 191 assertTrue(n == -1); 192 } 193 }); 194 } 195 196 /** 197 * Virtual thread blocks in read, peer closes connection abruptly. 198 */ 199 @Test 200 void testSocketReadPeerClose2() throws Exception { 201 VThreadRunner.run(() -> { 202 try (var connection = new Connection()) { 203 Socket s1 = connection.socket1(); 204 Socket s2 = connection.socket2(); 205 206 // delayed abrupt close of s2 207 s2.setSoLinger(true, 0); 208 runAfterParkedAsync(s2::close); 209 210 // read from s1 should block, then throw 211 try { 212 int n = s1.getInputStream().read(); 213 fail("read " + n); 214 } catch (IOException ioe) { 215 // expected 216 } 217 } 218 }); 219 } 220 221 /** 222 * Socket close while virtual thread blocked in read. 223 */ 224 @Test 225 void testSocketReadAsyncClose1() throws Exception { 226 testSocketReadAsyncClose(0); 227 } 228 229 /** 230 * Socket close while virtual thread blocked in timed read. 231 */ 232 @Test 233 void testSocketReadAsyncClose2() throws Exception { 234 testSocketReadAsyncClose(60_000); 235 } 236 237 void testSocketReadAsyncClose(int timeout) throws Exception { 238 VThreadRunner.run(() -> { 239 try (var connection = new Connection()) { 240 Socket s = connection.socket1(); 241 242 // delayed close of s 243 runAfterParkedAsync(s::close); 244 245 // read from s should block, then throw 246 if (timeout > 0) { 247 s.setSoTimeout(timeout); 248 } 249 try { 250 int n = s.getInputStream().read(); 251 fail("read " + n); 252 } catch (SocketException expected) { } 253 } 254 }); 255 } 256 257 /** 258 * Socket shutdownInput while virtual thread blocked in read. 259 */ 260 @Test 261 void testSocketReadAsyncShutdownInput1() throws Exception { 262 testSocketReadAsyncShutdownInput(0); 263 } 264 265 /** 266 * Socket shutdownInput while virtual thread blocked in timed read. 267 */ 268 @Test 269 void testSocketReadAsyncShutdownInput2() throws Exception { 270 testSocketReadAsyncShutdownInput(60_000); 271 } 272 273 void testSocketReadAsyncShutdownInput(int timeout) throws Exception { 274 VThreadRunner.run(() -> { 275 try (var connection = new Connection()) { 276 Socket s = connection.socket1(); 277 278 // delayed shutdown of s 279 runAfterParkedAsync(s::shutdownInput); 280 281 // read from s should block, then throw 282 if (timeout > 0) { 283 s.setSoTimeout(timeout); 284 } 285 286 // -1 or SocketException 287 try { 288 int n = s.getInputStream().read(); 289 assertEquals(-1, n); 290 } catch (SocketException e) { } 291 assertFalse(s.isClosed()); 292 } 293 }); 294 } 295 296 /** 297 * Virtual thread interrupted while blocked in Socket read. 298 */ 299 @Test 300 void testSocketReadInterrupt1() throws Exception { 301 testSocketReadInterrupt(0); 302 } 303 304 /** 305 * Virtual thread interrupted while blocked in Socket read with timeout 306 */ 307 @Test 308 void testSocketReadInterrupt2() throws Exception { 309 testSocketReadInterrupt(60_000); 310 } 311 312 void testSocketReadInterrupt(int timeout) throws Exception { 313 VThreadRunner.run(() -> { 314 try (var connection = new Connection()) { 315 Socket s = connection.socket1(); 316 317 318 // delayed interrupt of current thread 319 Thread thisThread = Thread.currentThread(); 320 runAfterParkedAsync(thisThread::interrupt); 321 322 // read from s should block, then throw 323 if (timeout > 0) { 324 s.setSoTimeout(timeout); 325 } 326 try { 327 int n = s.getInputStream().read(); 328 fail("read " + n); 329 } catch (SocketException expected) { 330 assertTrue(Thread.interrupted()); 331 assertTrue(s.isClosed()); 332 } 333 } 334 }); 335 } 336 337 /** 338 * Socket close while virtual thread blocked in write. 339 */ 340 @Test 341 void testSocketWriteAsyncClose() throws Exception { 342 VThreadRunner.run(() -> { 343 try (var connection = new Connection()) { 344 Socket s = connection.socket1(); 345 346 // delayed close of s 347 runAfterParkedAsync(s::close); 348 349 // write to s should block, then throw 350 try { 351 byte[] ba = new byte[100*1024]; 352 OutputStream out = s.getOutputStream(); 353 for (;;) { 354 out.write(ba); 355 } 356 } catch (SocketException expected) { } 357 } 358 }); 359 } 360 361 /** 362 * Socket shutdownOutput while virtual thread blocked in write. 363 */ 364 @Test 365 void testSocketWriteAsyncShutdownOutput() throws Exception { 366 VThreadRunner.run(() -> { 367 try (var connection = new Connection()) { 368 Socket s = connection.socket1(); 369 370 // delayed shutdown of s 371 runAfterParkedAsync(s::shutdownOutput); 372 373 // write to s should block, then throw 374 try { 375 byte[] ba = new byte[100*1024]; 376 OutputStream out = s.getOutputStream(); 377 for (;;) { 378 out.write(ba); 379 } 380 } catch (SocketException expected) { } 381 assertFalse(s.isClosed()); 382 } 383 }); 384 } 385 386 /** 387 * Virtual thread interrupted while blocked in Socket write. 388 */ 389 @Test 390 void testSocketWriteInterrupt() throws Exception { 391 VThreadRunner.run(() -> { 392 try (var connection = new Connection()) { 393 Socket s = connection.socket1(); 394 395 // delayed interrupt of current thread 396 Thread thisThread = Thread.currentThread(); 397 runAfterParkedAsync(thisThread::interrupt); 398 399 // write to s should block, then throw 400 try { 401 byte[] ba = new byte[100*1024]; 402 OutputStream out = s.getOutputStream(); 403 for (;;) { 404 out.write(ba); 405 } 406 } catch (SocketException expected) { 407 assertTrue(Thread.interrupted()); 408 assertTrue(s.isClosed()); 409 } 410 } 411 }); 412 } 413 414 /** 415 * Virtual thread reading urgent data when SO_OOBINLINE is enabled. 416 */ 417 @Test 418 void testSocketReadUrgentData() throws Exception { 419 VThreadRunner.run(() -> { 420 try (var connection = new Connection()) { 421 Socket s1 = connection.socket1(); 422 Socket s2 = connection.socket2(); 423 424 // urgent data should be received 425 runAfterParkedAsync(() -> s2.sendUrgentData('X')); 426 427 // read should block, then read the OOB byte 428 s1.setOOBInline(true); 429 byte[] ba = new byte[10]; 430 int n = s1.getInputStream().read(ba); 431 assertTrue(n == 1); 432 assertTrue(ba[0] == 'X'); 433 434 // urgent data should not be received 435 s1.setOOBInline(false); 436 s1.setSoTimeout(500); 437 s2.sendUrgentData('X'); 438 try { 439 s1.getInputStream().read(ba); 440 fail(); 441 } catch (SocketTimeoutException expected) { } 442 } 443 }); 444 } 445 446 /** 447 * ServerSocket accept, no blocking. 448 */ 449 @Test 450 void testServerSocketAccept1() throws Exception { 451 VThreadRunner.run(() -> { 452 try (var listener = new ServerSocket()) { 453 InetAddress loopback = InetAddress.getLoopbackAddress(); 454 listener.bind(new InetSocketAddress(loopback, 0)); 455 456 // establish connection 457 var socket1 = new Socket(loopback, listener.getLocalPort()); 458 459 // accept should not block 460 var socket2 = listener.accept(); 461 socket1.close(); 462 socket2.close(); 463 } 464 }); 465 } 466 467 /** 468 * Virtual thread blocks in accept. 469 */ 470 @Test 471 void testServerSocketAccept2() throws Exception { 472 testServerSocketAccept(0); 473 } 474 475 /** 476 * Virtual thread blocks in timed accept. 477 */ 478 @Test 479 void testServerSocketAccept3() throws Exception { 480 testServerSocketAccept(60_000); 481 } 482 483 void testServerSocketAccept(int timeout) throws Exception { 484 VThreadRunner.run(() -> { 485 try (var listener = new ServerSocket()) { 486 InetAddress loopback = InetAddress.getLoopbackAddress(); 487 listener.bind(new InetSocketAddress(loopback, 0)); 488 489 // schedule connect 490 var socket1 = new Socket(); 491 SocketAddress remote = listener.getLocalSocketAddress(); 492 runAfterParkedAsync(() -> socket1.connect(remote)); 493 494 // accept should block 495 if (timeout > 0) { 496 listener.setSoTimeout(timeout); 497 } 498 var socket2 = listener.accept(); 499 socket1.close(); 500 socket2.close(); 501 } 502 }); 503 } 504 505 /** 506 * ServerSocket close while virtual thread blocked in accept. 507 */ 508 @Test 509 void testServerSocketAcceptAsyncClose1() throws Exception { 510 testServerSocketAcceptAsyncClose(0); 511 } 512 513 /** 514 * ServerSocket close while virtual thread blocked in timed accept. 515 */ 516 @Test 517 void testServerSocketAcceptAsyncClose2() throws Exception { 518 testServerSocketAcceptAsyncClose(60_000); 519 } 520 521 void testServerSocketAcceptAsyncClose(int timeout) throws Exception { 522 VThreadRunner.run(() -> { 523 try (var listener = new ServerSocket()) { 524 InetAddress loopback = InetAddress.getLoopbackAddress(); 525 listener.bind(new InetSocketAddress(loopback, 0)); 526 527 // delayed close of listener 528 runAfterParkedAsync(listener::close); 529 530 // accept should block, then throw 531 if (timeout > 0) { 532 listener.setSoTimeout(timeout); 533 } 534 try { 535 listener.accept().close(); 536 fail("connection accepted???"); 537 } catch (SocketException expected) { } 538 } 539 }); 540 } 541 542 /** 543 * Virtual thread interrupted while blocked in ServerSocket accept. 544 */ 545 @Test 546 void testServerSocketAcceptInterrupt1() throws Exception { 547 testServerSocketAcceptInterrupt(0); 548 } 549 550 /** 551 * Virtual thread interrupted while blocked in ServerSocket accept with timeout. 552 */ 553 @Test 554 void testServerSocketAcceptInterrupt2() throws Exception { 555 testServerSocketAcceptInterrupt(60_000); 556 } 557 558 void testServerSocketAcceptInterrupt(int timeout) throws Exception { 559 VThreadRunner.run(() -> { 560 try (var listener = new ServerSocket()) { 561 InetAddress loopback = InetAddress.getLoopbackAddress(); 562 listener.bind(new InetSocketAddress(loopback, 0)); 563 564 // delayed interrupt of current thread 565 Thread thisThread = Thread.currentThread(); 566 runAfterParkedAsync(thisThread::interrupt); 567 568 // accept should block, then throw 569 if (timeout > 0) { 570 listener.setSoTimeout(timeout); 571 } 572 try { 573 listener.accept().close(); 574 fail("connection accepted???"); 575 } catch (SocketException expected) { 576 assertTrue(Thread.interrupted()); 577 assertTrue(listener.isClosed()); 578 } 579 } 580 }); 581 } 582 583 /** 584 * DatagramSocket receive/send, no blocking. 585 */ 586 @Test 587 void testDatagramSocketSendReceive1() throws Exception { 588 VThreadRunner.run(() -> { 589 try (DatagramSocket s1 = new DatagramSocket(null); 590 DatagramSocket s2 = new DatagramSocket(null)) { 591 592 InetAddress lh = InetAddress.getLoopbackAddress(); 593 s1.bind(new InetSocketAddress(lh, 0)); 594 s2.bind(new InetSocketAddress(lh, 0)); 595 596 // send should not block 597 byte[] bytes = "XXX".getBytes("UTF-8"); 598 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length); 599 p1.setSocketAddress(s2.getLocalSocketAddress()); 600 s1.send(p1); 601 602 // receive should not block 603 byte[] ba = new byte[100]; 604 DatagramPacket p2 = new DatagramPacket(ba, ba.length); 605 s2.receive(p2); 606 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress()); 607 assertTrue(ba[0] == 'X'); 608 } 609 }); 610 } 611 612 /** 613 * Virtual thread blocks in DatagramSocket receive. 614 */ 615 @Test 616 void testDatagramSocketSendReceive2() throws Exception { 617 testDatagramSocketSendReceive(0); 618 } 619 620 /** 621 * Virtual thread blocks in DatagramSocket receive with timeout. 622 */ 623 @Test 624 void testDatagramSocketSendReceive3() throws Exception { 625 testDatagramSocketSendReceive(60_000); 626 } 627 628 private void testDatagramSocketSendReceive(int timeout) throws Exception { 629 VThreadRunner.run(() -> { 630 try (DatagramSocket s1 = new DatagramSocket(null); 631 DatagramSocket s2 = new DatagramSocket(null)) { 632 633 InetAddress lh = InetAddress.getLoopbackAddress(); 634 s1.bind(new InetSocketAddress(lh, 0)); 635 s2.bind(new InetSocketAddress(lh, 0)); 636 637 // delayed send 638 byte[] bytes = "XXX".getBytes("UTF-8"); 639 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length); 640 p1.setSocketAddress(s2.getLocalSocketAddress()); 641 runAfterParkedAsync(() -> s1.send(p1)); 642 643 // receive should block 644 if (timeout > 0) { 645 s2.setSoTimeout(timeout); 646 } 647 byte[] ba = new byte[100]; 648 DatagramPacket p2 = new DatagramPacket(ba, ba.length); 649 s2.receive(p2); 650 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress()); 651 assertTrue(ba[0] == 'X'); 652 } 653 }); 654 } 655 656 /** 657 * Virtual thread blocks in DatagramSocket receive that times out. 658 */ 659 @Test 660 void testDatagramSocketReceiveTimeout() throws Exception { 661 VThreadRunner.run(() -> { 662 try (DatagramSocket s = new DatagramSocket(null)) { 663 InetAddress lh = InetAddress.getLoopbackAddress(); 664 s.bind(new InetSocketAddress(lh, 0)); 665 s.setSoTimeout(500); 666 byte[] ba = new byte[100]; 667 DatagramPacket p = new DatagramPacket(ba, ba.length); 668 try { 669 s.receive(p); 670 fail(); 671 } catch (SocketTimeoutException expected) { } 672 } 673 }); 674 } 675 676 /** 677 * DatagramSocket close while virtual thread blocked in receive. 678 */ 679 @Test 680 void testDatagramSocketReceiveAsyncClose1() throws Exception { 681 testDatagramSocketReceiveAsyncClose(0); 682 } 683 684 /** 685 * DatagramSocket close while virtual thread blocked with timeout. 686 */ 687 @Test 688 void testDatagramSocketReceiveAsyncClose2() throws Exception { 689 testDatagramSocketReceiveAsyncClose(60_000); 690 } 691 692 private void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception { 693 VThreadRunner.run(() -> { 694 try (DatagramSocket s = new DatagramSocket(null)) { 695 InetAddress lh = InetAddress.getLoopbackAddress(); 696 s.bind(new InetSocketAddress(lh, 0)); 697 698 // delayed close of s 699 runAfterParkedAsync(s::close); 700 701 // receive should block, then throw 702 if (timeout > 0) { 703 s.setSoTimeout(timeout); 704 } 705 try { 706 byte[] ba = new byte[100]; 707 DatagramPacket p = new DatagramPacket(ba, ba.length); 708 s.receive(p); 709 fail(); 710 } catch (SocketException expected) { } 711 } 712 }); 713 } 714 715 /** 716 * Virtual thread interrupted while blocked in DatagramSocket receive. 717 */ 718 @Test 719 void testDatagramSocketReceiveInterrupt1() throws Exception { 720 testDatagramSocketReceiveInterrupt(0); 721 } 722 723 /** 724 * Virtual thread interrupted while blocked in DatagramSocket receive with timeout. 725 */ 726 @Test 727 void testDatagramSocketReceiveInterrupt2() throws Exception { 728 testDatagramSocketReceiveInterrupt(60_000); 729 } 730 731 private void testDatagramSocketReceiveInterrupt(int timeout) throws Exception { 732 VThreadRunner.run(() -> { 733 try (DatagramSocket s = new DatagramSocket(null)) { 734 InetAddress lh = InetAddress.getLoopbackAddress(); 735 s.bind(new InetSocketAddress(lh, 0)); 736 737 // delayed interrupt of current thread 738 Thread thisThread = Thread.currentThread(); 739 runAfterParkedAsync(thisThread::interrupt); 740 741 // receive should block, then throw 742 if (timeout > 0) { 743 s.setSoTimeout(timeout); 744 } 745 try { 746 byte[] ba = new byte[100]; 747 DatagramPacket p = new DatagramPacket(ba, ba.length); 748 s.receive(p); 749 fail(); 750 } catch (SocketException expected) { 751 assertTrue(Thread.interrupted()); 752 assertTrue(s.isClosed()); 753 } 754 } 755 }); 756 } 757 758 /** 759 * Creates a loopback connection 760 */ 761 static class Connection implements Closeable { 762 private final Socket s1; 763 private final Socket s2; 764 Connection() throws IOException { 765 var lh = InetAddress.getLoopbackAddress(); 766 try (var listener = new ServerSocket()) { 767 listener.bind(new InetSocketAddress(lh, 0)); 768 Socket s1 = new Socket(); 769 Socket s2; 770 try { 771 s1.connect(listener.getLocalSocketAddress()); 772 s2 = listener.accept(); 773 } catch (IOException ioe) { 774 s1.close(); 775 throw ioe; 776 } 777 this.s1 = s1; 778 this.s2 = s2; 779 } 780 781 } 782 Socket socket1() { 783 return s1; 784 } 785 Socket socket2() { 786 return s2; 787 } 788 @Override 789 public void close() throws IOException { 790 s1.close(); 791 s2.close(); 792 } 793 } 794 795 @FunctionalInterface 796 interface ThrowingRunnable { 797 void run() throws Exception; 798 } 799 800 /** 801 * Runs the given task asynchronously after the current virtual thread has parked. 802 * @return the thread started to run the task 803 */ 804 static Thread runAfterParkedAsync(ThrowingRunnable task) { 805 Thread target = Thread.currentThread(); 806 if (!target.isVirtual()) 807 throw new WrongThreadException(); 808 return Thread.ofPlatform().daemon().start(() -> { 809 try { 810 Thread.State state = target.getState(); 811 while (state != Thread.State.WAITING 812 && state != Thread.State.TIMED_WAITING) { 813 Thread.sleep(20); 814 state = target.getState(); 815 } 816 Thread.sleep(20); // give a bit more time to release carrier 817 task.run(); 818 } catch (Exception e) { 819 e.printStackTrace(); 820 } 821 }); 822 } 823 }