1 /* 2 * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /** 25 * @test id=default 26 * @bug 8284161 27 * @summary Test virtual threads doing blocking I/O on NIO channels 28 * @library /test/lib 29 * @run junit BlockingChannelOps 30 */ 31 32 /** 33 * @test id=poller-modes 34 * @requires (os.family == "linux") | (os.family == "mac") 35 * @library /test/lib 36 * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps 37 * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps 38 */ 39 40 /** 41 * @test id=no-vmcontinuations 42 * @requires vm.continuations 43 * @library /test/lib 44 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps 45 */ 46 47 import java.io.Closeable; 48 import java.io.IOException; 49 import java.net.DatagramPacket; 50 import java.net.InetAddress; 51 import java.net.InetSocketAddress; 52 import java.net.Socket; 53 import java.net.SocketAddress; 54 import java.net.SocketException; 55 import java.nio.ByteBuffer; 56 import java.nio.channels.AsynchronousCloseException; 57 import java.nio.channels.ClosedByInterruptException; 58 import java.nio.channels.ClosedChannelException; 59 import java.nio.channels.DatagramChannel; 60 import java.nio.channels.Pipe; 61 import java.nio.channels.ReadableByteChannel; 62 import java.nio.channels.ServerSocketChannel; 63 import java.nio.channels.SocketChannel; 64 import java.nio.channels.WritableByteChannel; 65 import java.util.concurrent.ExecutorService; 66 import java.util.concurrent.Executors; 67 import java.util.stream.Stream; 68 69 import jdk.test.lib.thread.VThreadRunner; 70 import jdk.test.lib.thread.VThreadScheduler; 71 72 import org.junit.jupiter.api.BeforeAll; 73 import org.junit.jupiter.api.AfterAll; 74 import org.junit.jupiter.params.ParameterizedTest; 75 import org.junit.jupiter.params.provider.MethodSource; 76 import static org.junit.jupiter.api.Assertions.*; 77 78 class BlockingChannelOps { 79 80 private static ExecutorService customScheduler; 81 82 @BeforeAll 83 static void setup() { 84 customScheduler = Executors.newCachedThreadPool(); 85 } 86 87 @AfterAll 88 static void finish() { 89 customScheduler.shutdown(); 90 } 91 92 /** 93 * Returns the Thread.Builder to create the virtual thread. 94 */ 95 static Stream<Thread.Builder.OfVirtual> threadBuilders() { 96 if (VThreadScheduler.supportsCustomScheduler()) { 97 return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler)); 98 } else { 99 return Stream.of(Thread.ofVirtual()); 100 } 101 } 102 103 /** 104 * SocketChannel read/write, no blocking. 105 */ 106 @ParameterizedTest 107 @MethodSource("threadBuilders") 108 void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception { 109 VThreadRunner.run(builder, () -> { 110 try (var connection = new Connection()) { 111 SocketChannel sc1 = connection.channel1(); 112 SocketChannel sc2 = connection.channel2(); 113 114 // write to sc1 115 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 116 int n = sc1.write(bb); 117 assertTrue(n > 0); 118 119 // read from sc2 should not block 120 bb = ByteBuffer.allocate(10); 121 n = sc2.read(bb); 122 assertTrue(n > 0); 123 assertTrue(bb.get(0) == 'X'); 124 } 125 }); 126 } 127 128 /** 129 * Virtual thread blocks in SocketChannel read. 130 */ 131 @ParameterizedTest 132 @MethodSource("threadBuilders") 133 void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception { 134 VThreadRunner.run(builder, () -> { 135 try (var connection = new Connection()) { 136 SocketChannel sc1 = connection.channel1(); 137 SocketChannel sc2 = connection.channel2(); 138 139 // write to sc1 when current thread blocks in sc2.read 140 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 141 runAfterParkedAsync(() -> sc1.write(bb1)); 142 143 // read from sc2 should block 144 ByteBuffer bb2 = ByteBuffer.allocate(10); 145 int n = sc2.read(bb2); 146 assertTrue(n > 0); 147 assertTrue(bb2.get(0) == 'X'); 148 } 149 }); 150 } 151 152 /** 153 * Virtual thread blocks in SocketChannel write. 154 */ 155 @ParameterizedTest 156 @MethodSource("threadBuilders") 157 void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception { 158 VThreadRunner.run(builder, () -> { 159 try (var connection = new Connection()) { 160 SocketChannel sc1 = connection.channel1(); 161 SocketChannel sc2 = connection.channel2(); 162 163 // read from sc2 to EOF when current thread blocks in sc1.write 164 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2)); 165 166 // write to sc1 should block 167 ByteBuffer bb = ByteBuffer.allocate(100*1024); 168 for (int i=0; i<1000; i++) { 169 int n = sc1.write(bb); 170 assertTrue(n > 0); 171 bb.clear(); 172 } 173 sc1.close(); 174 175 // wait for reader to finish 176 reader.join(); 177 } 178 }); 179 } 180 181 /** 182 * SocketChannel close while virtual thread blocked in read. 183 */ 184 @ParameterizedTest 185 @MethodSource("threadBuilders") 186 void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 187 VThreadRunner.run(builder, () -> { 188 try (var connection = new Connection()) { 189 SocketChannel sc = connection.channel1(); 190 runAfterParkedAsync(sc::close); 191 try { 192 int n = sc.read(ByteBuffer.allocate(100)); 193 fail("read returned " + n); 194 } catch (AsynchronousCloseException expected) { } 195 } 196 }); 197 } 198 199 /** 200 * SocketChannel shutdownInput while virtual thread blocked in read. 201 */ 202 @ParameterizedTest 203 @MethodSource("threadBuilders") 204 void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception { 205 VThreadRunner.run(builder, () -> { 206 try (var connection = new Connection()) { 207 SocketChannel sc = connection.channel1(); 208 runAfterParkedAsync(sc::shutdownInput); 209 int n = sc.read(ByteBuffer.allocate(100)); 210 assertEquals(-1, n); 211 assertTrue(sc.isOpen()); 212 } 213 }); 214 } 215 216 /** 217 * Virtual thread interrupted while blocked in SocketChannel read. 218 */ 219 @ParameterizedTest 220 @MethodSource("threadBuilders") 221 void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 222 VThreadRunner.run(builder, () -> { 223 try (var connection = new Connection()) { 224 SocketChannel sc = connection.channel1(); 225 226 // interrupt current thread when it blocks in read 227 Thread thisThread = Thread.currentThread(); 228 runAfterParkedAsync(thisThread::interrupt); 229 230 try { 231 int n = sc.read(ByteBuffer.allocate(100)); 232 fail("read returned " + n); 233 } catch (ClosedByInterruptException expected) { 234 assertTrue(Thread.interrupted()); 235 } 236 } 237 }); 238 } 239 240 /** 241 * SocketChannel close while virtual thread blocked in write. 242 */ 243 @ParameterizedTest 244 @MethodSource("threadBuilders") 245 void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 246 VThreadRunner.run(builder, () -> { 247 boolean retry = true; 248 while (retry) { 249 try (var connection = new Connection()) { 250 SocketChannel sc = connection.channel1(); 251 252 // close sc when current thread blocks in write 253 runAfterParkedAsync(sc::close); 254 try { 255 ByteBuffer bb = ByteBuffer.allocate(100*1024); 256 for (;;) { 257 int n = sc.write(bb); 258 assertTrue(n > 0); 259 bb.clear(); 260 } 261 } catch (AsynchronousCloseException expected) { 262 // closed when blocked in write 263 retry = false; 264 } catch (ClosedChannelException e) { 265 // closed when not blocked in write, need to retry test 266 } 267 } 268 } 269 }); 270 } 271 272 273 /** 274 * SocketChannel shutdownOutput while virtual thread blocked in write. 275 */ 276 @ParameterizedTest 277 @MethodSource("threadBuilders") 278 void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception { 279 VThreadRunner.run(builder, () -> { 280 try (var connection = new Connection()) { 281 SocketChannel sc = connection.channel1(); 282 283 // shutdown output when current thread blocks in write 284 runAfterParkedAsync(sc::shutdownOutput); 285 try { 286 ByteBuffer bb = ByteBuffer.allocate(100*1024); 287 for (;;) { 288 int n = sc.write(bb); 289 assertTrue(n > 0); 290 bb.clear(); 291 } 292 } catch (ClosedChannelException e) { 293 // expected 294 } 295 assertTrue(sc.isOpen()); 296 } 297 }); 298 } 299 300 /** 301 * Virtual thread interrupted while blocked in SocketChannel write. 302 */ 303 @ParameterizedTest 304 @MethodSource("threadBuilders") 305 void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 306 VThreadRunner.run(builder, () -> { 307 boolean retry = true; 308 while (retry) { 309 try (var connection = new Connection()) { 310 SocketChannel sc = connection.channel1(); 311 312 // interrupt current thread when it blocks in write 313 Thread thisThread = Thread.currentThread(); 314 runAfterParkedAsync(thisThread::interrupt); 315 316 try { 317 ByteBuffer bb = ByteBuffer.allocate(100*1024); 318 for (;;) { 319 int n = sc.write(bb); 320 assertTrue(n > 0); 321 bb.clear(); 322 } 323 } catch (ClosedByInterruptException e) { 324 // closed when blocked in write 325 assertTrue(Thread.interrupted()); 326 retry = false; 327 } catch (ClosedChannelException e) { 328 // closed when not blocked in write, need to retry test 329 } 330 } 331 } 332 }); 333 } 334 335 /** 336 * Virtual thread blocks in SocketChannel adaptor read. 337 */ 338 @ParameterizedTest 339 @MethodSource("threadBuilders") 340 void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception { 341 testSocketAdaptorRead(builder, 0); 342 } 343 344 /** 345 * Virtual thread blocks in SocketChannel adaptor read with timeout. 346 */ 347 @ParameterizedTest 348 @MethodSource("threadBuilders") 349 void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception { 350 testSocketAdaptorRead(builder, 60_000); 351 } 352 353 private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder, 354 int timeout) throws Exception { 355 VThreadRunner.run(builder, () -> { 356 try (var connection = new Connection()) { 357 SocketChannel sc1 = connection.channel1(); 358 SocketChannel sc2 = connection.channel2(); 359 360 // write to sc1 when currnet thread blocks reading from sc2 361 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 362 runAfterParkedAsync(() -> sc1.write(bb)); 363 364 // read from sc2 should block 365 byte[] array = new byte[100]; 366 if (timeout > 0) 367 sc2.socket().setSoTimeout(timeout); 368 int n = sc2.socket().getInputStream().read(array); 369 assertTrue(n > 0); 370 assertTrue(array[0] == 'X'); 371 } 372 }); 373 } 374 375 /** 376 * ServerSocketChannel accept, no blocking. 377 */ 378 @ParameterizedTest 379 @MethodSource("threadBuilders") 380 void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception { 381 VThreadRunner.run(builder, () -> { 382 try (var ssc = ServerSocketChannel.open()) { 383 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 384 var sc1 = SocketChannel.open(ssc.getLocalAddress()); 385 // accept should not block 386 var sc2 = ssc.accept(); 387 sc1.close(); 388 sc2.close(); 389 } 390 }); 391 } 392 393 /** 394 * Virtual thread blocks in ServerSocketChannel accept. 395 */ 396 @ParameterizedTest 397 @MethodSource("threadBuilders") 398 void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception { 399 VThreadRunner.run(builder, () -> { 400 try (var ssc = ServerSocketChannel.open()) { 401 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 402 var sc1 = SocketChannel.open(); 403 404 // connect when current thread when it blocks in accept 405 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress())); 406 407 // accept should block 408 var sc2 = ssc.accept(); 409 sc1.close(); 410 sc2.close(); 411 } 412 }); 413 } 414 415 /** 416 * SeverSocketChannel close while virtual thread blocked in accept. 417 */ 418 @ParameterizedTest 419 @MethodSource("threadBuilders") 420 void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 421 VThreadRunner.run(builder, () -> { 422 try (var ssc = ServerSocketChannel.open()) { 423 InetAddress lh = InetAddress.getLoopbackAddress(); 424 ssc.bind(new InetSocketAddress(lh, 0)); 425 runAfterParkedAsync(ssc::close); 426 try { 427 SocketChannel sc = ssc.accept(); 428 sc.close(); 429 fail("connection accepted???"); 430 } catch (AsynchronousCloseException expected) { } 431 } 432 }); 433 } 434 435 /** 436 * Virtual thread interrupted while blocked in ServerSocketChannel accept. 437 */ 438 @ParameterizedTest 439 @MethodSource("threadBuilders") 440 void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 441 VThreadRunner.run(builder, () -> { 442 try (var ssc = ServerSocketChannel.open()) { 443 InetAddress lh = InetAddress.getLoopbackAddress(); 444 ssc.bind(new InetSocketAddress(lh, 0)); 445 446 // interrupt current thread when it blocks in accept 447 Thread thisThread = Thread.currentThread(); 448 runAfterParkedAsync(thisThread::interrupt); 449 450 try { 451 SocketChannel sc = ssc.accept(); 452 sc.close(); 453 fail("connection accepted???"); 454 } catch (ClosedByInterruptException expected) { 455 assertTrue(Thread.interrupted()); 456 } 457 } 458 }); 459 } 460 461 /** 462 * Virtual thread blocks in ServerSocketChannel adaptor accept. 463 */ 464 @ParameterizedTest 465 @MethodSource("threadBuilders") 466 void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception { 467 testSocketChannelAdaptorAccept(builder, 0); 468 } 469 470 /** 471 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout. 472 */ 473 @ParameterizedTest 474 @MethodSource("threadBuilders") 475 void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception { 476 testSocketChannelAdaptorAccept(builder, 60_000); 477 } 478 479 private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder, 480 int timeout) throws Exception { 481 VThreadRunner.run(builder, () -> { 482 try (var ssc = ServerSocketChannel.open()) { 483 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 484 var sc = SocketChannel.open(); 485 486 // interrupt current thread when it blocks in accept 487 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress())); 488 489 // accept should block 490 if (timeout > 0) 491 ssc.socket().setSoTimeout(timeout); 492 Socket s = ssc.socket().accept(); 493 sc.close(); 494 s.close(); 495 } 496 }); 497 } 498 499 /** 500 * DatagramChannel receive/send, no blocking. 501 */ 502 @ParameterizedTest 503 @MethodSource("threadBuilders") 504 void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception { 505 VThreadRunner.run(builder, () -> { 506 try (DatagramChannel dc1 = DatagramChannel.open(); 507 DatagramChannel dc2 = DatagramChannel.open()) { 508 509 InetAddress lh = InetAddress.getLoopbackAddress(); 510 dc2.bind(new InetSocketAddress(lh, 0)); 511 512 // send should not block 513 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 514 int n = dc1.send(bb, dc2.getLocalAddress()); 515 assertTrue(n > 0); 516 517 // receive should not block 518 bb = ByteBuffer.allocate(10); 519 dc2.receive(bb); 520 assertTrue(bb.get(0) == 'X'); 521 } 522 }); 523 } 524 525 /** 526 * Virtual thread blocks in DatagramChannel receive. 527 */ 528 @ParameterizedTest 529 @MethodSource("threadBuilders") 530 void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception { 531 VThreadRunner.run(builder, () -> { 532 try (DatagramChannel dc1 = DatagramChannel.open(); 533 DatagramChannel dc2 = DatagramChannel.open()) { 534 535 InetAddress lh = InetAddress.getLoopbackAddress(); 536 dc2.bind(new InetSocketAddress(lh, 0)); 537 538 // send from dc1 when current thread blocked in dc2.receive 539 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 540 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress())); 541 542 // read from dc2 should block 543 ByteBuffer bb2 = ByteBuffer.allocate(10); 544 dc2.receive(bb2); 545 assertTrue(bb2.get(0) == 'X'); 546 } 547 }); 548 } 549 550 /** 551 * DatagramChannel close while virtual thread blocked in receive. 552 */ 553 @ParameterizedTest 554 @MethodSource("threadBuilders") 555 void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 556 VThreadRunner.run(builder, () -> { 557 try (DatagramChannel dc = DatagramChannel.open()) { 558 InetAddress lh = InetAddress.getLoopbackAddress(); 559 dc.bind(new InetSocketAddress(lh, 0)); 560 runAfterParkedAsync(dc::close); 561 try { 562 dc.receive(ByteBuffer.allocate(100)); 563 fail("receive returned"); 564 } catch (AsynchronousCloseException expected) { } 565 } 566 }); 567 } 568 569 /** 570 * Virtual thread interrupted while blocked in DatagramChannel receive. 571 */ 572 @ParameterizedTest 573 @MethodSource("threadBuilders") 574 void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 575 VThreadRunner.run(builder, () -> { 576 try (DatagramChannel dc = DatagramChannel.open()) { 577 InetAddress lh = InetAddress.getLoopbackAddress(); 578 dc.bind(new InetSocketAddress(lh, 0)); 579 580 // interrupt current thread when it blocks in receive 581 Thread thisThread = Thread.currentThread(); 582 runAfterParkedAsync(thisThread::interrupt); 583 584 try { 585 dc.receive(ByteBuffer.allocate(100)); 586 fail("receive returned"); 587 } catch (ClosedByInterruptException expected) { 588 assertTrue(Thread.interrupted()); 589 } 590 } 591 }); 592 } 593 594 /** 595 * Virtual thread blocks in DatagramSocket adaptor receive. 596 */ 597 @ParameterizedTest 598 @MethodSource("threadBuilders") 599 void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception { 600 testDatagramSocketAdaptorReceive(builder, 0); 601 } 602 603 /** 604 * Virtual thread blocks in DatagramSocket adaptor receive with timeout. 605 */ 606 @ParameterizedTest 607 @MethodSource("threadBuilders") 608 void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception { 609 testDatagramSocketAdaptorReceive(builder, 60_000); 610 } 611 612 private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder, 613 int timeout) throws Exception { 614 VThreadRunner.run(builder, () -> { 615 try (DatagramChannel dc1 = DatagramChannel.open(); 616 DatagramChannel dc2 = DatagramChannel.open()) { 617 618 InetAddress lh = InetAddress.getLoopbackAddress(); 619 dc2.bind(new InetSocketAddress(lh, 0)); 620 621 // send from dc1 when current thread blocks in dc2 receive 622 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 623 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress())); 624 625 // receive should block 626 byte[] array = new byte[100]; 627 DatagramPacket p = new DatagramPacket(array, 0, array.length); 628 if (timeout > 0) 629 dc2.socket().setSoTimeout(timeout); 630 dc2.socket().receive(p); 631 assertTrue(p.getLength() == 3 && array[0] == 'X'); 632 } 633 }); 634 } 635 636 /** 637 * DatagramChannel close while virtual thread blocked in adaptor receive. 638 */ 639 @ParameterizedTest 640 @MethodSource("threadBuilders") 641 void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception { 642 testDatagramSocketAdaptorReceiveAsyncClose(builder, 0); 643 } 644 645 /** 646 * DatagramChannel close while virtual thread blocked in adaptor receive 647 * with timeout. 648 */ 649 @ParameterizedTest 650 @MethodSource("threadBuilders") 651 void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception { 652 testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000); 653 } 654 655 private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder, 656 int timeout) throws Exception { 657 VThreadRunner.run(builder, () -> { 658 try (DatagramChannel dc = DatagramChannel.open()) { 659 InetAddress lh = InetAddress.getLoopbackAddress(); 660 dc.bind(new InetSocketAddress(lh, 0)); 661 662 byte[] array = new byte[100]; 663 DatagramPacket p = new DatagramPacket(array, 0, array.length); 664 if (timeout > 0) 665 dc.socket().setSoTimeout(timeout); 666 667 // close channel/socket when current thread blocks in receive 668 runAfterParkedAsync(dc::close); 669 670 assertThrows(SocketException.class, () -> dc.socket().receive(p)); 671 } 672 }); 673 } 674 675 /** 676 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive. 677 */ 678 @ParameterizedTest 679 @MethodSource("threadBuilders") 680 void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception { 681 testDatagramSocketAdaptorReceiveInterrupt(builder, 0); 682 } 683 684 /** 685 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive 686 * with timeout. 687 */ 688 @ParameterizedTest 689 @MethodSource("threadBuilders") 690 void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception { 691 testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000); 692 } 693 694 private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder, 695 int timeout) throws Exception { 696 VThreadRunner.run(builder, () -> { 697 try (DatagramChannel dc = DatagramChannel.open()) { 698 InetAddress lh = InetAddress.getLoopbackAddress(); 699 dc.bind(new InetSocketAddress(lh, 0)); 700 701 byte[] array = new byte[100]; 702 DatagramPacket p = new DatagramPacket(array, 0, array.length); 703 if (timeout > 0) 704 dc.socket().setSoTimeout(timeout); 705 706 // interrupt current thread when it blocks in receive 707 Thread thisThread = Thread.currentThread(); 708 runAfterParkedAsync(thisThread::interrupt); 709 710 try { 711 dc.socket().receive(p); 712 fail(); 713 } catch (ClosedByInterruptException expected) { 714 assertTrue(Thread.interrupted()); 715 } 716 } 717 }); 718 } 719 720 /** 721 * Pipe read/write, no blocking. 722 */ 723 @ParameterizedTest 724 @MethodSource("threadBuilders") 725 void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception { 726 VThreadRunner.run(builder, () -> { 727 Pipe p = Pipe.open(); 728 try (Pipe.SinkChannel sink = p.sink(); 729 Pipe.SourceChannel source = p.source()) { 730 731 // write should not block 732 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 733 int n = sink.write(bb); 734 assertTrue(n > 0); 735 736 // read should not block 737 bb = ByteBuffer.allocate(10); 738 n = source.read(bb); 739 assertTrue(n > 0); 740 assertTrue(bb.get(0) == 'X'); 741 } 742 }); 743 } 744 745 /** 746 * Virtual thread blocks in Pipe.SourceChannel read. 747 */ 748 @ParameterizedTest 749 @MethodSource("threadBuilders") 750 void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception { 751 VThreadRunner.run(builder, () -> { 752 Pipe p = Pipe.open(); 753 try (Pipe.SinkChannel sink = p.sink(); 754 Pipe.SourceChannel source = p.source()) { 755 756 // write from sink when current thread blocks reading from source 757 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 758 runAfterParkedAsync(() -> sink.write(bb1)); 759 760 // read should block 761 ByteBuffer bb2 = ByteBuffer.allocate(10); 762 int n = source.read(bb2); 763 assertTrue(n > 0); 764 assertTrue(bb2.get(0) == 'X'); 765 } 766 }); 767 } 768 769 /** 770 * Virtual thread blocks in Pipe.SinkChannel write. 771 */ 772 @ParameterizedTest 773 @MethodSource("threadBuilders") 774 void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception { 775 VThreadRunner.run(builder, () -> { 776 Pipe p = Pipe.open(); 777 try (Pipe.SinkChannel sink = p.sink(); 778 Pipe.SourceChannel source = p.source()) { 779 780 // read from source to EOF when current thread blocking in write 781 Thread reader = runAfterParkedAsync(() -> readToEOF(source)); 782 783 // write to sink should block 784 ByteBuffer bb = ByteBuffer.allocate(100*1024); 785 for (int i=0; i<1000; i++) { 786 int n = sink.write(bb); 787 assertTrue(n > 0); 788 bb.clear(); 789 } 790 sink.close(); 791 792 // wait for reader to finish 793 reader.join(); 794 } 795 }); 796 } 797 798 /** 799 * Pipe.SourceChannel close while virtual thread blocked in read. 800 */ 801 @ParameterizedTest 802 @MethodSource("threadBuilders") 803 void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 804 VThreadRunner.run(builder, () -> { 805 Pipe p = Pipe.open(); 806 try (Pipe.SinkChannel sink = p.sink(); 807 Pipe.SourceChannel source = p.source()) { 808 runAfterParkedAsync(source::close); 809 try { 810 int n = source.read(ByteBuffer.allocate(100)); 811 fail("read returned " + n); 812 } catch (AsynchronousCloseException expected) { } 813 } 814 }); 815 } 816 817 /** 818 * Virtual thread interrupted while blocked in Pipe.SourceChannel read. 819 */ 820 @ParameterizedTest 821 @MethodSource("threadBuilders") 822 void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 823 VThreadRunner.run(builder, () -> { 824 Pipe p = Pipe.open(); 825 try (Pipe.SinkChannel sink = p.sink(); 826 Pipe.SourceChannel source = p.source()) { 827 828 // interrupt current thread when it blocks reading from source 829 Thread thisThread = Thread.currentThread(); 830 runAfterParkedAsync(thisThread::interrupt); 831 832 try { 833 int n = source.read(ByteBuffer.allocate(100)); 834 fail("read returned " + n); 835 } catch (ClosedByInterruptException expected) { 836 assertTrue(Thread.interrupted()); 837 } 838 } 839 }); 840 } 841 842 /** 843 * Pipe.SinkChannel close while virtual thread blocked in write. 844 */ 845 @ParameterizedTest 846 @MethodSource("threadBuilders") 847 void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 848 VThreadRunner.run(builder, () -> { 849 boolean retry = true; 850 while (retry) { 851 Pipe p = Pipe.open(); 852 try (Pipe.SinkChannel sink = p.sink(); 853 Pipe.SourceChannel source = p.source()) { 854 855 // close sink when current thread blocks in write 856 runAfterParkedAsync(sink::close); 857 try { 858 ByteBuffer bb = ByteBuffer.allocate(100*1024); 859 for (;;) { 860 int n = sink.write(bb); 861 assertTrue(n > 0); 862 bb.clear(); 863 } 864 } catch (AsynchronousCloseException e) { 865 // closed when blocked in write 866 retry = false; 867 } catch (ClosedChannelException e) { 868 // closed when not blocked in write, need to retry test 869 } 870 } 871 } 872 }); 873 } 874 875 /** 876 * Virtual thread interrupted while blocked in Pipe.SinkChannel write. 877 */ 878 @ParameterizedTest 879 @MethodSource("threadBuilders") 880 void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 881 VThreadRunner.run(builder, () -> { 882 boolean retry = true; 883 while (retry) { 884 Pipe p = Pipe.open(); 885 try (Pipe.SinkChannel sink = p.sink(); 886 Pipe.SourceChannel source = p.source()) { 887 888 // interrupt current thread when it blocks in write 889 Thread thisThread = Thread.currentThread(); 890 runAfterParkedAsync(thisThread::interrupt); 891 892 try { 893 ByteBuffer bb = ByteBuffer.allocate(100*1024); 894 for (;;) { 895 int n = sink.write(bb); 896 assertTrue(n > 0); 897 bb.clear(); 898 } 899 } catch (ClosedByInterruptException expected) { 900 // closed when blocked in write 901 assertTrue(Thread.interrupted()); 902 retry = false; 903 } catch (ClosedChannelException e) { 904 // closed when not blocked in write, need to retry test 905 } 906 } 907 } 908 }); 909 } 910 911 /** 912 * Creates a loopback connection 913 */ 914 static class Connection implements Closeable { 915 private final SocketChannel sc1; 916 private final SocketChannel sc2; 917 Connection() throws IOException { 918 var lh = InetAddress.getLoopbackAddress(); 919 try (var listener = ServerSocketChannel.open()) { 920 listener.bind(new InetSocketAddress(lh, 0)); 921 SocketChannel sc1 = SocketChannel.open(); 922 SocketChannel sc2 = null; 923 try { 924 sc1.socket().connect(listener.getLocalAddress()); 925 sc2 = listener.accept(); 926 } catch (IOException ioe) { 927 sc1.close(); 928 throw ioe; 929 } 930 this.sc1 = sc1; 931 this.sc2 = sc2; 932 } 933 } 934 SocketChannel channel1() { 935 return sc1; 936 } 937 SocketChannel channel2() { 938 return sc2; 939 } 940 @Override 941 public void close() throws IOException { 942 sc1.close(); 943 sc2.close(); 944 } 945 } 946 947 /** 948 * Read from a channel until all bytes have been read or an I/O error occurs. 949 */ 950 static void readToEOF(ReadableByteChannel rbc) throws IOException { 951 ByteBuffer bb = ByteBuffer.allocate(16*1024); 952 int n; 953 while ((n = rbc.read(bb)) > 0) { 954 bb.clear(); 955 } 956 } 957 958 @FunctionalInterface 959 interface ThrowingRunnable { 960 void run() throws Exception; 961 } 962 963 /** 964 * Runs the given task asynchronously after the current virtual thread has parked. 965 * @return the thread started to run the task 966 */ 967 static Thread runAfterParkedAsync(ThrowingRunnable task) { 968 Thread target = Thread.currentThread(); 969 if (!target.isVirtual()) 970 throw new WrongThreadException(); 971 return Thread.ofPlatform().daemon().start(() -> { 972 try { 973 Thread.State state = target.getState(); 974 while (state != Thread.State.WAITING 975 && state != Thread.State.TIMED_WAITING) { 976 Thread.sleep(20); 977 state = target.getState(); 978 } 979 Thread.sleep(20); // give a bit more time to release carrier 980 task.run(); 981 } catch (Exception e) { 982 e.printStackTrace(); 983 } 984 }); 985 } 986 }