1 /* 2 * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /** 25 * @test id=default 26 * @bug 8284161 27 * @summary Test virtual threads doing blocking I/O on NIO channels 28 * @library /test/lib 29 * @run junit BlockingChannelOps 30 */ 31 32 /** 33 * @test id=poller-modes 34 * @requires (os.family == "linux") | (os.family == "mac") 35 * @library /test/lib 36 * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps 37 * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps 38 */ 39 40 /** 41 * @test id=no-vmcontinuations 42 * @requires vm.continuations 43 * @library /test/lib 44 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps 45 */ 46 47 import java.io.Closeable; 48 import java.io.IOException; 49 import java.net.DatagramPacket; 50 import java.net.InetAddress; 51 import java.net.InetSocketAddress; 52 import java.net.Socket; 53 import java.net.SocketAddress; 54 import java.net.SocketException; 55 import java.nio.ByteBuffer; 56 import java.nio.channels.AsynchronousCloseException; 57 import java.nio.channels.ClosedByInterruptException; 58 import java.nio.channels.ClosedChannelException; 59 import java.nio.channels.DatagramChannel; 60 import java.nio.channels.Pipe; 61 import java.nio.channels.ReadableByteChannel; 62 import java.nio.channels.ServerSocketChannel; 63 import java.nio.channels.SocketChannel; 64 import java.nio.channels.WritableByteChannel; 65 import java.util.concurrent.ExecutorService; 66 import java.util.concurrent.Executors; 67 import java.util.stream.Stream; 68 69 import jdk.test.lib.thread.VThreadRunner; 70 import jdk.test.lib.thread.VThreadScheduler; 71 72 import org.junit.jupiter.api.BeforeAll; 73 import org.junit.jupiter.api.AfterAll; 74 import org.junit.jupiter.params.ParameterizedTest; 75 import org.junit.jupiter.params.provider.MethodSource; 76 import static org.junit.jupiter.api.Assertions.*; 77 78 class BlockingChannelOps { 79 private static ExecutorService threadPool; 80 private static Thread.VirtualThreadScheduler customScheduler; 81 82 @BeforeAll 83 static void setup() { 84 threadPool = Executors.newCachedThreadPool(); 85 customScheduler = (_, task) -> threadPool.execute(task); 86 } 87 88 @AfterAll 89 static void finish() { 90 threadPool.shutdown(); 91 } 92 93 /** 94 * Returns the Thread.Builder to create the virtual thread. 95 */ 96 static Stream<Thread.Builder.OfVirtual> threadBuilders() { 97 if (VThreadScheduler.supportsCustomScheduler()) { 98 return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler)); 99 } else { 100 return Stream.of(Thread.ofVirtual()); 101 } 102 } 103 104 /** 105 * SocketChannel read/write, no blocking. 106 */ 107 @ParameterizedTest 108 @MethodSource("threadBuilders") 109 void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception { 110 VThreadRunner.run(builder, () -> { 111 try (var connection = new Connection()) { 112 SocketChannel sc1 = connection.channel1(); 113 SocketChannel sc2 = connection.channel2(); 114 115 // write to sc1 116 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 117 int n = sc1.write(bb); 118 assertTrue(n > 0); 119 120 // read from sc2 should not block 121 bb = ByteBuffer.allocate(10); 122 n = sc2.read(bb); 123 assertTrue(n > 0); 124 assertTrue(bb.get(0) == 'X'); 125 } 126 }); 127 } 128 129 /** 130 * Virtual thread blocks in SocketChannel read. 131 */ 132 @ParameterizedTest 133 @MethodSource("threadBuilders") 134 void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception { 135 VThreadRunner.run(builder, () -> { 136 try (var connection = new Connection()) { 137 SocketChannel sc1 = connection.channel1(); 138 SocketChannel sc2 = connection.channel2(); 139 140 // write to sc1 when current thread blocks in sc2.read 141 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 142 runAfterParkedAsync(() -> sc1.write(bb1)); 143 144 // read from sc2 should block 145 ByteBuffer bb2 = ByteBuffer.allocate(10); 146 int n = sc2.read(bb2); 147 assertTrue(n > 0); 148 assertTrue(bb2.get(0) == 'X'); 149 } 150 }); 151 } 152 153 /** 154 * Virtual thread blocks in SocketChannel write. 155 */ 156 @ParameterizedTest 157 @MethodSource("threadBuilders") 158 void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception { 159 VThreadRunner.run(builder, () -> { 160 try (var connection = new Connection()) { 161 SocketChannel sc1 = connection.channel1(); 162 SocketChannel sc2 = connection.channel2(); 163 164 // read from sc2 to EOF when current thread blocks in sc1.write 165 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2)); 166 167 // write to sc1 should block 168 ByteBuffer bb = ByteBuffer.allocate(100*1024); 169 for (int i=0; i<1000; i++) { 170 int n = sc1.write(bb); 171 assertTrue(n > 0); 172 bb.clear(); 173 } 174 sc1.close(); 175 176 // wait for reader to finish 177 reader.join(); 178 } 179 }); 180 } 181 182 /** 183 * SocketChannel close while virtual thread blocked in read. 184 */ 185 @ParameterizedTest 186 @MethodSource("threadBuilders") 187 void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 188 VThreadRunner.run(builder, () -> { 189 try (var connection = new Connection()) { 190 SocketChannel sc = connection.channel1(); 191 runAfterParkedAsync(sc::close); 192 try { 193 int n = sc.read(ByteBuffer.allocate(100)); 194 fail("read returned " + n); 195 } catch (AsynchronousCloseException expected) { } 196 } 197 }); 198 } 199 200 /** 201 * SocketChannel shutdownInput while virtual thread blocked in read. 202 */ 203 @ParameterizedTest 204 @MethodSource("threadBuilders") 205 void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception { 206 VThreadRunner.run(builder, () -> { 207 try (var connection = new Connection()) { 208 SocketChannel sc = connection.channel1(); 209 runAfterParkedAsync(sc::shutdownInput); 210 int n = sc.read(ByteBuffer.allocate(100)); 211 assertEquals(-1, n); 212 assertTrue(sc.isOpen()); 213 } 214 }); 215 } 216 217 /** 218 * Virtual thread interrupted while blocked in SocketChannel read. 219 */ 220 @ParameterizedTest 221 @MethodSource("threadBuilders") 222 void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 223 VThreadRunner.run(builder, () -> { 224 try (var connection = new Connection()) { 225 SocketChannel sc = connection.channel1(); 226 227 // interrupt current thread when it blocks in read 228 Thread thisThread = Thread.currentThread(); 229 runAfterParkedAsync(thisThread::interrupt); 230 231 try { 232 int n = sc.read(ByteBuffer.allocate(100)); 233 fail("read returned " + n); 234 } catch (ClosedByInterruptException expected) { 235 assertTrue(Thread.interrupted()); 236 } 237 } 238 }); 239 } 240 241 /** 242 * SocketChannel close while virtual thread blocked in write. 243 */ 244 @ParameterizedTest 245 @MethodSource("threadBuilders") 246 void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 247 VThreadRunner.run(builder, () -> { 248 boolean retry = true; 249 while (retry) { 250 try (var connection = new Connection()) { 251 SocketChannel sc = connection.channel1(); 252 253 // close sc when current thread blocks in write 254 runAfterParkedAsync(sc::close); 255 try { 256 ByteBuffer bb = ByteBuffer.allocate(100*1024); 257 for (;;) { 258 int n = sc.write(bb); 259 assertTrue(n > 0); 260 bb.clear(); 261 } 262 } catch (AsynchronousCloseException expected) { 263 // closed when blocked in write 264 retry = false; 265 } catch (ClosedChannelException e) { 266 // closed when not blocked in write, need to retry test 267 } 268 } 269 } 270 }); 271 } 272 273 274 /** 275 * SocketChannel shutdownOutput while virtual thread blocked in write. 276 */ 277 @ParameterizedTest 278 @MethodSource("threadBuilders") 279 void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception { 280 VThreadRunner.run(builder, () -> { 281 try (var connection = new Connection()) { 282 SocketChannel sc = connection.channel1(); 283 284 // shutdown output when current thread blocks in write 285 runAfterParkedAsync(sc::shutdownOutput); 286 try { 287 ByteBuffer bb = ByteBuffer.allocate(100*1024); 288 for (;;) { 289 int n = sc.write(bb); 290 assertTrue(n > 0); 291 bb.clear(); 292 } 293 } catch (ClosedChannelException e) { 294 // expected 295 } 296 assertTrue(sc.isOpen()); 297 } 298 }); 299 } 300 301 /** 302 * Virtual thread interrupted while blocked in SocketChannel write. 303 */ 304 @ParameterizedTest 305 @MethodSource("threadBuilders") 306 void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 307 VThreadRunner.run(builder, () -> { 308 boolean retry = true; 309 while (retry) { 310 try (var connection = new Connection()) { 311 SocketChannel sc = connection.channel1(); 312 313 // interrupt current thread when it blocks in write 314 Thread thisThread = Thread.currentThread(); 315 runAfterParkedAsync(thisThread::interrupt); 316 317 try { 318 ByteBuffer bb = ByteBuffer.allocate(100*1024); 319 for (;;) { 320 int n = sc.write(bb); 321 assertTrue(n > 0); 322 bb.clear(); 323 } 324 } catch (ClosedByInterruptException e) { 325 // closed when blocked in write 326 assertTrue(Thread.interrupted()); 327 retry = false; 328 } catch (ClosedChannelException e) { 329 // closed when not blocked in write, need to retry test 330 } 331 } 332 } 333 }); 334 } 335 336 /** 337 * Virtual thread blocks in SocketChannel adaptor read. 338 */ 339 @ParameterizedTest 340 @MethodSource("threadBuilders") 341 void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception { 342 testSocketAdaptorRead(builder, 0); 343 } 344 345 /** 346 * Virtual thread blocks in SocketChannel adaptor read with timeout. 347 */ 348 @ParameterizedTest 349 @MethodSource("threadBuilders") 350 void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception { 351 testSocketAdaptorRead(builder, 60_000); 352 } 353 354 private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder, 355 int timeout) throws Exception { 356 VThreadRunner.run(builder, () -> { 357 try (var connection = new Connection()) { 358 SocketChannel sc1 = connection.channel1(); 359 SocketChannel sc2 = connection.channel2(); 360 361 // write to sc1 when currnet thread blocks reading from sc2 362 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 363 runAfterParkedAsync(() -> sc1.write(bb)); 364 365 // read from sc2 should block 366 byte[] array = new byte[100]; 367 if (timeout > 0) 368 sc2.socket().setSoTimeout(timeout); 369 int n = sc2.socket().getInputStream().read(array); 370 assertTrue(n > 0); 371 assertTrue(array[0] == 'X'); 372 } 373 }); 374 } 375 376 /** 377 * ServerSocketChannel accept, no blocking. 378 */ 379 @ParameterizedTest 380 @MethodSource("threadBuilders") 381 void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception { 382 VThreadRunner.run(builder, () -> { 383 try (var ssc = ServerSocketChannel.open()) { 384 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 385 var sc1 = SocketChannel.open(ssc.getLocalAddress()); 386 // accept should not block 387 var sc2 = ssc.accept(); 388 sc1.close(); 389 sc2.close(); 390 } 391 }); 392 } 393 394 /** 395 * Virtual thread blocks in ServerSocketChannel accept. 396 */ 397 @ParameterizedTest 398 @MethodSource("threadBuilders") 399 void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception { 400 VThreadRunner.run(builder, () -> { 401 try (var ssc = ServerSocketChannel.open()) { 402 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 403 var sc1 = SocketChannel.open(); 404 405 // connect when current thread when it blocks in accept 406 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress())); 407 408 // accept should block 409 var sc2 = ssc.accept(); 410 sc1.close(); 411 sc2.close(); 412 } 413 }); 414 } 415 416 /** 417 * SeverSocketChannel close while virtual thread blocked in accept. 418 */ 419 @ParameterizedTest 420 @MethodSource("threadBuilders") 421 void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 422 VThreadRunner.run(builder, () -> { 423 try (var ssc = ServerSocketChannel.open()) { 424 InetAddress lh = InetAddress.getLoopbackAddress(); 425 ssc.bind(new InetSocketAddress(lh, 0)); 426 runAfterParkedAsync(ssc::close); 427 try { 428 SocketChannel sc = ssc.accept(); 429 sc.close(); 430 fail("connection accepted???"); 431 } catch (AsynchronousCloseException expected) { } 432 } 433 }); 434 } 435 436 /** 437 * Virtual thread interrupted while blocked in ServerSocketChannel accept. 438 */ 439 @ParameterizedTest 440 @MethodSource("threadBuilders") 441 void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 442 VThreadRunner.run(builder, () -> { 443 try (var ssc = ServerSocketChannel.open()) { 444 InetAddress lh = InetAddress.getLoopbackAddress(); 445 ssc.bind(new InetSocketAddress(lh, 0)); 446 447 // interrupt current thread when it blocks in accept 448 Thread thisThread = Thread.currentThread(); 449 runAfterParkedAsync(thisThread::interrupt); 450 451 try { 452 SocketChannel sc = ssc.accept(); 453 sc.close(); 454 fail("connection accepted???"); 455 } catch (ClosedByInterruptException expected) { 456 assertTrue(Thread.interrupted()); 457 } 458 } 459 }); 460 } 461 462 /** 463 * Virtual thread blocks in ServerSocketChannel adaptor accept. 464 */ 465 @ParameterizedTest 466 @MethodSource("threadBuilders") 467 void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception { 468 testSocketChannelAdaptorAccept(builder, 0); 469 } 470 471 /** 472 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout. 473 */ 474 @ParameterizedTest 475 @MethodSource("threadBuilders") 476 void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception { 477 testSocketChannelAdaptorAccept(builder, 60_000); 478 } 479 480 private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder, 481 int timeout) throws Exception { 482 VThreadRunner.run(builder, () -> { 483 try (var ssc = ServerSocketChannel.open()) { 484 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 485 var sc = SocketChannel.open(); 486 487 // interrupt current thread when it blocks in accept 488 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress())); 489 490 // accept should block 491 if (timeout > 0) 492 ssc.socket().setSoTimeout(timeout); 493 Socket s = ssc.socket().accept(); 494 sc.close(); 495 s.close(); 496 } 497 }); 498 } 499 500 /** 501 * DatagramChannel receive/send, no blocking. 502 */ 503 @ParameterizedTest 504 @MethodSource("threadBuilders") 505 void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception { 506 VThreadRunner.run(builder, () -> { 507 try (DatagramChannel dc1 = DatagramChannel.open(); 508 DatagramChannel dc2 = DatagramChannel.open()) { 509 510 InetAddress lh = InetAddress.getLoopbackAddress(); 511 dc2.bind(new InetSocketAddress(lh, 0)); 512 513 // send should not block 514 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 515 int n = dc1.send(bb, dc2.getLocalAddress()); 516 assertTrue(n > 0); 517 518 // receive should not block 519 bb = ByteBuffer.allocate(10); 520 dc2.receive(bb); 521 assertTrue(bb.get(0) == 'X'); 522 } 523 }); 524 } 525 526 /** 527 * Virtual thread blocks in DatagramChannel receive. 528 */ 529 @ParameterizedTest 530 @MethodSource("threadBuilders") 531 void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception { 532 VThreadRunner.run(builder, () -> { 533 try (DatagramChannel dc1 = DatagramChannel.open(); 534 DatagramChannel dc2 = DatagramChannel.open()) { 535 536 InetAddress lh = InetAddress.getLoopbackAddress(); 537 dc2.bind(new InetSocketAddress(lh, 0)); 538 539 // send from dc1 when current thread blocked in dc2.receive 540 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 541 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress())); 542 543 // read from dc2 should block 544 ByteBuffer bb2 = ByteBuffer.allocate(10); 545 dc2.receive(bb2); 546 assertTrue(bb2.get(0) == 'X'); 547 } 548 }); 549 } 550 551 /** 552 * DatagramChannel close while virtual thread blocked in receive. 553 */ 554 @ParameterizedTest 555 @MethodSource("threadBuilders") 556 void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 557 VThreadRunner.run(builder, () -> { 558 try (DatagramChannel dc = DatagramChannel.open()) { 559 InetAddress lh = InetAddress.getLoopbackAddress(); 560 dc.bind(new InetSocketAddress(lh, 0)); 561 runAfterParkedAsync(dc::close); 562 try { 563 dc.receive(ByteBuffer.allocate(100)); 564 fail("receive returned"); 565 } catch (AsynchronousCloseException expected) { } 566 } 567 }); 568 } 569 570 /** 571 * Virtual thread interrupted while blocked in DatagramChannel receive. 572 */ 573 @ParameterizedTest 574 @MethodSource("threadBuilders") 575 void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 576 VThreadRunner.run(builder, () -> { 577 try (DatagramChannel dc = DatagramChannel.open()) { 578 InetAddress lh = InetAddress.getLoopbackAddress(); 579 dc.bind(new InetSocketAddress(lh, 0)); 580 581 // interrupt current thread when it blocks in receive 582 Thread thisThread = Thread.currentThread(); 583 runAfterParkedAsync(thisThread::interrupt); 584 585 try { 586 dc.receive(ByteBuffer.allocate(100)); 587 fail("receive returned"); 588 } catch (ClosedByInterruptException expected) { 589 assertTrue(Thread.interrupted()); 590 } 591 } 592 }); 593 } 594 595 /** 596 * Virtual thread blocks in DatagramSocket adaptor receive. 597 */ 598 @ParameterizedTest 599 @MethodSource("threadBuilders") 600 void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception { 601 testDatagramSocketAdaptorReceive(builder, 0); 602 } 603 604 /** 605 * Virtual thread blocks in DatagramSocket adaptor receive with timeout. 606 */ 607 @ParameterizedTest 608 @MethodSource("threadBuilders") 609 void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception { 610 testDatagramSocketAdaptorReceive(builder, 60_000); 611 } 612 613 private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder, 614 int timeout) throws Exception { 615 VThreadRunner.run(builder, () -> { 616 try (DatagramChannel dc1 = DatagramChannel.open(); 617 DatagramChannel dc2 = DatagramChannel.open()) { 618 619 InetAddress lh = InetAddress.getLoopbackAddress(); 620 dc2.bind(new InetSocketAddress(lh, 0)); 621 622 // send from dc1 when current thread blocks in dc2 receive 623 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 624 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress())); 625 626 // receive should block 627 byte[] array = new byte[100]; 628 DatagramPacket p = new DatagramPacket(array, 0, array.length); 629 if (timeout > 0) 630 dc2.socket().setSoTimeout(timeout); 631 dc2.socket().receive(p); 632 assertTrue(p.getLength() == 3 && array[0] == 'X'); 633 } 634 }); 635 } 636 637 /** 638 * DatagramChannel close while virtual thread blocked in adaptor receive. 639 */ 640 @ParameterizedTest 641 @MethodSource("threadBuilders") 642 void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception { 643 testDatagramSocketAdaptorReceiveAsyncClose(builder, 0); 644 } 645 646 /** 647 * DatagramChannel close while virtual thread blocked in adaptor receive 648 * with timeout. 649 */ 650 @ParameterizedTest 651 @MethodSource("threadBuilders") 652 void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception { 653 testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000); 654 } 655 656 private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder, 657 int timeout) throws Exception { 658 VThreadRunner.run(builder, () -> { 659 try (DatagramChannel dc = DatagramChannel.open()) { 660 InetAddress lh = InetAddress.getLoopbackAddress(); 661 dc.bind(new InetSocketAddress(lh, 0)); 662 663 byte[] array = new byte[100]; 664 DatagramPacket p = new DatagramPacket(array, 0, array.length); 665 if (timeout > 0) 666 dc.socket().setSoTimeout(timeout); 667 668 // close channel/socket when current thread blocks in receive 669 runAfterParkedAsync(dc::close); 670 671 assertThrows(SocketException.class, () -> dc.socket().receive(p)); 672 } 673 }); 674 } 675 676 /** 677 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive. 678 */ 679 @ParameterizedTest 680 @MethodSource("threadBuilders") 681 void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception { 682 testDatagramSocketAdaptorReceiveInterrupt(builder, 0); 683 } 684 685 /** 686 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive 687 * with timeout. 688 */ 689 @ParameterizedTest 690 @MethodSource("threadBuilders") 691 void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception { 692 testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000); 693 } 694 695 private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder, 696 int timeout) throws Exception { 697 VThreadRunner.run(builder, () -> { 698 try (DatagramChannel dc = DatagramChannel.open()) { 699 InetAddress lh = InetAddress.getLoopbackAddress(); 700 dc.bind(new InetSocketAddress(lh, 0)); 701 702 byte[] array = new byte[100]; 703 DatagramPacket p = new DatagramPacket(array, 0, array.length); 704 if (timeout > 0) 705 dc.socket().setSoTimeout(timeout); 706 707 // interrupt current thread when it blocks in receive 708 Thread thisThread = Thread.currentThread(); 709 runAfterParkedAsync(thisThread::interrupt); 710 711 try { 712 dc.socket().receive(p); 713 fail(); 714 } catch (ClosedByInterruptException expected) { 715 assertTrue(Thread.interrupted()); 716 } 717 } 718 }); 719 } 720 721 /** 722 * Pipe read/write, no blocking. 723 */ 724 @ParameterizedTest 725 @MethodSource("threadBuilders") 726 void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception { 727 VThreadRunner.run(builder, () -> { 728 Pipe p = Pipe.open(); 729 try (Pipe.SinkChannel sink = p.sink(); 730 Pipe.SourceChannel source = p.source()) { 731 732 // write should not block 733 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 734 int n = sink.write(bb); 735 assertTrue(n > 0); 736 737 // read should not block 738 bb = ByteBuffer.allocate(10); 739 n = source.read(bb); 740 assertTrue(n > 0); 741 assertTrue(bb.get(0) == 'X'); 742 } 743 }); 744 } 745 746 /** 747 * Virtual thread blocks in Pipe.SourceChannel read. 748 */ 749 @ParameterizedTest 750 @MethodSource("threadBuilders") 751 void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception { 752 VThreadRunner.run(builder, () -> { 753 Pipe p = Pipe.open(); 754 try (Pipe.SinkChannel sink = p.sink(); 755 Pipe.SourceChannel source = p.source()) { 756 757 // write from sink when current thread blocks reading from source 758 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 759 runAfterParkedAsync(() -> sink.write(bb1)); 760 761 // read should block 762 ByteBuffer bb2 = ByteBuffer.allocate(10); 763 int n = source.read(bb2); 764 assertTrue(n > 0); 765 assertTrue(bb2.get(0) == 'X'); 766 } 767 }); 768 } 769 770 /** 771 * Virtual thread blocks in Pipe.SinkChannel write. 772 */ 773 @ParameterizedTest 774 @MethodSource("threadBuilders") 775 void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception { 776 VThreadRunner.run(builder, () -> { 777 Pipe p = Pipe.open(); 778 try (Pipe.SinkChannel sink = p.sink(); 779 Pipe.SourceChannel source = p.source()) { 780 781 // read from source to EOF when current thread blocking in write 782 Thread reader = runAfterParkedAsync(() -> readToEOF(source)); 783 784 // write to sink should block 785 ByteBuffer bb = ByteBuffer.allocate(100*1024); 786 for (int i=0; i<1000; i++) { 787 int n = sink.write(bb); 788 assertTrue(n > 0); 789 bb.clear(); 790 } 791 sink.close(); 792 793 // wait for reader to finish 794 reader.join(); 795 } 796 }); 797 } 798 799 /** 800 * Pipe.SourceChannel close while virtual thread blocked in read. 801 */ 802 @ParameterizedTest 803 @MethodSource("threadBuilders") 804 void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 805 VThreadRunner.run(builder, () -> { 806 Pipe p = Pipe.open(); 807 try (Pipe.SinkChannel sink = p.sink(); 808 Pipe.SourceChannel source = p.source()) { 809 runAfterParkedAsync(source::close); 810 try { 811 int n = source.read(ByteBuffer.allocate(100)); 812 fail("read returned " + n); 813 } catch (AsynchronousCloseException expected) { } 814 } 815 }); 816 } 817 818 /** 819 * Virtual thread interrupted while blocked in Pipe.SourceChannel read. 820 */ 821 @ParameterizedTest 822 @MethodSource("threadBuilders") 823 void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 824 VThreadRunner.run(builder, () -> { 825 Pipe p = Pipe.open(); 826 try (Pipe.SinkChannel sink = p.sink(); 827 Pipe.SourceChannel source = p.source()) { 828 829 // interrupt current thread when it blocks reading from source 830 Thread thisThread = Thread.currentThread(); 831 runAfterParkedAsync(thisThread::interrupt); 832 833 try { 834 int n = source.read(ByteBuffer.allocate(100)); 835 fail("read returned " + n); 836 } catch (ClosedByInterruptException expected) { 837 assertTrue(Thread.interrupted()); 838 } 839 } 840 }); 841 } 842 843 /** 844 * Pipe.SinkChannel close while virtual thread blocked in write. 845 */ 846 @ParameterizedTest 847 @MethodSource("threadBuilders") 848 void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 849 VThreadRunner.run(builder, () -> { 850 boolean retry = true; 851 while (retry) { 852 Pipe p = Pipe.open(); 853 try (Pipe.SinkChannel sink = p.sink(); 854 Pipe.SourceChannel source = p.source()) { 855 856 // close sink when current thread blocks in write 857 runAfterParkedAsync(sink::close); 858 try { 859 ByteBuffer bb = ByteBuffer.allocate(100*1024); 860 for (;;) { 861 int n = sink.write(bb); 862 assertTrue(n > 0); 863 bb.clear(); 864 } 865 } catch (AsynchronousCloseException e) { 866 // closed when blocked in write 867 retry = false; 868 } catch (ClosedChannelException e) { 869 // closed when not blocked in write, need to retry test 870 } 871 } 872 } 873 }); 874 } 875 876 /** 877 * Virtual thread interrupted while blocked in Pipe.SinkChannel write. 878 */ 879 @ParameterizedTest 880 @MethodSource("threadBuilders") 881 void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 882 VThreadRunner.run(builder, () -> { 883 boolean retry = true; 884 while (retry) { 885 Pipe p = Pipe.open(); 886 try (Pipe.SinkChannel sink = p.sink(); 887 Pipe.SourceChannel source = p.source()) { 888 889 // interrupt current thread when it blocks in write 890 Thread thisThread = Thread.currentThread(); 891 runAfterParkedAsync(thisThread::interrupt); 892 893 try { 894 ByteBuffer bb = ByteBuffer.allocate(100*1024); 895 for (;;) { 896 int n = sink.write(bb); 897 assertTrue(n > 0); 898 bb.clear(); 899 } 900 } catch (ClosedByInterruptException expected) { 901 // closed when blocked in write 902 assertTrue(Thread.interrupted()); 903 retry = false; 904 } catch (ClosedChannelException e) { 905 // closed when not blocked in write, need to retry test 906 } 907 } 908 } 909 }); 910 } 911 912 /** 913 * Creates a loopback connection 914 */ 915 static class Connection implements Closeable { 916 private final SocketChannel sc1; 917 private final SocketChannel sc2; 918 Connection() throws IOException { 919 var lh = InetAddress.getLoopbackAddress(); 920 try (var listener = ServerSocketChannel.open()) { 921 listener.bind(new InetSocketAddress(lh, 0)); 922 SocketChannel sc1 = SocketChannel.open(); 923 SocketChannel sc2 = null; 924 try { 925 sc1.socket().connect(listener.getLocalAddress()); 926 sc2 = listener.accept(); 927 } catch (IOException ioe) { 928 sc1.close(); 929 throw ioe; 930 } 931 this.sc1 = sc1; 932 this.sc2 = sc2; 933 } 934 } 935 SocketChannel channel1() { 936 return sc1; 937 } 938 SocketChannel channel2() { 939 return sc2; 940 } 941 @Override 942 public void close() throws IOException { 943 sc1.close(); 944 sc2.close(); 945 } 946 } 947 948 /** 949 * Read from a channel until all bytes have been read or an I/O error occurs. 950 */ 951 static void readToEOF(ReadableByteChannel rbc) throws IOException { 952 ByteBuffer bb = ByteBuffer.allocate(16*1024); 953 int n; 954 while ((n = rbc.read(bb)) > 0) { 955 bb.clear(); 956 } 957 } 958 959 @FunctionalInterface 960 interface ThrowingRunnable { 961 void run() throws Exception; 962 } 963 964 /** 965 * Runs the given task asynchronously after the current virtual thread has parked. 966 * @return the thread started to run the task 967 */ 968 static Thread runAfterParkedAsync(ThrowingRunnable task) { 969 Thread target = Thread.currentThread(); 970 if (!target.isVirtual()) 971 throw new WrongThreadException(); 972 return Thread.ofPlatform().daemon().start(() -> { 973 try { 974 Thread.State state = target.getState(); 975 while (state != Thread.State.WAITING 976 && state != Thread.State.TIMED_WAITING) { 977 Thread.sleep(20); 978 state = target.getState(); 979 } 980 Thread.sleep(20); // give a bit more time to release carrier 981 task.run(); 982 } catch (Exception e) { 983 e.printStackTrace(); 984 } 985 }); 986 } 987 }