1 /* 2 * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /** 25 * @test id=default 26 * @bug 8284161 27 * @summary Test virtual threads doing blocking I/O on NIO channels 28 * @library /test/lib 29 * @run junit BlockingChannelOps 30 */ 31 32 /** 33 * @test id=poller-modes 34 * @requires (os.family == "linux") | (os.family == "mac") 35 * @library /test/lib 36 * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps 37 * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps 38 */ 39 40 /** 41 * @test id=no-vmcontinuations 42 * @requires vm.continuations 43 * @library /test/lib 44 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps 45 */ 46 47 import java.io.Closeable; 48 import java.io.IOException; 49 import java.net.DatagramPacket; 50 import java.net.InetAddress; 51 import java.net.InetSocketAddress; 52 import java.net.Socket; 53 import java.net.SocketAddress; 54 import java.net.SocketException; 55 import java.nio.ByteBuffer; 56 import java.nio.channels.AsynchronousCloseException; 57 import java.nio.channels.ClosedByInterruptException; 58 import java.nio.channels.ClosedChannelException; 59 import java.nio.channels.DatagramChannel; 60 import java.nio.channels.Pipe; 61 import java.nio.channels.ReadableByteChannel; 62 import java.nio.channels.ServerSocketChannel; 63 import java.nio.channels.SocketChannel; 64 import java.nio.channels.WritableByteChannel; 65 66 import jdk.test.lib.thread.VThreadRunner; 67 import org.junit.jupiter.api.Test; 68 import static org.junit.jupiter.api.Assertions.*; 69 70 class BlockingChannelOps { 71 72 /** 73 * SocketChannel read/write, no blocking. 74 */ 75 @Test 76 void testSocketChannelReadWrite1() throws Exception { 77 VThreadRunner.run(() -> { 78 try (var connection = new Connection()) { 79 SocketChannel sc1 = connection.channel1(); 80 SocketChannel sc2 = connection.channel2(); 81 82 // write to sc1 83 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 84 int n = sc1.write(bb); 85 assertTrue(n > 0); 86 87 // read from sc2 should not block 88 bb = ByteBuffer.allocate(10); 89 n = sc2.read(bb); 90 assertTrue(n > 0); 91 assertTrue(bb.get(0) == 'X'); 92 } 93 }); 94 } 95 96 /** 97 * Virtual thread blocks in SocketChannel read. 98 */ 99 @Test 100 void testSocketChannelRead() throws Exception { 101 VThreadRunner.run(() -> { 102 try (var connection = new Connection()) { 103 SocketChannel sc1 = connection.channel1(); 104 SocketChannel sc2 = connection.channel2(); 105 106 // write to sc1 when current thread blocks in sc2.read 107 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 108 runAfterParkedAsync(() -> sc1.write(bb1)); 109 110 // read from sc2 should block 111 ByteBuffer bb2 = ByteBuffer.allocate(10); 112 int n = sc2.read(bb2); 113 assertTrue(n > 0); 114 assertTrue(bb2.get(0) == 'X'); 115 } 116 }); 117 } 118 119 /** 120 * Virtual thread blocks in SocketChannel write. 121 */ 122 @Test 123 void testSocketChannelWrite() throws Exception { 124 VThreadRunner.run(() -> { 125 try (var connection = new Connection()) { 126 SocketChannel sc1 = connection.channel1(); 127 SocketChannel sc2 = connection.channel2(); 128 129 // read from sc2 to EOF when current thread blocks in sc1.write 130 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2)); 131 132 // write to sc1 should block 133 ByteBuffer bb = ByteBuffer.allocate(100*1024); 134 for (int i=0; i<1000; i++) { 135 int n = sc1.write(bb); 136 assertTrue(n > 0); 137 bb.clear(); 138 } 139 sc1.close(); 140 141 // wait for reader to finish 142 reader.join(); 143 } 144 }); 145 } 146 147 /** 148 * SocketChannel close while virtual thread blocked in read. 149 */ 150 @Test 151 void testSocketChannelReadAsyncClose() throws Exception { 152 VThreadRunner.run(() -> { 153 try (var connection = new Connection()) { 154 SocketChannel sc = connection.channel1(); 155 runAfterParkedAsync(sc::close); 156 try { 157 int n = sc.read(ByteBuffer.allocate(100)); 158 fail("read returned " + n); 159 } catch (AsynchronousCloseException expected) { } 160 } 161 }); 162 } 163 164 /** 165 * Virtual thread interrupted while blocked in SocketChannel read. 166 */ 167 @Test 168 void testSocketChannelReadInterrupt() throws Exception { 169 VThreadRunner.run(() -> { 170 try (var connection = new Connection()) { 171 SocketChannel sc = connection.channel1(); 172 173 // interrupt current thread when it blocks in read 174 Thread thisThread = Thread.currentThread(); 175 runAfterParkedAsync(thisThread::interrupt); 176 177 try { 178 int n = sc.read(ByteBuffer.allocate(100)); 179 fail("read returned " + n); 180 } catch (ClosedByInterruptException expected) { 181 assertTrue(Thread.interrupted()); 182 } 183 } 184 }); 185 } 186 187 /** 188 * SocketChannel close while virtual thread blocked in write. 189 */ 190 @Test 191 void testSocketChannelWriteAsyncClose() throws Exception { 192 VThreadRunner.run(() -> { 193 boolean retry = true; 194 while (retry) { 195 try (var connection = new Connection()) { 196 SocketChannel sc = connection.channel1(); 197 198 // close sc when current thread blocks in write 199 runAfterParkedAsync(sc::close); 200 try { 201 ByteBuffer bb = ByteBuffer.allocate(100*1024); 202 for (;;) { 203 int n = sc.write(bb); 204 assertTrue(n > 0); 205 bb.clear(); 206 } 207 } catch (AsynchronousCloseException expected) { 208 // closed when blocked in write 209 retry = false; 210 } catch (ClosedChannelException e) { 211 // closed when not blocked in write, need to retry test 212 } 213 } 214 } 215 }); 216 } 217 218 /** 219 * Virtual thread interrupted while blocked in SocketChannel write. 220 */ 221 @Test 222 void testSocketChannelWriteInterrupt() throws Exception { 223 VThreadRunner.run(() -> { 224 boolean retry = true; 225 while (retry) { 226 try (var connection = new Connection()) { 227 SocketChannel sc = connection.channel1(); 228 229 // interrupt current thread when it blocks in write 230 Thread thisThread = Thread.currentThread(); 231 runAfterParkedAsync(thisThread::interrupt); 232 233 try { 234 ByteBuffer bb = ByteBuffer.allocate(100*1024); 235 for (;;) { 236 int n = sc.write(bb); 237 assertTrue(n > 0); 238 bb.clear(); 239 } 240 } catch (ClosedByInterruptException e) { 241 // closed when blocked in write 242 assertTrue(Thread.interrupted()); 243 retry = false; 244 } catch (ClosedChannelException e) { 245 // closed when not blocked in write, need to retry test 246 } 247 } 248 } 249 }); 250 } 251 252 /** 253 * Virtual thread blocks in SocketChannel adaptor read. 254 */ 255 @Test 256 void testSocketAdaptorRead1() throws Exception { 257 testSocketAdaptorRead(0); 258 } 259 260 /** 261 * Virtual thread blocks in SocketChannel adaptor read with timeout. 262 */ 263 @Test 264 void testSocketAdaptorRead2() throws Exception { 265 testSocketAdaptorRead(60_000); 266 } 267 268 private void testSocketAdaptorRead(int timeout) throws Exception { 269 VThreadRunner.run(() -> { 270 try (var connection = new Connection()) { 271 SocketChannel sc1 = connection.channel1(); 272 SocketChannel sc2 = connection.channel2(); 273 274 // write to sc1 when currnet thread blocks reading from sc2 275 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 276 runAfterParkedAsync(() -> sc1.write(bb)); 277 278 // read from sc2 should block 279 byte[] array = new byte[100]; 280 if (timeout > 0) 281 sc2.socket().setSoTimeout(timeout); 282 int n = sc2.socket().getInputStream().read(array); 283 assertTrue(n > 0); 284 assertTrue(array[0] == 'X'); 285 } 286 }); 287 } 288 289 /** 290 * ServerSocketChannel accept, no blocking. 291 */ 292 @Test 293 void testServerSocketChannelAccept1() throws Exception { 294 VThreadRunner.run(() -> { 295 try (var ssc = ServerSocketChannel.open()) { 296 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 297 var sc1 = SocketChannel.open(ssc.getLocalAddress()); 298 // accept should not block 299 var sc2 = ssc.accept(); 300 sc1.close(); 301 sc2.close(); 302 } 303 }); 304 } 305 306 /** 307 * Virtual thread blocks in ServerSocketChannel accept. 308 */ 309 @Test 310 void testServerSocketChannelAccept2() throws Exception { 311 VThreadRunner.run(() -> { 312 try (var ssc = ServerSocketChannel.open()) { 313 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 314 var sc1 = SocketChannel.open(); 315 316 // connect when current thread when it blocks in accept 317 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress())); 318 319 // accept should block 320 var sc2 = ssc.accept(); 321 sc1.close(); 322 sc2.close(); 323 } 324 }); 325 } 326 327 /** 328 * SeverSocketChannel close while virtual thread blocked in accept. 329 */ 330 @Test 331 void testServerSocketChannelAcceptAsyncClose() throws Exception { 332 VThreadRunner.run(() -> { 333 try (var ssc = ServerSocketChannel.open()) { 334 InetAddress lh = InetAddress.getLoopbackAddress(); 335 ssc.bind(new InetSocketAddress(lh, 0)); 336 runAfterParkedAsync(ssc::close); 337 try { 338 SocketChannel sc = ssc.accept(); 339 sc.close(); 340 fail("connection accepted???"); 341 } catch (AsynchronousCloseException expected) { } 342 } 343 }); 344 } 345 346 /** 347 * Virtual thread interrupted while blocked in ServerSocketChannel accept. 348 */ 349 @Test 350 void testServerSocketChannelAcceptInterrupt() throws Exception { 351 VThreadRunner.run(() -> { 352 try (var ssc = ServerSocketChannel.open()) { 353 InetAddress lh = InetAddress.getLoopbackAddress(); 354 ssc.bind(new InetSocketAddress(lh, 0)); 355 356 // interrupt current thread when it blocks in accept 357 Thread thisThread = Thread.currentThread(); 358 runAfterParkedAsync(thisThread::interrupt); 359 360 try { 361 SocketChannel sc = ssc.accept(); 362 sc.close(); 363 fail("connection accepted???"); 364 } catch (ClosedByInterruptException expected) { 365 assertTrue(Thread.interrupted()); 366 } 367 } 368 }); 369 } 370 371 /** 372 * Virtual thread blocks in ServerSocketChannel adaptor accept. 373 */ 374 @Test 375 void testSocketChannelAdaptorAccept1() throws Exception { 376 testSocketChannelAdaptorAccept(0); 377 } 378 379 /** 380 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout. 381 */ 382 @Test 383 void testSocketChannelAdaptorAccept2() throws Exception { 384 testSocketChannelAdaptorAccept(60_000); 385 } 386 387 private void testSocketChannelAdaptorAccept(int timeout) throws Exception { 388 VThreadRunner.run(() -> { 389 try (var ssc = ServerSocketChannel.open()) { 390 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 391 var sc = SocketChannel.open(); 392 393 // interrupt current thread when it blocks in accept 394 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress())); 395 396 // accept should block 397 if (timeout > 0) 398 ssc.socket().setSoTimeout(timeout); 399 Socket s = ssc.socket().accept(); 400 sc.close(); 401 s.close(); 402 } 403 }); 404 } 405 406 /** 407 * DatagramChannel receive/send, no blocking. 408 */ 409 @Test 410 void testDatagramChannelSendReceive1() throws Exception { 411 VThreadRunner.run(() -> { 412 try (DatagramChannel dc1 = DatagramChannel.open(); 413 DatagramChannel dc2 = DatagramChannel.open()) { 414 415 InetAddress lh = InetAddress.getLoopbackAddress(); 416 dc2.bind(new InetSocketAddress(lh, 0)); 417 418 // send should not block 419 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 420 int n = dc1.send(bb, dc2.getLocalAddress()); 421 assertTrue(n > 0); 422 423 // receive should not block 424 bb = ByteBuffer.allocate(10); 425 dc2.receive(bb); 426 assertTrue(bb.get(0) == 'X'); 427 } 428 }); 429 } 430 431 /** 432 * Virtual thread blocks in DatagramChannel receive. 433 */ 434 @Test 435 void testDatagramChannelSendReceive2() throws Exception { 436 VThreadRunner.run(() -> { 437 try (DatagramChannel dc1 = DatagramChannel.open(); 438 DatagramChannel dc2 = DatagramChannel.open()) { 439 440 InetAddress lh = InetAddress.getLoopbackAddress(); 441 dc2.bind(new InetSocketAddress(lh, 0)); 442 443 // send from dc1 when current thread blocked in dc2.receive 444 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 445 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress())); 446 447 // read from dc2 should block 448 ByteBuffer bb2 = ByteBuffer.allocate(10); 449 dc2.receive(bb2); 450 assertTrue(bb2.get(0) == 'X'); 451 } 452 }); 453 } 454 455 /** 456 * DatagramChannel close while virtual thread blocked in receive. 457 */ 458 @Test 459 void testDatagramChannelReceiveAsyncClose() throws Exception { 460 VThreadRunner.run(() -> { 461 try (DatagramChannel dc = DatagramChannel.open()) { 462 InetAddress lh = InetAddress.getLoopbackAddress(); 463 dc.bind(new InetSocketAddress(lh, 0)); 464 runAfterParkedAsync(dc::close); 465 try { 466 dc.receive(ByteBuffer.allocate(100)); 467 fail("receive returned"); 468 } catch (AsynchronousCloseException expected) { } 469 } 470 }); 471 } 472 473 /** 474 * Virtual thread interrupted while blocked in DatagramChannel receive. 475 */ 476 @Test 477 void testDatagramChannelReceiveInterrupt() throws Exception { 478 VThreadRunner.run(() -> { 479 try (DatagramChannel dc = DatagramChannel.open()) { 480 InetAddress lh = InetAddress.getLoopbackAddress(); 481 dc.bind(new InetSocketAddress(lh, 0)); 482 483 // interrupt current thread when it blocks in receive 484 Thread thisThread = Thread.currentThread(); 485 runAfterParkedAsync(thisThread::interrupt); 486 487 try { 488 dc.receive(ByteBuffer.allocate(100)); 489 fail("receive returned"); 490 } catch (ClosedByInterruptException expected) { 491 assertTrue(Thread.interrupted()); 492 } 493 } 494 }); 495 } 496 497 /** 498 * Virtual thread blocks in DatagramSocket adaptor receive. 499 */ 500 @Test 501 void testDatagramSocketAdaptorReceive1() throws Exception { 502 testDatagramSocketAdaptorReceive(0); 503 } 504 505 /** 506 * Virtual thread blocks in DatagramSocket adaptor receive with timeout. 507 */ 508 @Test 509 void testDatagramSocketAdaptorReceive2() throws Exception { 510 testDatagramSocketAdaptorReceive(60_000); 511 } 512 513 private void testDatagramSocketAdaptorReceive(int timeout) throws Exception { 514 VThreadRunner.run(() -> { 515 try (DatagramChannel dc1 = DatagramChannel.open(); 516 DatagramChannel dc2 = DatagramChannel.open()) { 517 518 InetAddress lh = InetAddress.getLoopbackAddress(); 519 dc2.bind(new InetSocketAddress(lh, 0)); 520 521 // send from dc1 when current thread blocks in dc2 receive 522 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 523 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress())); 524 525 // receive should block 526 byte[] array = new byte[100]; 527 DatagramPacket p = new DatagramPacket(array, 0, array.length); 528 if (timeout > 0) 529 dc2.socket().setSoTimeout(timeout); 530 dc2.socket().receive(p); 531 assertTrue(p.getLength() == 3 && array[0] == 'X'); 532 } 533 }); 534 } 535 536 /** 537 * DatagramChannel close while virtual thread blocked in adaptor receive. 538 */ 539 @Test 540 void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception { 541 testDatagramSocketAdaptorReceiveAsyncClose(0); 542 } 543 544 /** 545 * DatagramChannel close while virtual thread blocked in adaptor receive 546 * with timeout. 547 */ 548 @Test 549 void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception { 550 testDatagramSocketAdaptorReceiveAsyncClose(60_1000); 551 } 552 553 private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception { 554 VThreadRunner.run(() -> { 555 try (DatagramChannel dc = DatagramChannel.open()) { 556 InetAddress lh = InetAddress.getLoopbackAddress(); 557 dc.bind(new InetSocketAddress(lh, 0)); 558 559 byte[] array = new byte[100]; 560 DatagramPacket p = new DatagramPacket(array, 0, array.length); 561 if (timeout > 0) 562 dc.socket().setSoTimeout(timeout); 563 564 // close channel/socket when current thread blocks in receive 565 runAfterParkedAsync(dc::close); 566 567 assertThrows(SocketException.class, () -> dc.socket().receive(p)); 568 } 569 }); 570 } 571 572 /** 573 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive. 574 */ 575 @Test 576 void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception { 577 testDatagramSocketAdaptorReceiveInterrupt(0); 578 } 579 580 /** 581 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive 582 * with timeout. 583 */ 584 @Test 585 void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception { 586 testDatagramSocketAdaptorReceiveInterrupt(60_1000); 587 } 588 589 private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception { 590 VThreadRunner.run(() -> { 591 try (DatagramChannel dc = DatagramChannel.open()) { 592 InetAddress lh = InetAddress.getLoopbackAddress(); 593 dc.bind(new InetSocketAddress(lh, 0)); 594 595 byte[] array = new byte[100]; 596 DatagramPacket p = new DatagramPacket(array, 0, array.length); 597 if (timeout > 0) 598 dc.socket().setSoTimeout(timeout); 599 600 // interrupt current thread when it blocks in receive 601 Thread thisThread = Thread.currentThread(); 602 runAfterParkedAsync(thisThread::interrupt); 603 604 try { 605 dc.socket().receive(p); 606 fail(); 607 } catch (ClosedByInterruptException expected) { 608 assertTrue(Thread.interrupted()); 609 } 610 } 611 }); 612 } 613 614 /** 615 * Pipe read/write, no blocking. 616 */ 617 @Test 618 void testPipeReadWrite1() throws Exception { 619 VThreadRunner.run(() -> { 620 Pipe p = Pipe.open(); 621 try (Pipe.SinkChannel sink = p.sink(); 622 Pipe.SourceChannel source = p.source()) { 623 624 // write should not block 625 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 626 int n = sink.write(bb); 627 assertTrue(n > 0); 628 629 // read should not block 630 bb = ByteBuffer.allocate(10); 631 n = source.read(bb); 632 assertTrue(n > 0); 633 assertTrue(bb.get(0) == 'X'); 634 } 635 }); 636 } 637 638 /** 639 * Virtual thread blocks in Pipe.SourceChannel read. 640 */ 641 @Test 642 void testPipeReadWrite2() throws Exception { 643 VThreadRunner.run(() -> { 644 Pipe p = Pipe.open(); 645 try (Pipe.SinkChannel sink = p.sink(); 646 Pipe.SourceChannel source = p.source()) { 647 648 // write from sink when current thread blocks reading from source 649 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 650 runAfterParkedAsync(() -> sink.write(bb1)); 651 652 // read should block 653 ByteBuffer bb2 = ByteBuffer.allocate(10); 654 int n = source.read(bb2); 655 assertTrue(n > 0); 656 assertTrue(bb2.get(0) == 'X'); 657 } 658 }); 659 } 660 661 /** 662 * Virtual thread blocks in Pipe.SinkChannel write. 663 */ 664 @Test 665 void testPipeReadWrite3() throws Exception { 666 VThreadRunner.run(() -> { 667 Pipe p = Pipe.open(); 668 try (Pipe.SinkChannel sink = p.sink(); 669 Pipe.SourceChannel source = p.source()) { 670 671 // read from source to EOF when current thread blocking in write 672 Thread reader = runAfterParkedAsync(() -> readToEOF(source)); 673 674 // write to sink should block 675 ByteBuffer bb = ByteBuffer.allocate(100*1024); 676 for (int i=0; i<1000; i++) { 677 int n = sink.write(bb); 678 assertTrue(n > 0); 679 bb.clear(); 680 } 681 sink.close(); 682 683 // wait for reader to finish 684 reader.join(); 685 } 686 }); 687 } 688 689 /** 690 * Pipe.SourceChannel close while virtual thread blocked in read. 691 */ 692 @Test 693 void testPipeReadAsyncClose() throws Exception { 694 VThreadRunner.run(() -> { 695 Pipe p = Pipe.open(); 696 try (Pipe.SinkChannel sink = p.sink(); 697 Pipe.SourceChannel source = p.source()) { 698 runAfterParkedAsync(source::close); 699 try { 700 int n = source.read(ByteBuffer.allocate(100)); 701 fail("read returned " + n); 702 } catch (AsynchronousCloseException expected) { } 703 } 704 }); 705 } 706 707 /** 708 * Virtual thread interrupted while blocked in Pipe.SourceChannel read. 709 */ 710 @Test 711 void testPipeReadInterrupt() throws Exception { 712 VThreadRunner.run(() -> { 713 Pipe p = Pipe.open(); 714 try (Pipe.SinkChannel sink = p.sink(); 715 Pipe.SourceChannel source = p.source()) { 716 717 // interrupt current thread when it blocks reading from source 718 Thread thisThread = Thread.currentThread(); 719 runAfterParkedAsync(thisThread::interrupt); 720 721 try { 722 int n = source.read(ByteBuffer.allocate(100)); 723 fail("read returned " + n); 724 } catch (ClosedByInterruptException expected) { 725 assertTrue(Thread.interrupted()); 726 } 727 } 728 }); 729 } 730 731 /** 732 * Pipe.SinkChannel close while virtual thread blocked in write. 733 */ 734 @Test 735 void testPipeWriteAsyncClose() throws Exception { 736 VThreadRunner.run(() -> { 737 boolean retry = true; 738 while (retry) { 739 Pipe p = Pipe.open(); 740 try (Pipe.SinkChannel sink = p.sink(); 741 Pipe.SourceChannel source = p.source()) { 742 743 // close sink when current thread blocks in write 744 runAfterParkedAsync(sink::close); 745 try { 746 ByteBuffer bb = ByteBuffer.allocate(100*1024); 747 for (;;) { 748 int n = sink.write(bb); 749 assertTrue(n > 0); 750 bb.clear(); 751 } 752 } catch (AsynchronousCloseException e) { 753 // closed when blocked in write 754 retry = false; 755 } catch (ClosedChannelException e) { 756 // closed when not blocked in write, need to retry test 757 } 758 } 759 } 760 }); 761 } 762 763 /** 764 * Virtual thread interrupted while blocked in Pipe.SinkChannel write. 765 */ 766 @Test 767 void testPipeWriteInterrupt() throws Exception { 768 VThreadRunner.run(() -> { 769 boolean retry = true; 770 while (retry) { 771 Pipe p = Pipe.open(); 772 try (Pipe.SinkChannel sink = p.sink(); 773 Pipe.SourceChannel source = p.source()) { 774 775 // interrupt current thread when it blocks in write 776 Thread thisThread = Thread.currentThread(); 777 runAfterParkedAsync(thisThread::interrupt); 778 779 try { 780 ByteBuffer bb = ByteBuffer.allocate(100*1024); 781 for (;;) { 782 int n = sink.write(bb); 783 assertTrue(n > 0); 784 bb.clear(); 785 } 786 } catch (ClosedByInterruptException expected) { 787 // closed when blocked in write 788 assertTrue(Thread.interrupted()); 789 retry = false; 790 } catch (ClosedChannelException e) { 791 // closed when not blocked in write, need to retry test 792 } 793 } 794 } 795 }); 796 } 797 798 /** 799 * Creates a loopback connection 800 */ 801 static class Connection implements Closeable { 802 private final SocketChannel sc1; 803 private final SocketChannel sc2; 804 Connection() throws IOException { 805 var lh = InetAddress.getLoopbackAddress(); 806 try (var listener = ServerSocketChannel.open()) { 807 listener.bind(new InetSocketAddress(lh, 0)); 808 SocketChannel sc1 = SocketChannel.open(); 809 SocketChannel sc2 = null; 810 try { 811 sc1.socket().connect(listener.getLocalAddress()); 812 sc2 = listener.accept(); 813 } catch (IOException ioe) { 814 sc1.close(); 815 throw ioe; 816 } 817 this.sc1 = sc1; 818 this.sc2 = sc2; 819 } 820 } 821 SocketChannel channel1() { 822 return sc1; 823 } 824 SocketChannel channel2() { 825 return sc2; 826 } 827 @Override 828 public void close() throws IOException { 829 sc1.close(); 830 sc2.close(); 831 } 832 } 833 834 /** 835 * Read from a channel until all bytes have been read or an I/O error occurs. 836 */ 837 static void readToEOF(ReadableByteChannel rbc) throws IOException { 838 ByteBuffer bb = ByteBuffer.allocate(16*1024); 839 int n; 840 while ((n = rbc.read(bb)) > 0) { 841 bb.clear(); 842 } 843 } 844 845 @FunctionalInterface 846 interface ThrowingRunnable { 847 void run() throws Exception; 848 } 849 850 /** 851 * Runs the given task asynchronously after the current virtual thread has parked. 852 * @return the thread started to run the task 853 */ 854 static Thread runAfterParkedAsync(ThrowingRunnable task) { 855 Thread target = Thread.currentThread(); 856 if (!target.isVirtual()) 857 throw new WrongThreadException(); 858 return Thread.ofPlatform().daemon().start(() -> { 859 try { 860 Thread.State state = target.getState(); 861 while (state != Thread.State.WAITING 862 && state != Thread.State.TIMED_WAITING) { 863 Thread.sleep(20); 864 state = target.getState(); 865 } 866 Thread.sleep(20); // give a bit more time to release carrier 867 task.run(); 868 } catch (Exception e) { 869 e.printStackTrace(); 870 } 871 }); 872 } 873 }