1 /* 2 * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /** 25 * @test id=default 26 * @bug 8284161 27 * @summary Test virtual threads doing blocking I/O on NIO channels 28 * @library /test/lib 29 * @run junit BlockingChannelOps 30 */ 31 32 /** 33 * @test id=poller-modes 34 * @requires (os.family == "linux") | (os.family == "mac") 35 * @library /test/lib 36 * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps 37 * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps 38 * @run junit/othervm -Djdk.pollerMode=3 BlockingChannelOps 39 */ 40 41 /** 42 * @test id=no-vmcontinuations 43 * @requires vm.continuations 44 * @library /test/lib 45 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps 46 */ 47 48 import java.io.Closeable; 49 import java.io.IOException; 50 import java.net.DatagramPacket; 51 import java.net.InetAddress; 52 import java.net.InetSocketAddress; 53 import java.net.Socket; 54 import java.net.SocketAddress; 55 import java.net.SocketException; 56 import java.nio.ByteBuffer; 57 import java.nio.channels.AsynchronousCloseException; 58 import java.nio.channels.ClosedByInterruptException; 59 import java.nio.channels.ClosedChannelException; 60 import java.nio.channels.DatagramChannel; 61 import java.nio.channels.Pipe; 62 import java.nio.channels.ReadableByteChannel; 63 import java.nio.channels.ServerSocketChannel; 64 import java.nio.channels.SocketChannel; 65 import java.nio.channels.WritableByteChannel; 66 import java.util.concurrent.ExecutorService; 67 import java.util.concurrent.Executors; 68 import java.util.stream.Stream; 69 70 import jdk.test.lib.thread.VThreadRunner; 71 import jdk.test.lib.thread.VThreadScheduler; 72 73 import org.junit.jupiter.api.BeforeAll; 74 import org.junit.jupiter.api.AfterAll; 75 import org.junit.jupiter.params.ParameterizedTest; 76 import org.junit.jupiter.params.provider.MethodSource; 77 import static org.junit.jupiter.api.Assertions.*; 78 79 class BlockingChannelOps { 80 private static ExecutorService threadPool; 81 private static Thread.VirtualThreadScheduler customScheduler; 82 83 @BeforeAll 84 static void setup() { 85 threadPool = Executors.newCachedThreadPool(); 86 customScheduler = (_, task) -> threadPool.execute(task); 87 } 88 89 @AfterAll 90 static void finish() { 91 threadPool.shutdown(); 92 } 93 94 /** 95 * Returns the Thread.Builder to create the virtual thread. 96 */ 97 static Stream<Thread.Builder.OfVirtual> threadBuilders() { 98 if (VThreadScheduler.supportsCustomScheduler()) { 99 return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler)); 100 } else { 101 return Stream.of(Thread.ofVirtual()); 102 } 103 } 104 105 /** 106 * SocketChannel read/write, no blocking. 107 */ 108 @ParameterizedTest 109 @MethodSource("threadBuilders") 110 void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception { 111 VThreadRunner.run(builder, () -> { 112 try (var connection = new Connection()) { 113 SocketChannel sc1 = connection.channel1(); 114 SocketChannel sc2 = connection.channel2(); 115 116 // write to sc1 117 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 118 int n = sc1.write(bb); 119 assertTrue(n > 0); 120 121 // read from sc2 should not block 122 bb = ByteBuffer.allocate(10); 123 n = sc2.read(bb); 124 assertTrue(n > 0); 125 assertTrue(bb.get(0) == 'X'); 126 } 127 }); 128 } 129 130 /** 131 * Virtual thread blocks in SocketChannel read. 132 */ 133 @ParameterizedTest 134 @MethodSource("threadBuilders") 135 void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception { 136 VThreadRunner.run(builder, () -> { 137 try (var connection = new Connection()) { 138 SocketChannel sc1 = connection.channel1(); 139 SocketChannel sc2 = connection.channel2(); 140 141 // write to sc1 when current thread blocks in sc2.read 142 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 143 runAfterParkedAsync(() -> sc1.write(bb1)); 144 145 // read from sc2 should block 146 ByteBuffer bb2 = ByteBuffer.allocate(10); 147 int n = sc2.read(bb2); 148 assertTrue(n > 0); 149 assertTrue(bb2.get(0) == 'X'); 150 } 151 }); 152 } 153 154 /** 155 * Virtual thread blocks in SocketChannel write. 156 */ 157 @ParameterizedTest 158 @MethodSource("threadBuilders") 159 void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception { 160 VThreadRunner.run(builder, () -> { 161 try (var connection = new Connection()) { 162 SocketChannel sc1 = connection.channel1(); 163 SocketChannel sc2 = connection.channel2(); 164 165 // read from sc2 to EOF when current thread blocks in sc1.write 166 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2)); 167 168 // write to sc1 should block 169 ByteBuffer bb = ByteBuffer.allocate(100*1024); 170 for (int i=0; i<1000; i++) { 171 int n = sc1.write(bb); 172 assertTrue(n > 0); 173 bb.clear(); 174 } 175 sc1.close(); 176 177 // wait for reader to finish 178 reader.join(); 179 } 180 }); 181 } 182 183 /** 184 * SocketChannel close while virtual thread blocked in read. 185 */ 186 @ParameterizedTest 187 @MethodSource("threadBuilders") 188 void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 189 VThreadRunner.run(builder, () -> { 190 try (var connection = new Connection()) { 191 SocketChannel sc = connection.channel1(); 192 runAfterParkedAsync(sc::close); 193 try { 194 int n = sc.read(ByteBuffer.allocate(100)); 195 fail("read returned " + n); 196 } catch (AsynchronousCloseException expected) { } 197 } 198 }); 199 } 200 201 /** 202 * SocketChannel shutdownInput while virtual thread blocked in read. 203 */ 204 @ParameterizedTest 205 @MethodSource("threadBuilders") 206 void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception { 207 VThreadRunner.run(builder, () -> { 208 try (var connection = new Connection()) { 209 SocketChannel sc = connection.channel1(); 210 runAfterParkedAsync(sc::shutdownInput); 211 int n = sc.read(ByteBuffer.allocate(100)); 212 assertEquals(-1, n); 213 assertTrue(sc.isOpen()); 214 } 215 }); 216 } 217 218 /** 219 * Virtual thread interrupted while blocked in SocketChannel read. 220 */ 221 @ParameterizedTest 222 @MethodSource("threadBuilders") 223 void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 224 VThreadRunner.run(builder, () -> { 225 try (var connection = new Connection()) { 226 SocketChannel sc = connection.channel1(); 227 228 // interrupt current thread when it blocks in read 229 Thread thisThread = Thread.currentThread(); 230 runAfterParkedAsync(thisThread::interrupt); 231 232 try { 233 int n = sc.read(ByteBuffer.allocate(100)); 234 fail("read returned " + n); 235 } catch (ClosedByInterruptException expected) { 236 assertTrue(Thread.interrupted()); 237 } 238 } 239 }); 240 } 241 242 /** 243 * SocketChannel close while virtual thread blocked in write. 244 */ 245 @ParameterizedTest 246 @MethodSource("threadBuilders") 247 void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 248 VThreadRunner.run(builder, () -> { 249 boolean retry = true; 250 while (retry) { 251 try (var connection = new Connection()) { 252 SocketChannel sc = connection.channel1(); 253 254 // close sc when current thread blocks in write 255 runAfterParkedAsync(sc::close); 256 try { 257 ByteBuffer bb = ByteBuffer.allocate(100*1024); 258 for (;;) { 259 int n = sc.write(bb); 260 assertTrue(n > 0); 261 bb.clear(); 262 } 263 } catch (AsynchronousCloseException expected) { 264 // closed when blocked in write 265 retry = false; 266 } catch (ClosedChannelException e) { 267 // closed when not blocked in write, need to retry test 268 } 269 } 270 } 271 }); 272 } 273 274 275 /** 276 * SocketChannel shutdownOutput while virtual thread blocked in write. 277 */ 278 @ParameterizedTest 279 @MethodSource("threadBuilders") 280 void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception { 281 VThreadRunner.run(builder, () -> { 282 try (var connection = new Connection()) { 283 SocketChannel sc = connection.channel1(); 284 285 // shutdown output when current thread blocks in write 286 runAfterParkedAsync(sc::shutdownOutput); 287 try { 288 ByteBuffer bb = ByteBuffer.allocate(100*1024); 289 for (;;) { 290 int n = sc.write(bb); 291 assertTrue(n > 0); 292 bb.clear(); 293 } 294 } catch (ClosedChannelException e) { 295 // expected 296 } 297 assertTrue(sc.isOpen()); 298 } 299 }); 300 } 301 302 /** 303 * Virtual thread interrupted while blocked in SocketChannel write. 304 */ 305 @ParameterizedTest 306 @MethodSource("threadBuilders") 307 void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 308 VThreadRunner.run(builder, () -> { 309 boolean retry = true; 310 while (retry) { 311 try (var connection = new Connection()) { 312 SocketChannel sc = connection.channel1(); 313 314 // interrupt current thread when it blocks in write 315 Thread thisThread = Thread.currentThread(); 316 runAfterParkedAsync(thisThread::interrupt); 317 318 try { 319 ByteBuffer bb = ByteBuffer.allocate(100*1024); 320 for (;;) { 321 int n = sc.write(bb); 322 assertTrue(n > 0); 323 bb.clear(); 324 } 325 } catch (ClosedByInterruptException e) { 326 // closed when blocked in write 327 assertTrue(Thread.interrupted()); 328 retry = false; 329 } catch (ClosedChannelException e) { 330 // closed when not blocked in write, need to retry test 331 } 332 } 333 } 334 }); 335 } 336 337 /** 338 * Virtual thread blocks in SocketChannel adaptor read. 339 */ 340 @ParameterizedTest 341 @MethodSource("threadBuilders") 342 void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception { 343 testSocketAdaptorRead(builder, 0); 344 } 345 346 /** 347 * Virtual thread blocks in SocketChannel adaptor read with timeout. 348 */ 349 @ParameterizedTest 350 @MethodSource("threadBuilders") 351 void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception { 352 testSocketAdaptorRead(builder, 60_000); 353 } 354 355 private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder, 356 int timeout) throws Exception { 357 VThreadRunner.run(builder, () -> { 358 try (var connection = new Connection()) { 359 SocketChannel sc1 = connection.channel1(); 360 SocketChannel sc2 = connection.channel2(); 361 362 // write to sc1 when currnet thread blocks reading from sc2 363 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 364 runAfterParkedAsync(() -> sc1.write(bb)); 365 366 // read from sc2 should block 367 byte[] array = new byte[100]; 368 if (timeout > 0) 369 sc2.socket().setSoTimeout(timeout); 370 int n = sc2.socket().getInputStream().read(array); 371 assertTrue(n > 0); 372 assertTrue(array[0] == 'X'); 373 } 374 }); 375 } 376 377 /** 378 * ServerSocketChannel accept, no blocking. 379 */ 380 @ParameterizedTest 381 @MethodSource("threadBuilders") 382 void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception { 383 VThreadRunner.run(builder, () -> { 384 try (var ssc = ServerSocketChannel.open()) { 385 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 386 var sc1 = SocketChannel.open(ssc.getLocalAddress()); 387 // accept should not block 388 var sc2 = ssc.accept(); 389 sc1.close(); 390 sc2.close(); 391 } 392 }); 393 } 394 395 /** 396 * Virtual thread blocks in ServerSocketChannel accept. 397 */ 398 @ParameterizedTest 399 @MethodSource("threadBuilders") 400 void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception { 401 VThreadRunner.run(builder, () -> { 402 try (var ssc = ServerSocketChannel.open()) { 403 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 404 var sc1 = SocketChannel.open(); 405 406 // connect when current thread when it blocks in accept 407 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress())); 408 409 // accept should block 410 var sc2 = ssc.accept(); 411 sc1.close(); 412 sc2.close(); 413 } 414 }); 415 } 416 417 /** 418 * SeverSocketChannel close while virtual thread blocked in accept. 419 */ 420 @ParameterizedTest 421 @MethodSource("threadBuilders") 422 void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 423 VThreadRunner.run(builder, () -> { 424 try (var ssc = ServerSocketChannel.open()) { 425 InetAddress lh = InetAddress.getLoopbackAddress(); 426 ssc.bind(new InetSocketAddress(lh, 0)); 427 runAfterParkedAsync(ssc::close); 428 try { 429 SocketChannel sc = ssc.accept(); 430 sc.close(); 431 fail("connection accepted???"); 432 } catch (AsynchronousCloseException expected) { } 433 } 434 }); 435 } 436 437 /** 438 * Virtual thread interrupted while blocked in ServerSocketChannel accept. 439 */ 440 @ParameterizedTest 441 @MethodSource("threadBuilders") 442 void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 443 VThreadRunner.run(builder, () -> { 444 try (var ssc = ServerSocketChannel.open()) { 445 InetAddress lh = InetAddress.getLoopbackAddress(); 446 ssc.bind(new InetSocketAddress(lh, 0)); 447 448 // interrupt current thread when it blocks in accept 449 Thread thisThread = Thread.currentThread(); 450 runAfterParkedAsync(thisThread::interrupt); 451 452 try { 453 SocketChannel sc = ssc.accept(); 454 sc.close(); 455 fail("connection accepted???"); 456 } catch (ClosedByInterruptException expected) { 457 assertTrue(Thread.interrupted()); 458 } 459 } 460 }); 461 } 462 463 /** 464 * Virtual thread blocks in ServerSocketChannel adaptor accept. 465 */ 466 @ParameterizedTest 467 @MethodSource("threadBuilders") 468 void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception { 469 testSocketChannelAdaptorAccept(builder, 0); 470 } 471 472 /** 473 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout. 474 */ 475 @ParameterizedTest 476 @MethodSource("threadBuilders") 477 void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception { 478 testSocketChannelAdaptorAccept(builder, 60_000); 479 } 480 481 private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder, 482 int timeout) throws Exception { 483 VThreadRunner.run(builder, () -> { 484 try (var ssc = ServerSocketChannel.open()) { 485 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 486 var sc = SocketChannel.open(); 487 488 // interrupt current thread when it blocks in accept 489 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress())); 490 491 // accept should block 492 if (timeout > 0) 493 ssc.socket().setSoTimeout(timeout); 494 Socket s = ssc.socket().accept(); 495 sc.close(); 496 s.close(); 497 } 498 }); 499 } 500 501 /** 502 * DatagramChannel receive/send, no blocking. 503 */ 504 @ParameterizedTest 505 @MethodSource("threadBuilders") 506 void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception { 507 VThreadRunner.run(builder, () -> { 508 try (DatagramChannel dc1 = DatagramChannel.open(); 509 DatagramChannel dc2 = DatagramChannel.open()) { 510 511 InetAddress lh = InetAddress.getLoopbackAddress(); 512 dc2.bind(new InetSocketAddress(lh, 0)); 513 514 // send should not block 515 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 516 int n = dc1.send(bb, dc2.getLocalAddress()); 517 assertTrue(n > 0); 518 519 // receive should not block 520 bb = ByteBuffer.allocate(10); 521 dc2.receive(bb); 522 assertTrue(bb.get(0) == 'X'); 523 } 524 }); 525 } 526 527 /** 528 * Virtual thread blocks in DatagramChannel receive. 529 */ 530 @ParameterizedTest 531 @MethodSource("threadBuilders") 532 void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception { 533 VThreadRunner.run(builder, () -> { 534 try (DatagramChannel dc1 = DatagramChannel.open(); 535 DatagramChannel dc2 = DatagramChannel.open()) { 536 537 InetAddress lh = InetAddress.getLoopbackAddress(); 538 dc2.bind(new InetSocketAddress(lh, 0)); 539 540 // send from dc1 when current thread blocked in dc2.receive 541 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 542 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress())); 543 544 // read from dc2 should block 545 ByteBuffer bb2 = ByteBuffer.allocate(10); 546 dc2.receive(bb2); 547 assertTrue(bb2.get(0) == 'X'); 548 } 549 }); 550 } 551 552 /** 553 * DatagramChannel close while virtual thread blocked in receive. 554 */ 555 @ParameterizedTest 556 @MethodSource("threadBuilders") 557 void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 558 VThreadRunner.run(builder, () -> { 559 try (DatagramChannel dc = DatagramChannel.open()) { 560 InetAddress lh = InetAddress.getLoopbackAddress(); 561 dc.bind(new InetSocketAddress(lh, 0)); 562 runAfterParkedAsync(dc::close); 563 try { 564 dc.receive(ByteBuffer.allocate(100)); 565 fail("receive returned"); 566 } catch (AsynchronousCloseException expected) { } 567 } 568 }); 569 } 570 571 /** 572 * Virtual thread interrupted while blocked in DatagramChannel receive. 573 */ 574 @ParameterizedTest 575 @MethodSource("threadBuilders") 576 void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 577 VThreadRunner.run(builder, () -> { 578 try (DatagramChannel dc = DatagramChannel.open()) { 579 InetAddress lh = InetAddress.getLoopbackAddress(); 580 dc.bind(new InetSocketAddress(lh, 0)); 581 582 // interrupt current thread when it blocks in receive 583 Thread thisThread = Thread.currentThread(); 584 runAfterParkedAsync(thisThread::interrupt); 585 586 try { 587 dc.receive(ByteBuffer.allocate(100)); 588 fail("receive returned"); 589 } catch (ClosedByInterruptException expected) { 590 assertTrue(Thread.interrupted()); 591 } 592 } 593 }); 594 } 595 596 /** 597 * Virtual thread blocks in DatagramSocket adaptor receive. 598 */ 599 @ParameterizedTest 600 @MethodSource("threadBuilders") 601 void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception { 602 testDatagramSocketAdaptorReceive(builder, 0); 603 } 604 605 /** 606 * Virtual thread blocks in DatagramSocket adaptor receive with timeout. 607 */ 608 @ParameterizedTest 609 @MethodSource("threadBuilders") 610 void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception { 611 testDatagramSocketAdaptorReceive(builder, 60_000); 612 } 613 614 private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder, 615 int timeout) throws Exception { 616 VThreadRunner.run(builder, () -> { 617 try (DatagramChannel dc1 = DatagramChannel.open(); 618 DatagramChannel dc2 = DatagramChannel.open()) { 619 620 InetAddress lh = InetAddress.getLoopbackAddress(); 621 dc2.bind(new InetSocketAddress(lh, 0)); 622 623 // send from dc1 when current thread blocks in dc2 receive 624 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 625 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress())); 626 627 // receive should block 628 byte[] array = new byte[100]; 629 DatagramPacket p = new DatagramPacket(array, 0, array.length); 630 if (timeout > 0) 631 dc2.socket().setSoTimeout(timeout); 632 dc2.socket().receive(p); 633 assertTrue(p.getLength() == 3 && array[0] == 'X'); 634 } 635 }); 636 } 637 638 /** 639 * DatagramChannel close while virtual thread blocked in adaptor receive. 640 */ 641 @ParameterizedTest 642 @MethodSource("threadBuilders") 643 void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception { 644 testDatagramSocketAdaptorReceiveAsyncClose(builder, 0); 645 } 646 647 /** 648 * DatagramChannel close while virtual thread blocked in adaptor receive 649 * with timeout. 650 */ 651 @ParameterizedTest 652 @MethodSource("threadBuilders") 653 void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception { 654 testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000); 655 } 656 657 private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder, 658 int timeout) throws Exception { 659 VThreadRunner.run(builder, () -> { 660 try (DatagramChannel dc = DatagramChannel.open()) { 661 InetAddress lh = InetAddress.getLoopbackAddress(); 662 dc.bind(new InetSocketAddress(lh, 0)); 663 664 byte[] array = new byte[100]; 665 DatagramPacket p = new DatagramPacket(array, 0, array.length); 666 if (timeout > 0) 667 dc.socket().setSoTimeout(timeout); 668 669 // close channel/socket when current thread blocks in receive 670 runAfterParkedAsync(dc::close); 671 672 assertThrows(SocketException.class, () -> dc.socket().receive(p)); 673 } 674 }); 675 } 676 677 /** 678 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive. 679 */ 680 @ParameterizedTest 681 @MethodSource("threadBuilders") 682 void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception { 683 testDatagramSocketAdaptorReceiveInterrupt(builder, 0); 684 } 685 686 /** 687 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive 688 * with timeout. 689 */ 690 @ParameterizedTest 691 @MethodSource("threadBuilders") 692 void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception { 693 testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000); 694 } 695 696 private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder, 697 int timeout) throws Exception { 698 VThreadRunner.run(builder, () -> { 699 try (DatagramChannel dc = DatagramChannel.open()) { 700 InetAddress lh = InetAddress.getLoopbackAddress(); 701 dc.bind(new InetSocketAddress(lh, 0)); 702 703 byte[] array = new byte[100]; 704 DatagramPacket p = new DatagramPacket(array, 0, array.length); 705 if (timeout > 0) 706 dc.socket().setSoTimeout(timeout); 707 708 // interrupt current thread when it blocks in receive 709 Thread thisThread = Thread.currentThread(); 710 runAfterParkedAsync(thisThread::interrupt); 711 712 try { 713 dc.socket().receive(p); 714 fail(); 715 } catch (ClosedByInterruptException expected) { 716 assertTrue(Thread.interrupted()); 717 } 718 } 719 }); 720 } 721 722 /** 723 * Pipe read/write, no blocking. 724 */ 725 @ParameterizedTest 726 @MethodSource("threadBuilders") 727 void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception { 728 VThreadRunner.run(builder, () -> { 729 Pipe p = Pipe.open(); 730 try (Pipe.SinkChannel sink = p.sink(); 731 Pipe.SourceChannel source = p.source()) { 732 733 // write should not block 734 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 735 int n = sink.write(bb); 736 assertTrue(n > 0); 737 738 // read should not block 739 bb = ByteBuffer.allocate(10); 740 n = source.read(bb); 741 assertTrue(n > 0); 742 assertTrue(bb.get(0) == 'X'); 743 } 744 }); 745 } 746 747 /** 748 * Virtual thread blocks in Pipe.SourceChannel read. 749 */ 750 @ParameterizedTest 751 @MethodSource("threadBuilders") 752 void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception { 753 VThreadRunner.run(builder, () -> { 754 Pipe p = Pipe.open(); 755 try (Pipe.SinkChannel sink = p.sink(); 756 Pipe.SourceChannel source = p.source()) { 757 758 // write from sink when current thread blocks reading from source 759 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 760 runAfterParkedAsync(() -> sink.write(bb1)); 761 762 // read should block 763 ByteBuffer bb2 = ByteBuffer.allocate(10); 764 int n = source.read(bb2); 765 assertTrue(n > 0); 766 assertTrue(bb2.get(0) == 'X'); 767 } 768 }); 769 } 770 771 /** 772 * Virtual thread blocks in Pipe.SinkChannel write. 773 */ 774 @ParameterizedTest 775 @MethodSource("threadBuilders") 776 void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception { 777 VThreadRunner.run(builder, () -> { 778 Pipe p = Pipe.open(); 779 try (Pipe.SinkChannel sink = p.sink(); 780 Pipe.SourceChannel source = p.source()) { 781 782 // read from source to EOF when current thread blocking in write 783 Thread reader = runAfterParkedAsync(() -> readToEOF(source)); 784 785 // write to sink should block 786 ByteBuffer bb = ByteBuffer.allocate(100*1024); 787 for (int i=0; i<1000; i++) { 788 int n = sink.write(bb); 789 assertTrue(n > 0); 790 bb.clear(); 791 } 792 sink.close(); 793 794 // wait for reader to finish 795 reader.join(); 796 } 797 }); 798 } 799 800 /** 801 * Pipe.SourceChannel close while virtual thread blocked in read. 802 */ 803 @ParameterizedTest 804 @MethodSource("threadBuilders") 805 void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 806 VThreadRunner.run(builder, () -> { 807 Pipe p = Pipe.open(); 808 try (Pipe.SinkChannel sink = p.sink(); 809 Pipe.SourceChannel source = p.source()) { 810 runAfterParkedAsync(source::close); 811 try { 812 int n = source.read(ByteBuffer.allocate(100)); 813 fail("read returned " + n); 814 } catch (AsynchronousCloseException expected) { } 815 } 816 }); 817 } 818 819 /** 820 * Virtual thread interrupted while blocked in Pipe.SourceChannel read. 821 */ 822 @ParameterizedTest 823 @MethodSource("threadBuilders") 824 void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 825 VThreadRunner.run(builder, () -> { 826 Pipe p = Pipe.open(); 827 try (Pipe.SinkChannel sink = p.sink(); 828 Pipe.SourceChannel source = p.source()) { 829 830 // interrupt current thread when it blocks reading from source 831 Thread thisThread = Thread.currentThread(); 832 runAfterParkedAsync(thisThread::interrupt); 833 834 try { 835 int n = source.read(ByteBuffer.allocate(100)); 836 fail("read returned " + n); 837 } catch (ClosedByInterruptException expected) { 838 assertTrue(Thread.interrupted()); 839 } 840 } 841 }); 842 } 843 844 /** 845 * Pipe.SinkChannel close while virtual thread blocked in write. 846 */ 847 @ParameterizedTest 848 @MethodSource("threadBuilders") 849 void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 850 VThreadRunner.run(builder, () -> { 851 boolean retry = true; 852 while (retry) { 853 Pipe p = Pipe.open(); 854 try (Pipe.SinkChannel sink = p.sink(); 855 Pipe.SourceChannel source = p.source()) { 856 857 // close sink when current thread blocks in write 858 runAfterParkedAsync(sink::close); 859 try { 860 ByteBuffer bb = ByteBuffer.allocate(100*1024); 861 for (;;) { 862 int n = sink.write(bb); 863 assertTrue(n > 0); 864 bb.clear(); 865 } 866 } catch (AsynchronousCloseException e) { 867 // closed when blocked in write 868 retry = false; 869 } catch (ClosedChannelException e) { 870 // closed when not blocked in write, need to retry test 871 } 872 } 873 } 874 }); 875 } 876 877 /** 878 * Virtual thread interrupted while blocked in Pipe.SinkChannel write. 879 */ 880 @ParameterizedTest 881 @MethodSource("threadBuilders") 882 void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 883 VThreadRunner.run(builder, () -> { 884 boolean retry = true; 885 while (retry) { 886 Pipe p = Pipe.open(); 887 try (Pipe.SinkChannel sink = p.sink(); 888 Pipe.SourceChannel source = p.source()) { 889 890 // interrupt current thread when it blocks in write 891 Thread thisThread = Thread.currentThread(); 892 runAfterParkedAsync(thisThread::interrupt); 893 894 try { 895 ByteBuffer bb = ByteBuffer.allocate(100*1024); 896 for (;;) { 897 int n = sink.write(bb); 898 assertTrue(n > 0); 899 bb.clear(); 900 } 901 } catch (ClosedByInterruptException expected) { 902 // closed when blocked in write 903 assertTrue(Thread.interrupted()); 904 retry = false; 905 } catch (ClosedChannelException e) { 906 // closed when not blocked in write, need to retry test 907 } 908 } 909 } 910 }); 911 } 912 913 /** 914 * Creates a loopback connection 915 */ 916 static class Connection implements Closeable { 917 private final SocketChannel sc1; 918 private final SocketChannel sc2; 919 Connection() throws IOException { 920 var lh = InetAddress.getLoopbackAddress(); 921 try (var listener = ServerSocketChannel.open()) { 922 listener.bind(new InetSocketAddress(lh, 0)); 923 SocketChannel sc1 = SocketChannel.open(); 924 SocketChannel sc2 = null; 925 try { 926 sc1.socket().connect(listener.getLocalAddress()); 927 sc2 = listener.accept(); 928 } catch (IOException ioe) { 929 sc1.close(); 930 throw ioe; 931 } 932 this.sc1 = sc1; 933 this.sc2 = sc2; 934 } 935 } 936 SocketChannel channel1() { 937 return sc1; 938 } 939 SocketChannel channel2() { 940 return sc2; 941 } 942 @Override 943 public void close() throws IOException { 944 sc1.close(); 945 sc2.close(); 946 } 947 } 948 949 /** 950 * Read from a channel until all bytes have been read or an I/O error occurs. 951 */ 952 static void readToEOF(ReadableByteChannel rbc) throws IOException { 953 ByteBuffer bb = ByteBuffer.allocate(16*1024); 954 int n; 955 while ((n = rbc.read(bb)) > 0) { 956 bb.clear(); 957 } 958 } 959 960 @FunctionalInterface 961 interface ThrowingRunnable { 962 void run() throws Exception; 963 } 964 965 /** 966 * Runs the given task asynchronously after the current virtual thread has parked. 967 * @return the thread started to run the task 968 */ 969 static Thread runAfterParkedAsync(ThrowingRunnable task) { 970 Thread target = Thread.currentThread(); 971 if (!target.isVirtual()) 972 throw new WrongThreadException(); 973 return Thread.ofPlatform().daemon().start(() -> { 974 try { 975 Thread.State state = target.getState(); 976 while (state != Thread.State.WAITING 977 && state != Thread.State.TIMED_WAITING) { 978 Thread.sleep(20); 979 state = target.getState(); 980 } 981 Thread.sleep(20); // give a bit more time to release carrier 982 task.run(); 983 } catch (Exception e) { 984 e.printStackTrace(); 985 } 986 }); 987 } 988 } --- EOF ---