1 /* 2 * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=default 26 * @bug 8284161 27 * @summary Test virtual threads doing blocking I/O on NIO channels 28 * @library /test/lib 29 * @run junit/othervm BlockingChannelOps 30 */ 31 32 /* 33 * @test id=poller-modes 34 * @requires (os.family == "linux") | (os.family == "mac") 35 * @library /test/lib 36 * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps 37 * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps 38 * @run junit/othervm -Djdk.pollerMode=3 BlockingChannelOps 39 */ 40 41 /* 42 * @test id=io_uring 43 * @requires os.family == "linux" 44 * @library /test/lib 45 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps 46 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps 47 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps 48 */ 49 50 /* 51 * @test id=no-vmcontinuations 52 * @requires vm.continuations 53 * @library /test/lib 54 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps 55 */ 56 57 import java.io.Closeable; 58 import java.io.IOException; 59 import java.net.DatagramPacket; 60 import java.net.InetAddress; 61 import java.net.InetSocketAddress; 62 import java.net.Socket; 63 import java.net.SocketAddress; 64 import java.net.SocketException; 65 import java.nio.ByteBuffer; 66 import java.nio.channels.AsynchronousCloseException; 67 import java.nio.channels.ClosedByInterruptException; 68 import java.nio.channels.ClosedChannelException; 69 import java.nio.channels.DatagramChannel; 70 import java.nio.channels.Pipe; 71 import java.nio.channels.ReadableByteChannel; 72 import java.nio.channels.ServerSocketChannel; 73 import java.nio.channels.SocketChannel; 74 import java.nio.channels.WritableByteChannel; 75 import java.util.concurrent.ExecutorService; 76 import java.util.concurrent.Executors; 77 import java.util.concurrent.ThreadFactory; 78 import java.util.stream.Stream; 79 80 import jdk.test.lib.thread.VThreadRunner; 81 import jdk.test.lib.thread.VThreadScheduler; 82 83 import org.junit.jupiter.api.BeforeAll; 84 import org.junit.jupiter.params.ParameterizedTest; 85 import org.junit.jupiter.params.provider.MethodSource; 86 import static org.junit.jupiter.api.Assertions.*; 87 88 class BlockingChannelOps { 89 private static ExecutorService threadPool; 90 private static Thread.VirtualThreadScheduler customScheduler; 91 92 @BeforeAll 93 static void setup() { 94 ThreadFactory factory = Thread.ofPlatform().daemon().factory(); 95 threadPool = Executors.newCachedThreadPool(factory); 96 customScheduler = (_, task) -> threadPool.execute(task); 97 } 98 99 /** 100 * Returns the Thread.Builder to create the virtual thread. 101 */ 102 static Stream<Thread.Builder.OfVirtual> threadBuilders() { 103 if (VThreadScheduler.supportsCustomScheduler()) { 104 return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler)); 105 } else { 106 return Stream.of(Thread.ofVirtual()); 107 } 108 } 109 110 /** 111 * SocketChannel read/write, no blocking. 112 */ 113 @ParameterizedTest 114 @MethodSource("threadBuilders") 115 void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception { 116 VThreadRunner.run(builder, () -> { 117 try (var connection = new Connection()) { 118 SocketChannel sc1 = connection.channel1(); 119 SocketChannel sc2 = connection.channel2(); 120 121 // write to sc1 122 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 123 int n = sc1.write(bb); 124 assertTrue(n > 0); 125 126 // read from sc2 should not block 127 bb = ByteBuffer.allocate(10); 128 n = sc2.read(bb); 129 assertTrue(n > 0); 130 assertTrue(bb.get(0) == 'X'); 131 } 132 }); 133 } 134 135 /** 136 * Virtual thread blocks in SocketChannel read. 137 */ 138 @ParameterizedTest 139 @MethodSource("threadBuilders") 140 void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception { 141 VThreadRunner.run(builder, () -> { 142 try (var connection = new Connection()) { 143 SocketChannel sc1 = connection.channel1(); 144 SocketChannel sc2 = connection.channel2(); 145 146 // write to sc1 when current thread blocks in sc2.read 147 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 148 runAfterParkedAsync(() -> sc1.write(bb1)); 149 150 // read from sc2 should block 151 ByteBuffer bb2 = ByteBuffer.allocate(10); 152 int n = sc2.read(bb2); 153 assertTrue(n > 0); 154 assertTrue(bb2.get(0) == 'X'); 155 } 156 }); 157 } 158 159 /** 160 * Virtual thread blocks in SocketChannel write. 161 */ 162 @ParameterizedTest 163 @MethodSource("threadBuilders") 164 void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception { 165 VThreadRunner.run(builder, () -> { 166 try (var connection = new Connection()) { 167 SocketChannel sc1 = connection.channel1(); 168 SocketChannel sc2 = connection.channel2(); 169 170 // read from sc2 to EOF when current thread blocks in sc1.write 171 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2)); 172 173 // write to sc1 should block 174 ByteBuffer bb = ByteBuffer.allocate(100*1024); 175 for (int i=0; i<1000; i++) { 176 int n = sc1.write(bb); 177 assertTrue(n > 0); 178 bb.clear(); 179 } 180 sc1.close(); 181 182 // wait for reader to finish 183 reader.join(); 184 } 185 }); 186 } 187 188 /** 189 * SocketChannel close while virtual thread blocked in read. 190 */ 191 @ParameterizedTest 192 @MethodSource("threadBuilders") 193 void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 194 VThreadRunner.run(builder, () -> { 195 try (var connection = new Connection()) { 196 SocketChannel sc = connection.channel1(); 197 runAfterParkedAsync(sc::close); 198 try { 199 int n = sc.read(ByteBuffer.allocate(100)); 200 fail("read returned " + n); 201 } catch (AsynchronousCloseException expected) { } 202 } 203 }); 204 } 205 206 /** 207 * SocketChannel shutdownInput while virtual thread blocked in read. 208 */ 209 @ParameterizedTest 210 @MethodSource("threadBuilders") 211 void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception { 212 VThreadRunner.run(builder, () -> { 213 try (var connection = new Connection()) { 214 SocketChannel sc = connection.channel1(); 215 runAfterParkedAsync(sc::shutdownInput); 216 int n = sc.read(ByteBuffer.allocate(100)); 217 assertEquals(-1, n); 218 assertTrue(sc.isOpen()); 219 } 220 }); 221 } 222 223 /** 224 * Virtual thread interrupted while blocked in SocketChannel read. 225 */ 226 @ParameterizedTest 227 @MethodSource("threadBuilders") 228 void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 229 VThreadRunner.run(builder, () -> { 230 try (var connection = new Connection()) { 231 SocketChannel sc = connection.channel1(); 232 233 // interrupt current thread when it blocks in read 234 Thread thisThread = Thread.currentThread(); 235 runAfterParkedAsync(thisThread::interrupt); 236 237 try { 238 int n = sc.read(ByteBuffer.allocate(100)); 239 fail("read returned " + n); 240 } catch (ClosedByInterruptException expected) { 241 assertTrue(Thread.interrupted()); 242 } 243 } 244 }); 245 } 246 247 /** 248 * SocketChannel close while virtual thread blocked in write. 249 */ 250 @ParameterizedTest 251 @MethodSource("threadBuilders") 252 void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 253 VThreadRunner.run(builder, () -> { 254 boolean retry = true; 255 while (retry) { 256 try (var connection = new Connection()) { 257 SocketChannel sc = connection.channel1(); 258 259 // close sc when current thread blocks in write 260 runAfterParkedAsync(sc::close); 261 try { 262 ByteBuffer bb = ByteBuffer.allocate(100*1024); 263 for (;;) { 264 int n = sc.write(bb); 265 assertTrue(n > 0); 266 bb.clear(); 267 } 268 } catch (AsynchronousCloseException expected) { 269 // closed when blocked in write 270 retry = false; 271 } catch (ClosedChannelException e) { 272 // closed when not blocked in write, need to retry test 273 } 274 } 275 } 276 }); 277 } 278 279 280 /** 281 * SocketChannel shutdownOutput while virtual thread blocked in write. 282 */ 283 @ParameterizedTest 284 @MethodSource("threadBuilders") 285 void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception { 286 VThreadRunner.run(builder, () -> { 287 try (var connection = new Connection()) { 288 SocketChannel sc = connection.channel1(); 289 290 // shutdown output when current thread blocks in write 291 runAfterParkedAsync(sc::shutdownOutput); 292 try { 293 ByteBuffer bb = ByteBuffer.allocate(100*1024); 294 for (;;) { 295 int n = sc.write(bb); 296 assertTrue(n > 0); 297 bb.clear(); 298 } 299 } catch (ClosedChannelException e) { 300 // expected 301 } 302 assertTrue(sc.isOpen()); 303 } 304 }); 305 } 306 307 /** 308 * Virtual thread interrupted while blocked in SocketChannel write. 309 */ 310 @ParameterizedTest 311 @MethodSource("threadBuilders") 312 void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 313 VThreadRunner.run(builder, () -> { 314 boolean retry = true; 315 while (retry) { 316 try (var connection = new Connection()) { 317 SocketChannel sc = connection.channel1(); 318 319 // interrupt current thread when it blocks in write 320 Thread thisThread = Thread.currentThread(); 321 runAfterParkedAsync(thisThread::interrupt); 322 323 try { 324 ByteBuffer bb = ByteBuffer.allocate(100*1024); 325 for (;;) { 326 int n = sc.write(bb); 327 assertTrue(n > 0); 328 bb.clear(); 329 } 330 } catch (ClosedByInterruptException e) { 331 // closed when blocked in write 332 assertTrue(Thread.interrupted()); 333 retry = false; 334 } catch (ClosedChannelException e) { 335 // closed when not blocked in write, need to retry test 336 } 337 } 338 } 339 }); 340 } 341 342 /** 343 * Virtual thread blocks in SocketChannel adaptor read. 344 */ 345 @ParameterizedTest 346 @MethodSource("threadBuilders") 347 void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception { 348 testSocketAdaptorRead(builder, 0); 349 } 350 351 /** 352 * Virtual thread blocks in SocketChannel adaptor read with timeout. 353 */ 354 @ParameterizedTest 355 @MethodSource("threadBuilders") 356 void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception { 357 testSocketAdaptorRead(builder, 60_000); 358 } 359 360 private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder, 361 int timeout) throws Exception { 362 VThreadRunner.run(builder, () -> { 363 try (var connection = new Connection()) { 364 SocketChannel sc1 = connection.channel1(); 365 SocketChannel sc2 = connection.channel2(); 366 367 // write to sc1 when currnet thread blocks reading from sc2 368 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 369 runAfterParkedAsync(() -> sc1.write(bb)); 370 371 // read from sc2 should block 372 byte[] array = new byte[100]; 373 if (timeout > 0) 374 sc2.socket().setSoTimeout(timeout); 375 int n = sc2.socket().getInputStream().read(array); 376 assertTrue(n > 0); 377 assertTrue(array[0] == 'X'); 378 } 379 }); 380 } 381 382 /** 383 * ServerSocketChannel accept, no blocking. 384 */ 385 @ParameterizedTest 386 @MethodSource("threadBuilders") 387 void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception { 388 VThreadRunner.run(builder, () -> { 389 try (var ssc = ServerSocketChannel.open()) { 390 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 391 var sc1 = SocketChannel.open(ssc.getLocalAddress()); 392 // accept should not block 393 var sc2 = ssc.accept(); 394 sc1.close(); 395 sc2.close(); 396 } 397 }); 398 } 399 400 /** 401 * Virtual thread blocks in ServerSocketChannel accept. 402 */ 403 @ParameterizedTest 404 @MethodSource("threadBuilders") 405 void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception { 406 VThreadRunner.run(builder, () -> { 407 try (var ssc = ServerSocketChannel.open()) { 408 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 409 var sc1 = SocketChannel.open(); 410 411 // connect when current thread when it blocks in accept 412 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress())); 413 414 // accept should block 415 var sc2 = ssc.accept(); 416 sc1.close(); 417 sc2.close(); 418 } 419 }); 420 } 421 422 /** 423 * SeverSocketChannel close while virtual thread blocked in accept. 424 */ 425 @ParameterizedTest 426 @MethodSource("threadBuilders") 427 void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 428 VThreadRunner.run(builder, () -> { 429 try (var ssc = ServerSocketChannel.open()) { 430 InetAddress lh = InetAddress.getLoopbackAddress(); 431 ssc.bind(new InetSocketAddress(lh, 0)); 432 runAfterParkedAsync(ssc::close); 433 try { 434 SocketChannel sc = ssc.accept(); 435 sc.close(); 436 fail("connection accepted???"); 437 } catch (AsynchronousCloseException expected) { } 438 } 439 }); 440 } 441 442 /** 443 * Virtual thread interrupted while blocked in ServerSocketChannel accept. 444 */ 445 @ParameterizedTest 446 @MethodSource("threadBuilders") 447 void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 448 VThreadRunner.run(builder, () -> { 449 try (var ssc = ServerSocketChannel.open()) { 450 InetAddress lh = InetAddress.getLoopbackAddress(); 451 ssc.bind(new InetSocketAddress(lh, 0)); 452 453 // interrupt current thread when it blocks in accept 454 Thread thisThread = Thread.currentThread(); 455 runAfterParkedAsync(thisThread::interrupt); 456 457 try { 458 SocketChannel sc = ssc.accept(); 459 sc.close(); 460 fail("connection accepted???"); 461 } catch (ClosedByInterruptException expected) { 462 assertTrue(Thread.interrupted()); 463 } 464 } 465 }); 466 } 467 468 /** 469 * Virtual thread blocks in ServerSocketChannel adaptor accept. 470 */ 471 @ParameterizedTest 472 @MethodSource("threadBuilders") 473 void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception { 474 testSocketChannelAdaptorAccept(builder, 0); 475 } 476 477 /** 478 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout. 479 */ 480 @ParameterizedTest 481 @MethodSource("threadBuilders") 482 void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception { 483 testSocketChannelAdaptorAccept(builder, 60_000); 484 } 485 486 private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder, 487 int timeout) throws Exception { 488 VThreadRunner.run(builder, () -> { 489 try (var ssc = ServerSocketChannel.open()) { 490 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 491 var sc = SocketChannel.open(); 492 493 // interrupt current thread when it blocks in accept 494 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress())); 495 496 // accept should block 497 if (timeout > 0) 498 ssc.socket().setSoTimeout(timeout); 499 Socket s = ssc.socket().accept(); 500 sc.close(); 501 s.close(); 502 } 503 }); 504 } 505 506 /** 507 * DatagramChannel receive/send, no blocking. 508 */ 509 @ParameterizedTest 510 @MethodSource("threadBuilders") 511 void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception { 512 VThreadRunner.run(builder, () -> { 513 try (DatagramChannel dc1 = DatagramChannel.open(); 514 DatagramChannel dc2 = DatagramChannel.open()) { 515 516 InetAddress lh = InetAddress.getLoopbackAddress(); 517 dc2.bind(new InetSocketAddress(lh, 0)); 518 519 // send should not block 520 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 521 int n = dc1.send(bb, dc2.getLocalAddress()); 522 assertTrue(n > 0); 523 524 // receive should not block 525 bb = ByteBuffer.allocate(10); 526 dc2.receive(bb); 527 assertTrue(bb.get(0) == 'X'); 528 } 529 }); 530 } 531 532 /** 533 * Virtual thread blocks in DatagramChannel receive. 534 */ 535 @ParameterizedTest 536 @MethodSource("threadBuilders") 537 void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception { 538 VThreadRunner.run(builder, () -> { 539 try (DatagramChannel dc1 = DatagramChannel.open(); 540 DatagramChannel dc2 = DatagramChannel.open()) { 541 542 InetAddress lh = InetAddress.getLoopbackAddress(); 543 dc2.bind(new InetSocketAddress(lh, 0)); 544 545 // send from dc1 when current thread blocked in dc2.receive 546 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 547 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress())); 548 549 // read from dc2 should block 550 ByteBuffer bb2 = ByteBuffer.allocate(10); 551 dc2.receive(bb2); 552 assertTrue(bb2.get(0) == 'X'); 553 } 554 }); 555 } 556 557 /** 558 * DatagramChannel close while virtual thread blocked in receive. 559 */ 560 @ParameterizedTest 561 @MethodSource("threadBuilders") 562 void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 563 VThreadRunner.run(builder, () -> { 564 try (DatagramChannel dc = DatagramChannel.open()) { 565 InetAddress lh = InetAddress.getLoopbackAddress(); 566 dc.bind(new InetSocketAddress(lh, 0)); 567 runAfterParkedAsync(dc::close); 568 try { 569 dc.receive(ByteBuffer.allocate(100)); 570 fail("receive returned"); 571 } catch (AsynchronousCloseException expected) { } 572 } 573 }); 574 } 575 576 /** 577 * Virtual thread interrupted while blocked in DatagramChannel receive. 578 */ 579 @ParameterizedTest 580 @MethodSource("threadBuilders") 581 void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 582 VThreadRunner.run(builder, () -> { 583 try (DatagramChannel dc = DatagramChannel.open()) { 584 InetAddress lh = InetAddress.getLoopbackAddress(); 585 dc.bind(new InetSocketAddress(lh, 0)); 586 587 // interrupt current thread when it blocks in receive 588 Thread thisThread = Thread.currentThread(); 589 runAfterParkedAsync(thisThread::interrupt); 590 591 try { 592 dc.receive(ByteBuffer.allocate(100)); 593 fail("receive returned"); 594 } catch (ClosedByInterruptException expected) { 595 assertTrue(Thread.interrupted()); 596 } 597 } 598 }); 599 } 600 601 /** 602 * Virtual thread blocks in DatagramSocket adaptor receive. 603 */ 604 @ParameterizedTest 605 @MethodSource("threadBuilders") 606 void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception { 607 testDatagramSocketAdaptorReceive(builder, 0); 608 } 609 610 /** 611 * Virtual thread blocks in DatagramSocket adaptor receive with timeout. 612 */ 613 @ParameterizedTest 614 @MethodSource("threadBuilders") 615 void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception { 616 testDatagramSocketAdaptorReceive(builder, 60_000); 617 } 618 619 private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder, 620 int timeout) throws Exception { 621 VThreadRunner.run(builder, () -> { 622 try (DatagramChannel dc1 = DatagramChannel.open(); 623 DatagramChannel dc2 = DatagramChannel.open()) { 624 625 InetAddress lh = InetAddress.getLoopbackAddress(); 626 dc2.bind(new InetSocketAddress(lh, 0)); 627 628 // send from dc1 when current thread blocks in dc2 receive 629 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 630 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress())); 631 632 // receive should block 633 byte[] array = new byte[100]; 634 DatagramPacket p = new DatagramPacket(array, 0, array.length); 635 if (timeout > 0) 636 dc2.socket().setSoTimeout(timeout); 637 dc2.socket().receive(p); 638 assertTrue(p.getLength() == 3 && array[0] == 'X'); 639 } 640 }); 641 } 642 643 /** 644 * DatagramChannel close while virtual thread blocked in adaptor receive. 645 */ 646 @ParameterizedTest 647 @MethodSource("threadBuilders") 648 void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception { 649 testDatagramSocketAdaptorReceiveAsyncClose(builder, 0); 650 } 651 652 /** 653 * DatagramChannel close while virtual thread blocked in adaptor receive 654 * with timeout. 655 */ 656 @ParameterizedTest 657 @MethodSource("threadBuilders") 658 void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception { 659 testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000); 660 } 661 662 private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder, 663 int timeout) throws Exception { 664 VThreadRunner.run(builder, () -> { 665 try (DatagramChannel dc = DatagramChannel.open()) { 666 InetAddress lh = InetAddress.getLoopbackAddress(); 667 dc.bind(new InetSocketAddress(lh, 0)); 668 669 byte[] array = new byte[100]; 670 DatagramPacket p = new DatagramPacket(array, 0, array.length); 671 if (timeout > 0) 672 dc.socket().setSoTimeout(timeout); 673 674 // close channel/socket when current thread blocks in receive 675 runAfterParkedAsync(dc::close); 676 677 assertThrows(SocketException.class, () -> dc.socket().receive(p)); 678 } 679 }); 680 } 681 682 /** 683 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive. 684 */ 685 @ParameterizedTest 686 @MethodSource("threadBuilders") 687 void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception { 688 testDatagramSocketAdaptorReceiveInterrupt(builder, 0); 689 } 690 691 /** 692 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive 693 * with timeout. 694 */ 695 @ParameterizedTest 696 @MethodSource("threadBuilders") 697 void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception { 698 testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000); 699 } 700 701 private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder, 702 int timeout) throws Exception { 703 VThreadRunner.run(builder, () -> { 704 try (DatagramChannel dc = DatagramChannel.open()) { 705 InetAddress lh = InetAddress.getLoopbackAddress(); 706 dc.bind(new InetSocketAddress(lh, 0)); 707 708 byte[] array = new byte[100]; 709 DatagramPacket p = new DatagramPacket(array, 0, array.length); 710 if (timeout > 0) 711 dc.socket().setSoTimeout(timeout); 712 713 // interrupt current thread when it blocks in receive 714 Thread thisThread = Thread.currentThread(); 715 runAfterParkedAsync(thisThread::interrupt); 716 717 try { 718 dc.socket().receive(p); 719 fail(); 720 } catch (ClosedByInterruptException expected) { 721 assertTrue(Thread.interrupted()); 722 } 723 } 724 }); 725 } 726 727 /** 728 * Pipe read/write, no blocking. 729 */ 730 @ParameterizedTest 731 @MethodSource("threadBuilders") 732 void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception { 733 VThreadRunner.run(builder, () -> { 734 Pipe p = Pipe.open(); 735 try (Pipe.SinkChannel sink = p.sink(); 736 Pipe.SourceChannel source = p.source()) { 737 738 // write should not block 739 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 740 int n = sink.write(bb); 741 assertTrue(n > 0); 742 743 // read should not block 744 bb = ByteBuffer.allocate(10); 745 n = source.read(bb); 746 assertTrue(n > 0); 747 assertTrue(bb.get(0) == 'X'); 748 } 749 }); 750 } 751 752 /** 753 * Virtual thread blocks in Pipe.SourceChannel read. 754 */ 755 @ParameterizedTest 756 @MethodSource("threadBuilders") 757 void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception { 758 VThreadRunner.run(builder, () -> { 759 Pipe p = Pipe.open(); 760 try (Pipe.SinkChannel sink = p.sink(); 761 Pipe.SourceChannel source = p.source()) { 762 763 // write from sink when current thread blocks reading from source 764 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 765 runAfterParkedAsync(() -> sink.write(bb1)); 766 767 // read should block 768 ByteBuffer bb2 = ByteBuffer.allocate(10); 769 int n = source.read(bb2); 770 assertTrue(n > 0); 771 assertTrue(bb2.get(0) == 'X'); 772 } 773 }); 774 } 775 776 /** 777 * Virtual thread blocks in Pipe.SinkChannel write. 778 */ 779 @ParameterizedTest 780 @MethodSource("threadBuilders") 781 void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception { 782 VThreadRunner.run(builder, () -> { 783 Pipe p = Pipe.open(); 784 try (Pipe.SinkChannel sink = p.sink(); 785 Pipe.SourceChannel source = p.source()) { 786 787 // read from source to EOF when current thread blocking in write 788 Thread reader = runAfterParkedAsync(() -> readToEOF(source)); 789 790 // write to sink should block 791 ByteBuffer bb = ByteBuffer.allocate(100*1024); 792 for (int i=0; i<1000; i++) { 793 int n = sink.write(bb); 794 assertTrue(n > 0); 795 bb.clear(); 796 } 797 sink.close(); 798 799 // wait for reader to finish 800 reader.join(); 801 } 802 }); 803 } 804 805 /** 806 * Pipe.SourceChannel close while virtual thread blocked in read. 807 */ 808 @ParameterizedTest 809 @MethodSource("threadBuilders") 810 void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 811 VThreadRunner.run(builder, () -> { 812 Pipe p = Pipe.open(); 813 try (Pipe.SinkChannel sink = p.sink(); 814 Pipe.SourceChannel source = p.source()) { 815 runAfterParkedAsync(source::close); 816 try { 817 int n = source.read(ByteBuffer.allocate(100)); 818 fail("read returned " + n); 819 } catch (AsynchronousCloseException expected) { } 820 } 821 }); 822 } 823 824 /** 825 * Virtual thread interrupted while blocked in Pipe.SourceChannel read. 826 */ 827 @ParameterizedTest 828 @MethodSource("threadBuilders") 829 void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 830 VThreadRunner.run(builder, () -> { 831 Pipe p = Pipe.open(); 832 try (Pipe.SinkChannel sink = p.sink(); 833 Pipe.SourceChannel source = p.source()) { 834 835 // interrupt current thread when it blocks reading from source 836 Thread thisThread = Thread.currentThread(); 837 runAfterParkedAsync(thisThread::interrupt); 838 839 try { 840 int n = source.read(ByteBuffer.allocate(100)); 841 fail("read returned " + n); 842 } catch (ClosedByInterruptException expected) { 843 assertTrue(Thread.interrupted()); 844 } 845 } 846 }); 847 } 848 849 /** 850 * Pipe.SinkChannel close while virtual thread blocked in write. 851 */ 852 @ParameterizedTest 853 @MethodSource("threadBuilders") 854 void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 855 VThreadRunner.run(builder, () -> { 856 boolean retry = true; 857 while (retry) { 858 Pipe p = Pipe.open(); 859 try (Pipe.SinkChannel sink = p.sink(); 860 Pipe.SourceChannel source = p.source()) { 861 862 // close sink when current thread blocks in write 863 runAfterParkedAsync(sink::close); 864 try { 865 ByteBuffer bb = ByteBuffer.allocate(100*1024); 866 for (;;) { 867 int n = sink.write(bb); 868 assertTrue(n > 0); 869 bb.clear(); 870 } 871 } catch (AsynchronousCloseException e) { 872 // closed when blocked in write 873 retry = false; 874 } catch (ClosedChannelException e) { 875 // closed when not blocked in write, need to retry test 876 } 877 } 878 } 879 }); 880 } 881 882 /** 883 * Virtual thread interrupted while blocked in Pipe.SinkChannel write. 884 */ 885 @ParameterizedTest 886 @MethodSource("threadBuilders") 887 void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 888 VThreadRunner.run(builder, () -> { 889 boolean retry = true; 890 while (retry) { 891 Pipe p = Pipe.open(); 892 try (Pipe.SinkChannel sink = p.sink(); 893 Pipe.SourceChannel source = p.source()) { 894 895 // interrupt current thread when it blocks in write 896 Thread thisThread = Thread.currentThread(); 897 runAfterParkedAsync(thisThread::interrupt); 898 899 try { 900 ByteBuffer bb = ByteBuffer.allocate(100*1024); 901 for (;;) { 902 int n = sink.write(bb); 903 assertTrue(n > 0); 904 bb.clear(); 905 } 906 } catch (ClosedByInterruptException expected) { 907 // closed when blocked in write 908 assertTrue(Thread.interrupted()); 909 retry = false; 910 } catch (ClosedChannelException e) { 911 // closed when not blocked in write, need to retry test 912 } 913 } 914 } 915 }); 916 } 917 918 /** 919 * Creates a loopback connection 920 */ 921 static class Connection implements Closeable { 922 private final SocketChannel sc1; 923 private final SocketChannel sc2; 924 Connection() throws IOException { 925 var lh = InetAddress.getLoopbackAddress(); 926 try (var listener = ServerSocketChannel.open()) { 927 listener.bind(new InetSocketAddress(lh, 0)); 928 SocketChannel sc1 = SocketChannel.open(); 929 SocketChannel sc2 = null; 930 try { 931 sc1.socket().connect(listener.getLocalAddress()); 932 sc2 = listener.accept(); 933 } catch (IOException ioe) { 934 sc1.close(); 935 throw ioe; 936 } 937 this.sc1 = sc1; 938 this.sc2 = sc2; 939 } 940 } 941 SocketChannel channel1() { 942 return sc1; 943 } 944 SocketChannel channel2() { 945 return sc2; 946 } 947 @Override 948 public void close() throws IOException { 949 sc1.close(); 950 sc2.close(); 951 } 952 } 953 954 /** 955 * Read from a channel until all bytes have been read or an I/O error occurs. 956 */ 957 static void readToEOF(ReadableByteChannel rbc) throws IOException { 958 ByteBuffer bb = ByteBuffer.allocate(16*1024); 959 int n; 960 while ((n = rbc.read(bb)) > 0) { 961 bb.clear(); 962 } 963 } 964 965 @FunctionalInterface 966 interface ThrowingRunnable { 967 void run() throws Exception; 968 } 969 970 /** 971 * Runs the given task asynchronously after the current virtual thread has parked. 972 * @return the thread started to run the task 973 */ 974 static Thread runAfterParkedAsync(ThrowingRunnable task) { 975 Thread target = Thread.currentThread(); 976 if (!target.isVirtual()) 977 throw new WrongThreadException(); 978 return Thread.ofPlatform().daemon().start(() -> { 979 try { 980 Thread.State state = target.getState(); 981 while (state != Thread.State.WAITING 982 && state != Thread.State.TIMED_WAITING) { 983 Thread.sleep(20); 984 state = target.getState(); 985 } 986 Thread.sleep(20); // give a bit more time to release carrier 987 task.run(); 988 } catch (Exception e) { 989 e.printStackTrace(); 990 } 991 }); 992 } 993 }