1 /* 2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=default 26 * @bug 8284161 27 * @summary Test virtual threads doing blocking I/O on NIO channels 28 * @library /test/lib 29 * @run junit/timeout=480 BlockingChannelOps 30 */ 31 32 /* 33 * @test id=poller-modes 34 * @requires (os.family == "linux") | (os.family == "mac") 35 * @library /test/lib 36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps 37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps 38 */ 39 40 /* 41 * @test id=no-vmcontinuations 42 * @requires vm.continuations 43 * @library /test/lib 44 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps 45 */ 46 47 import java.io.Closeable; 48 import java.io.IOException; 49 import java.net.DatagramPacket; 50 import java.net.InetAddress; 51 import java.net.InetSocketAddress; 52 import java.net.Socket; 53 import java.net.SocketAddress; 54 import java.net.SocketException; 55 import java.nio.ByteBuffer; 56 import java.nio.channels.AsynchronousCloseException; 57 import java.nio.channels.ClosedByInterruptException; 58 import java.nio.channels.ClosedChannelException; 59 import java.nio.channels.DatagramChannel; 60 import java.nio.channels.Pipe; 61 import java.nio.channels.ReadableByteChannel; 62 import java.nio.channels.ServerSocketChannel; 63 import java.nio.channels.SocketChannel; 64 import java.nio.channels.WritableByteChannel; 65 import java.util.concurrent.locks.LockSupport; 66 67 import jdk.test.lib.thread.VThreadRunner; 68 import org.junit.jupiter.api.Test; 69 import static org.junit.jupiter.api.Assertions.*; 70 71 class BlockingChannelOps { 72 73 /** 74 * SocketChannel read/write, no blocking. 75 */ 76 @Test 77 void testSocketChannelReadWrite1() throws Exception { 78 VThreadRunner.run(() -> { 79 try (var connection = new Connection()) { 80 SocketChannel sc1 = connection.channel1(); 81 SocketChannel sc2 = connection.channel2(); 82 83 // write to sc1 84 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 85 int n = sc1.write(bb); 86 assertTrue(n > 0); 87 88 // read from sc2 should not block 89 bb = ByteBuffer.allocate(10); 90 n = sc2.read(bb); 91 assertTrue(n > 0); 92 assertTrue(bb.get(0) == 'X'); 93 } 94 }); 95 } 96 97 /** 98 * Virtual thread blocks in SocketChannel read. 99 */ 100 @Test 101 void testSocketChannelRead() throws Exception { 102 VThreadRunner.run(() -> { 103 try (var connection = new Connection()) { 104 SocketChannel sc1 = connection.channel1(); 105 SocketChannel sc2 = connection.channel2(); 106 107 // write to sc1 when current thread blocks in sc2.read 108 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 109 runAfterParkedAsync(() -> sc1.write(bb1)); 110 111 // read from sc2 should block 112 ByteBuffer bb2 = ByteBuffer.allocate(10); 113 int n = sc2.read(bb2); 114 assertTrue(n > 0); 115 assertTrue(bb2.get(0) == 'X'); 116 } 117 }); 118 } 119 120 /** 121 * Virtual thread blocks in SocketChannel write. 122 */ 123 @Test 124 void testSocketChannelWrite() throws Exception { 125 VThreadRunner.run(() -> { 126 try (var connection = new Connection()) { 127 SocketChannel sc1 = connection.channel1(); 128 SocketChannel sc2 = connection.channel2(); 129 130 // read from sc2 to EOF when current thread blocks in sc1.write 131 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2)); 132 133 // write to sc1 should block 134 ByteBuffer bb = ByteBuffer.allocate(100*1024); 135 for (int i=0; i<1000; i++) { 136 int n = sc1.write(bb); 137 assertTrue(n > 0); 138 bb.clear(); 139 } 140 sc1.close(); 141 142 // wait for reader to finish 143 reader.join(); 144 } 145 }); 146 } 147 148 /** 149 * SocketChannel close while virtual thread blocked in read. 150 */ 151 @Test 152 void testSocketChannelReadAsyncClose() throws Exception { 153 VThreadRunner.run(() -> { 154 try (var connection = new Connection()) { 155 SocketChannel sc = connection.channel1(); 156 runAfterParkedAsync(sc::close); 157 try { 158 int n = sc.read(ByteBuffer.allocate(100)); 159 fail("read returned " + n); 160 } catch (AsynchronousCloseException expected) { } 161 } 162 }); 163 } 164 165 /** 166 * SocketChannel shutdownInput while virtual thread blocked in read. 167 */ 168 @Test 169 void testSocketChannelReadAsyncShutdownInput() throws Exception { 170 VThreadRunner.run(() -> { 171 try (var connection = new Connection()) { 172 SocketChannel sc = connection.channel1(); 173 runAfterParkedAsync(sc::shutdownInput); 174 int n = sc.read(ByteBuffer.allocate(100)); 175 assertEquals(-1, n); 176 assertTrue(sc.isOpen()); 177 } 178 }); 179 } 180 181 /** 182 * Virtual thread interrupted while blocked in SocketChannel read. 183 */ 184 @Test 185 void testSocketChannelReadInterrupt() throws Exception { 186 VThreadRunner.run(() -> { 187 try (var connection = new Connection()) { 188 SocketChannel sc = connection.channel1(); 189 190 // interrupt current thread when it blocks in read 191 Thread thisThread = Thread.currentThread(); 192 runAfterParkedAsync(thisThread::interrupt); 193 194 try { 195 int n = sc.read(ByteBuffer.allocate(100)); 196 fail("read returned " + n); 197 } catch (ClosedByInterruptException expected) { 198 assertTrue(Thread.interrupted()); 199 } 200 } 201 }); 202 } 203 204 /** 205 * SocketChannel close while virtual thread blocked in write. 206 */ 207 @Test 208 void testSocketChannelWriteAsyncClose() throws Exception { 209 VThreadRunner.run(() -> { 210 boolean done = false; 211 while (!done) { 212 try (var connection = new Connection()) { 213 SocketChannel sc = connection.channel1(); 214 215 // close sc when current thread blocks in write 216 runAfterParkedAsync(sc::close, true); 217 218 // write until channel is closed 219 try { 220 ByteBuffer bb = ByteBuffer.allocate(100*1024); 221 for (;;) { 222 int n = sc.write(bb); 223 assertTrue(n > 0); 224 bb.clear(); 225 } 226 } catch (AsynchronousCloseException expected) { 227 // closed when blocked in write 228 done = true; 229 } catch (ClosedChannelException e) { 230 // closed but not blocked in write, need to retry test 231 System.err.format("%s, need to retry!%n", e); 232 } 233 } 234 } 235 }); 236 } 237 238 239 /** 240 * SocketChannel shutdownOutput while virtual thread blocked in write. 241 */ 242 @Test 243 void testSocketChannelWriteAsyncShutdownOutput() throws Exception { 244 VThreadRunner.run(() -> { 245 try (var connection = new Connection()) { 246 SocketChannel sc = connection.channel1(); 247 248 // shutdown output when current thread blocks in write 249 runAfterParkedAsync(sc::shutdownOutput); 250 try { 251 ByteBuffer bb = ByteBuffer.allocate(100*1024); 252 for (;;) { 253 int n = sc.write(bb); 254 assertTrue(n > 0); 255 bb.clear(); 256 } 257 } catch (ClosedChannelException e) { 258 // expected 259 } 260 assertTrue(sc.isOpen()); 261 } 262 }); 263 } 264 265 /** 266 * Virtual thread interrupted while blocked in SocketChannel write. 267 */ 268 @Test 269 void testSocketChannelWriteInterrupt() throws Exception { 270 VThreadRunner.run(() -> { 271 boolean done = false; 272 while (!done) { 273 try (var connection = new Connection()) { 274 SocketChannel sc = connection.channel1(); 275 276 // interrupt current thread when it blocks in write 277 Thread thisThread = Thread.currentThread(); 278 runAfterParkedAsync(thisThread::interrupt, true); 279 280 // write until channel is closed 281 try { 282 ByteBuffer bb = ByteBuffer.allocate(100*1024); 283 for (;;) { 284 int n = sc.write(bb); 285 assertTrue(n > 0); 286 bb.clear(); 287 } 288 } catch (ClosedByInterruptException e) { 289 // closed when blocked in write 290 assertTrue(Thread.interrupted()); 291 done = true; 292 } catch (ClosedChannelException e) { 293 // closed but not blocked in write, need to retry test 294 System.err.format("%s, need to retry!%n", e); 295 } 296 } 297 } 298 }); 299 } 300 301 /** 302 * Virtual thread blocks in SocketChannel adaptor read. 303 */ 304 @Test 305 void testSocketAdaptorRead1() throws Exception { 306 testSocketAdaptorRead(0); 307 } 308 309 /** 310 * Virtual thread blocks in SocketChannel adaptor read with timeout. 311 */ 312 @Test 313 void testSocketAdaptorRead2() throws Exception { 314 testSocketAdaptorRead(60_000); 315 } 316 317 private void testSocketAdaptorRead(int timeout) throws Exception { 318 VThreadRunner.run(() -> { 319 try (var connection = new Connection()) { 320 SocketChannel sc1 = connection.channel1(); 321 SocketChannel sc2 = connection.channel2(); 322 323 // write to sc1 when currnet thread blocks reading from sc2 324 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 325 runAfterParkedAsync(() -> sc1.write(bb)); 326 327 // read from sc2 should block 328 byte[] array = new byte[100]; 329 if (timeout > 0) 330 sc2.socket().setSoTimeout(timeout); 331 int n = sc2.socket().getInputStream().read(array); 332 assertTrue(n > 0); 333 assertTrue(array[0] == 'X'); 334 } 335 }); 336 } 337 338 /** 339 * ServerSocketChannel accept, no blocking. 340 */ 341 @Test 342 void testServerSocketChannelAccept1() throws Exception { 343 VThreadRunner.run(() -> { 344 try (var ssc = ServerSocketChannel.open()) { 345 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 346 var sc1 = SocketChannel.open(ssc.getLocalAddress()); 347 // accept should not block 348 var sc2 = ssc.accept(); 349 sc1.close(); 350 sc2.close(); 351 } 352 }); 353 } 354 355 /** 356 * Virtual thread blocks in ServerSocketChannel accept. 357 */ 358 @Test 359 void testServerSocketChannelAccept2() throws Exception { 360 VThreadRunner.run(() -> { 361 try (var ssc = ServerSocketChannel.open()) { 362 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 363 var sc1 = SocketChannel.open(); 364 365 // connect when current thread when it blocks in accept 366 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress())); 367 368 // accept should block 369 var sc2 = ssc.accept(); 370 sc1.close(); 371 sc2.close(); 372 } 373 }); 374 } 375 376 /** 377 * SeverSocketChannel close while virtual thread blocked in accept. 378 */ 379 @Test 380 void testServerSocketChannelAcceptAsyncClose() throws Exception { 381 VThreadRunner.run(() -> { 382 try (var ssc = ServerSocketChannel.open()) { 383 InetAddress lh = InetAddress.getLoopbackAddress(); 384 ssc.bind(new InetSocketAddress(lh, 0)); 385 runAfterParkedAsync(ssc::close); 386 try { 387 SocketChannel sc = ssc.accept(); 388 sc.close(); 389 fail("connection accepted???"); 390 } catch (AsynchronousCloseException expected) { } 391 } 392 }); 393 } 394 395 /** 396 * Virtual thread interrupted while blocked in ServerSocketChannel accept. 397 */ 398 @Test 399 void testServerSocketChannelAcceptInterrupt() throws Exception { 400 VThreadRunner.run(() -> { 401 try (var ssc = ServerSocketChannel.open()) { 402 InetAddress lh = InetAddress.getLoopbackAddress(); 403 ssc.bind(new InetSocketAddress(lh, 0)); 404 405 // interrupt current thread when it blocks in accept 406 Thread thisThread = Thread.currentThread(); 407 runAfterParkedAsync(thisThread::interrupt); 408 409 try { 410 SocketChannel sc = ssc.accept(); 411 sc.close(); 412 fail("connection accepted???"); 413 } catch (ClosedByInterruptException expected) { 414 assertTrue(Thread.interrupted()); 415 } 416 } 417 }); 418 } 419 420 /** 421 * Virtual thread blocks in ServerSocketChannel adaptor accept. 422 */ 423 @Test 424 void testSocketChannelAdaptorAccept1() throws Exception { 425 testSocketChannelAdaptorAccept(0); 426 } 427 428 /** 429 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout. 430 */ 431 @Test 432 void testSocketChannelAdaptorAccept2() throws Exception { 433 testSocketChannelAdaptorAccept(60_000); 434 } 435 436 private void testSocketChannelAdaptorAccept(int timeout) throws Exception { 437 VThreadRunner.run(() -> { 438 try (var ssc = ServerSocketChannel.open()) { 439 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 440 var sc = SocketChannel.open(); 441 442 // interrupt current thread when it blocks in accept 443 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress())); 444 445 // accept should block 446 if (timeout > 0) 447 ssc.socket().setSoTimeout(timeout); 448 Socket s = ssc.socket().accept(); 449 sc.close(); 450 s.close(); 451 } 452 }); 453 } 454 455 /** 456 * DatagramChannel receive/send, no blocking. 457 */ 458 @Test 459 void testDatagramChannelSendReceive1() throws Exception { 460 VThreadRunner.run(() -> { 461 try (DatagramChannel dc1 = DatagramChannel.open(); 462 DatagramChannel dc2 = DatagramChannel.open()) { 463 464 InetAddress lh = InetAddress.getLoopbackAddress(); 465 dc2.bind(new InetSocketAddress(lh, 0)); 466 467 // send should not block 468 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 469 int n = dc1.send(bb, dc2.getLocalAddress()); 470 assertTrue(n > 0); 471 472 // receive should not block 473 bb = ByteBuffer.allocate(10); 474 dc2.receive(bb); 475 assertTrue(bb.get(0) == 'X'); 476 } 477 }); 478 } 479 480 /** 481 * Virtual thread blocks in DatagramChannel receive. 482 */ 483 @Test 484 void testDatagramChannelSendReceive2() throws Exception { 485 VThreadRunner.run(() -> { 486 try (DatagramChannel dc1 = DatagramChannel.open(); 487 DatagramChannel dc2 = DatagramChannel.open()) { 488 489 InetAddress lh = InetAddress.getLoopbackAddress(); 490 dc2.bind(new InetSocketAddress(lh, 0)); 491 492 // send from dc1 when current thread blocked in dc2.receive 493 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 494 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress())); 495 496 // read from dc2 should block 497 ByteBuffer bb2 = ByteBuffer.allocate(10); 498 dc2.receive(bb2); 499 assertTrue(bb2.get(0) == 'X'); 500 } 501 }); 502 } 503 504 /** 505 * DatagramChannel close while virtual thread blocked in receive. 506 */ 507 @Test 508 void testDatagramChannelReceiveAsyncClose() throws Exception { 509 VThreadRunner.run(() -> { 510 try (DatagramChannel dc = DatagramChannel.open()) { 511 InetAddress lh = InetAddress.getLoopbackAddress(); 512 dc.bind(new InetSocketAddress(lh, 0)); 513 runAfterParkedAsync(dc::close); 514 try { 515 dc.receive(ByteBuffer.allocate(100)); 516 fail("receive returned"); 517 } catch (AsynchronousCloseException expected) { } 518 } 519 }); 520 } 521 522 /** 523 * Virtual thread interrupted while blocked in DatagramChannel receive. 524 */ 525 @Test 526 void testDatagramChannelReceiveInterrupt() throws Exception { 527 VThreadRunner.run(() -> { 528 try (DatagramChannel dc = DatagramChannel.open()) { 529 InetAddress lh = InetAddress.getLoopbackAddress(); 530 dc.bind(new InetSocketAddress(lh, 0)); 531 532 // interrupt current thread when it blocks in receive 533 Thread thisThread = Thread.currentThread(); 534 runAfterParkedAsync(thisThread::interrupt); 535 536 try { 537 dc.receive(ByteBuffer.allocate(100)); 538 fail("receive returned"); 539 } catch (ClosedByInterruptException expected) { 540 assertTrue(Thread.interrupted()); 541 } 542 } 543 }); 544 } 545 546 /** 547 * Virtual thread blocks in DatagramSocket adaptor receive. 548 */ 549 @Test 550 void testDatagramSocketAdaptorReceive1() throws Exception { 551 testDatagramSocketAdaptorReceive(0); 552 } 553 554 /** 555 * Virtual thread blocks in DatagramSocket adaptor receive with timeout. 556 */ 557 @Test 558 void testDatagramSocketAdaptorReceive2() throws Exception { 559 testDatagramSocketAdaptorReceive(60_000); 560 } 561 562 private void testDatagramSocketAdaptorReceive(int timeout) throws Exception { 563 VThreadRunner.run(() -> { 564 try (DatagramChannel dc1 = DatagramChannel.open(); 565 DatagramChannel dc2 = DatagramChannel.open()) { 566 567 InetAddress lh = InetAddress.getLoopbackAddress(); 568 dc2.bind(new InetSocketAddress(lh, 0)); 569 570 // send from dc1 when current thread blocks in dc2 receive 571 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 572 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress())); 573 574 // receive should block 575 byte[] array = new byte[100]; 576 DatagramPacket p = new DatagramPacket(array, 0, array.length); 577 if (timeout > 0) 578 dc2.socket().setSoTimeout(timeout); 579 dc2.socket().receive(p); 580 assertTrue(p.getLength() == 3 && array[0] == 'X'); 581 } 582 }); 583 } 584 585 /** 586 * DatagramChannel close while virtual thread blocked in adaptor receive. 587 */ 588 @Test 589 void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception { 590 testDatagramSocketAdaptorReceiveAsyncClose(0); 591 } 592 593 /** 594 * DatagramChannel close while virtual thread blocked in adaptor receive 595 * with timeout. 596 */ 597 @Test 598 void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception { 599 testDatagramSocketAdaptorReceiveAsyncClose(60_1000); 600 } 601 602 private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception { 603 VThreadRunner.run(() -> { 604 try (DatagramChannel dc = DatagramChannel.open()) { 605 InetAddress lh = InetAddress.getLoopbackAddress(); 606 dc.bind(new InetSocketAddress(lh, 0)); 607 608 byte[] array = new byte[100]; 609 DatagramPacket p = new DatagramPacket(array, 0, array.length); 610 if (timeout > 0) 611 dc.socket().setSoTimeout(timeout); 612 613 // close channel/socket when current thread blocks in receive 614 runAfterParkedAsync(dc::close); 615 616 assertThrows(SocketException.class, () -> dc.socket().receive(p)); 617 } 618 }); 619 } 620 621 /** 622 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive. 623 */ 624 @Test 625 void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception { 626 testDatagramSocketAdaptorReceiveInterrupt(0); 627 } 628 629 /** 630 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive 631 * with timeout. 632 */ 633 @Test 634 void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception { 635 testDatagramSocketAdaptorReceiveInterrupt(60_1000); 636 } 637 638 private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception { 639 VThreadRunner.run(() -> { 640 try (DatagramChannel dc = DatagramChannel.open()) { 641 InetAddress lh = InetAddress.getLoopbackAddress(); 642 dc.bind(new InetSocketAddress(lh, 0)); 643 644 byte[] array = new byte[100]; 645 DatagramPacket p = new DatagramPacket(array, 0, array.length); 646 if (timeout > 0) 647 dc.socket().setSoTimeout(timeout); 648 649 // interrupt current thread when it blocks in receive 650 Thread thisThread = Thread.currentThread(); 651 runAfterParkedAsync(thisThread::interrupt); 652 653 try { 654 dc.socket().receive(p); 655 fail(); 656 } catch (ClosedByInterruptException expected) { 657 assertTrue(Thread.interrupted()); 658 } 659 } 660 }); 661 } 662 663 /** 664 * Pipe read/write, no blocking. 665 */ 666 @Test 667 void testPipeReadWrite1() throws Exception { 668 VThreadRunner.run(() -> { 669 Pipe p = Pipe.open(); 670 try (Pipe.SinkChannel sink = p.sink(); 671 Pipe.SourceChannel source = p.source()) { 672 673 // write should not block 674 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 675 int n = sink.write(bb); 676 assertTrue(n > 0); 677 678 // read should not block 679 bb = ByteBuffer.allocate(10); 680 n = source.read(bb); 681 assertTrue(n > 0); 682 assertTrue(bb.get(0) == 'X'); 683 } 684 }); 685 } 686 687 /** 688 * Virtual thread blocks in Pipe.SourceChannel read. 689 */ 690 @Test 691 void testPipeReadWrite2() throws Exception { 692 VThreadRunner.run(() -> { 693 Pipe p = Pipe.open(); 694 try (Pipe.SinkChannel sink = p.sink(); 695 Pipe.SourceChannel source = p.source()) { 696 697 // write from sink when current thread blocks reading from source 698 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 699 runAfterParkedAsync(() -> sink.write(bb1)); 700 701 // read should block 702 ByteBuffer bb2 = ByteBuffer.allocate(10); 703 int n = source.read(bb2); 704 assertTrue(n > 0); 705 assertTrue(bb2.get(0) == 'X'); 706 } 707 }); 708 } 709 710 /** 711 * Virtual thread blocks in Pipe.SinkChannel write. 712 */ 713 @Test 714 void testPipeReadWrite3() throws Exception { 715 VThreadRunner.run(() -> { 716 Pipe p = Pipe.open(); 717 try (Pipe.SinkChannel sink = p.sink(); 718 Pipe.SourceChannel source = p.source()) { 719 720 // read from source to EOF when current thread blocking in write 721 Thread reader = runAfterParkedAsync(() -> readToEOF(source)); 722 723 // write to sink should block 724 ByteBuffer bb = ByteBuffer.allocate(100*1024); 725 for (int i=0; i<1000; i++) { 726 int n = sink.write(bb); 727 assertTrue(n > 0); 728 bb.clear(); 729 } 730 sink.close(); 731 732 // wait for reader to finish 733 reader.join(); 734 } 735 }); 736 } 737 738 /** 739 * Pipe.SourceChannel close while virtual thread blocked in read. 740 */ 741 @Test 742 void testPipeReadAsyncClose() throws Exception { 743 VThreadRunner.run(() -> { 744 Pipe p = Pipe.open(); 745 try (Pipe.SinkChannel sink = p.sink(); 746 Pipe.SourceChannel source = p.source()) { 747 runAfterParkedAsync(source::close); 748 try { 749 int n = source.read(ByteBuffer.allocate(100)); 750 fail("read returned " + n); 751 } catch (AsynchronousCloseException expected) { } 752 } 753 }); 754 } 755 756 /** 757 * Virtual thread interrupted while blocked in Pipe.SourceChannel read. 758 */ 759 @Test 760 void testPipeReadInterrupt() throws Exception { 761 VThreadRunner.run(() -> { 762 Pipe p = Pipe.open(); 763 try (Pipe.SinkChannel sink = p.sink(); 764 Pipe.SourceChannel source = p.source()) { 765 766 // interrupt current thread when it blocks reading from source 767 Thread thisThread = Thread.currentThread(); 768 runAfterParkedAsync(thisThread::interrupt); 769 770 try { 771 int n = source.read(ByteBuffer.allocate(100)); 772 fail("read returned " + n); 773 } catch (ClosedByInterruptException expected) { 774 assertTrue(Thread.interrupted()); 775 } 776 } 777 }); 778 } 779 780 /** 781 * Pipe.SinkChannel close while virtual thread blocked in write. 782 */ 783 @Test 784 void testPipeWriteAsyncClose() throws Exception { 785 VThreadRunner.run(() -> { 786 boolean done = false; 787 while (!done) { 788 Pipe p = Pipe.open(); 789 try (Pipe.SinkChannel sink = p.sink(); 790 Pipe.SourceChannel source = p.source()) { 791 792 // close sink when current thread blocks in write 793 runAfterParkedAsync(sink::close, true); 794 795 // write until channel is closed 796 try { 797 ByteBuffer bb = ByteBuffer.allocate(100*1024); 798 for (;;) { 799 int n = sink.write(bb); 800 assertTrue(n > 0); 801 bb.clear(); 802 } 803 } catch (AsynchronousCloseException e) { 804 // closed when blocked in write 805 done = true; 806 } catch (ClosedChannelException e) { 807 // closed but not blocked in write, need to retry test 808 System.err.format("%s, need to retry!%n", e); 809 } 810 } 811 } 812 }); 813 } 814 815 /** 816 * Virtual thread interrupted while blocked in Pipe.SinkChannel write. 817 */ 818 @Test 819 void testPipeWriteInterrupt() throws Exception { 820 VThreadRunner.run(() -> { 821 boolean done = false; 822 while (!done) { 823 Pipe p = Pipe.open(); 824 try (Pipe.SinkChannel sink = p.sink(); 825 Pipe.SourceChannel source = p.source()) { 826 827 // interrupt current thread when it blocks in write 828 Thread thisThread = Thread.currentThread(); 829 runAfterParkedAsync(thisThread::interrupt, true); 830 831 // write until channel is closed 832 try { 833 ByteBuffer bb = ByteBuffer.allocate(100*1024); 834 for (;;) { 835 int n = sink.write(bb); 836 assertTrue(n > 0); 837 bb.clear(); 838 } 839 } catch (ClosedByInterruptException expected) { 840 // closed when blocked in write 841 assertTrue(Thread.interrupted()); 842 done = true; 843 } catch (ClosedChannelException e) { 844 // closed but not blocked in write, need to retry test 845 System.err.format("%s, need to retry!%n", e); 846 } 847 } 848 } 849 }); 850 } 851 852 /** 853 * Creates a loopback connection 854 */ 855 static class Connection implements Closeable { 856 private final SocketChannel sc1; 857 private final SocketChannel sc2; 858 Connection() throws IOException { 859 var lh = InetAddress.getLoopbackAddress(); 860 try (var listener = ServerSocketChannel.open()) { 861 listener.bind(new InetSocketAddress(lh, 0)); 862 SocketChannel sc1 = SocketChannel.open(); 863 SocketChannel sc2 = null; 864 try { 865 sc1.socket().connect(listener.getLocalAddress()); 866 sc2 = listener.accept(); 867 } catch (IOException ioe) { 868 sc1.close(); 869 throw ioe; 870 } 871 this.sc1 = sc1; 872 this.sc2 = sc2; 873 } 874 } 875 SocketChannel channel1() { 876 return sc1; 877 } 878 SocketChannel channel2() { 879 return sc2; 880 } 881 @Override 882 public void close() throws IOException { 883 sc1.close(); 884 sc2.close(); 885 } 886 } 887 888 /** 889 * Read from a channel until all bytes have been read or an I/O error occurs. 890 */ 891 static void readToEOF(ReadableByteChannel rbc) throws IOException { 892 ByteBuffer bb = ByteBuffer.allocate(16*1024); 893 int n; 894 while ((n = rbc.read(bb)) > 0) { 895 bb.clear(); 896 } 897 } 898 899 @FunctionalInterface 900 interface ThrowingRunnable { 901 void run() throws Exception; 902 } 903 904 /** 905 * Runs the given task asynchronously after the current virtual thread parks. 906 * @param writing if the thread will block in write 907 * @return the thread started to run the task 908 */ 909 private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) { 910 Thread target = Thread.currentThread(); 911 if (!target.isVirtual()) 912 throw new WrongThreadException(); 913 return Thread.ofPlatform().daemon().start(() -> { 914 try { 915 // wait for target thread to park 916 while (!isWaiting(target)) { 917 Thread.sleep(20); 918 } 919 920 // if the target thread is parked in write then we nudge it a few times 921 // to avoid wakeup with some bytes written 922 if (writing) { 923 for (int i = 0; i < 3; i++) { 924 LockSupport.unpark(target); 925 while (!isWaiting(target)) { 926 Thread.sleep(20); 927 } 928 } 929 } 930 931 task.run(); 932 933 } catch (Exception e) { 934 e.printStackTrace(); 935 } 936 }); 937 } 938 939 private static Thread runAfterParkedAsync(ThrowingRunnable task) { 940 return runAfterParkedAsync(task, false); 941 } 942 943 /** 944 * Return true if the given Thread is parked. 945 */ 946 private static boolean isWaiting(Thread target) { 947 Thread.State state = target.getState(); 948 assertNotEquals(Thread.State.TERMINATED, state); 949 return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING); 950 } 951 }