1 /* 2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=default 26 * @bug 8284161 27 * @summary Test virtual threads doing blocking I/O on NIO channels 28 * @library /test/lib 29 * @run junit/othervm/timeout=480 BlockingChannelOps 30 */ 31 32 /* 33 * @test id=poller-modes 34 * @requires (os.family == "linux") | (os.family == "mac") 35 * @library /test/lib 36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps 37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps 38 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps 39 */ 40 41 /* 42 * @test id=io_uring 43 * @requires os.family == "linux" 44 * @library /test/lib 45 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps 46 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps 47 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps 48 */ 49 50 /* 51 * @test id=no-vmcontinuations 52 * @requires vm.continuations 53 * @library /test/lib 54 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps 55 */ 56 57 import java.io.Closeable; 58 import java.io.IOException; 59 import java.net.DatagramPacket; 60 import java.net.InetAddress; 61 import java.net.InetSocketAddress; 62 import java.net.Socket; 63 import java.net.SocketAddress; 64 import java.net.SocketException; 65 import java.nio.ByteBuffer; 66 import java.nio.channels.AsynchronousCloseException; 67 import java.nio.channels.ClosedByInterruptException; 68 import java.nio.channels.ClosedChannelException; 69 import java.nio.channels.DatagramChannel; 70 import java.nio.channels.Pipe; 71 import java.nio.channels.ReadableByteChannel; 72 import java.nio.channels.ServerSocketChannel; 73 import java.nio.channels.SocketChannel; 74 import java.nio.channels.WritableByteChannel; 75 import java.util.concurrent.ExecutorService; 76 import java.util.concurrent.Executors; 77 import java.util.concurrent.ThreadFactory; 78 import java.util.concurrent.locks.LockSupport; 79 import java.util.stream.Stream; 80 81 import jdk.test.lib.thread.VThreadRunner; 82 import jdk.test.lib.thread.VThreadScheduler; 83 84 import org.junit.jupiter.api.BeforeAll; 85 import org.junit.jupiter.params.ParameterizedTest; 86 import org.junit.jupiter.params.provider.MethodSource; 87 import static org.junit.jupiter.api.Assertions.*; 88 89 class BlockingChannelOps { 90 private static ExecutorService threadPool; 91 private static Thread.VirtualThreadScheduler customScheduler; 92 93 @BeforeAll 94 static void setup() { 95 ThreadFactory factory = Thread.ofPlatform().daemon().factory(); 96 threadPool = Executors.newCachedThreadPool(factory); 97 customScheduler = (_, task) -> threadPool.execute(task); 98 } 99 100 /** 101 * Returns the Thread.Builder to create the virtual thread. 102 */ 103 static Stream<Thread.Builder.OfVirtual> threadBuilders() { 104 if (VThreadScheduler.supportsCustomScheduler()) { 105 return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler)); 106 } else { 107 return Stream.of(Thread.ofVirtual()); 108 } 109 } 110 111 /** 112 * SocketChannel read/write, no blocking. 113 */ 114 @ParameterizedTest 115 @MethodSource("threadBuilders") 116 void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception { 117 VThreadRunner.run(builder, () -> { 118 try (var connection = new Connection()) { 119 SocketChannel sc1 = connection.channel1(); 120 SocketChannel sc2 = connection.channel2(); 121 122 // write to sc1 123 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 124 int n = sc1.write(bb); 125 assertTrue(n > 0); 126 127 // read from sc2 should not block 128 bb = ByteBuffer.allocate(10); 129 n = sc2.read(bb); 130 assertTrue(n > 0); 131 assertTrue(bb.get(0) == 'X'); 132 } 133 }); 134 } 135 136 /** 137 * Virtual thread blocks in SocketChannel read. 138 */ 139 @ParameterizedTest 140 @MethodSource("threadBuilders") 141 void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception { 142 VThreadRunner.run(builder, () -> { 143 try (var connection = new Connection()) { 144 SocketChannel sc1 = connection.channel1(); 145 SocketChannel sc2 = connection.channel2(); 146 147 // write to sc1 when current thread blocks in sc2.read 148 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 149 runAfterParkedAsync(() -> sc1.write(bb1)); 150 151 // read from sc2 should block 152 ByteBuffer bb2 = ByteBuffer.allocate(10); 153 int n = sc2.read(bb2); 154 assertTrue(n > 0); 155 assertTrue(bb2.get(0) == 'X'); 156 } 157 }); 158 } 159 160 /** 161 * Virtual thread blocks in SocketChannel write. 162 */ 163 @ParameterizedTest 164 @MethodSource("threadBuilders") 165 void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception { 166 VThreadRunner.run(builder, () -> { 167 try (var connection = new Connection()) { 168 SocketChannel sc1 = connection.channel1(); 169 SocketChannel sc2 = connection.channel2(); 170 171 // read from sc2 to EOF when current thread blocks in sc1.write 172 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2)); 173 174 // write to sc1 should block 175 ByteBuffer bb = ByteBuffer.allocate(100*1024); 176 for (int i=0; i<1000; i++) { 177 int n = sc1.write(bb); 178 assertTrue(n > 0); 179 bb.clear(); 180 } 181 sc1.close(); 182 183 // wait for reader to finish 184 reader.join(); 185 } 186 }); 187 } 188 189 /** 190 * SocketChannel close while virtual thread blocked in read. 191 */ 192 @ParameterizedTest 193 @MethodSource("threadBuilders") 194 void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 195 VThreadRunner.run(builder, () -> { 196 try (var connection = new Connection()) { 197 SocketChannel sc = connection.channel1(); 198 runAfterParkedAsync(sc::close); 199 try { 200 int n = sc.read(ByteBuffer.allocate(100)); 201 fail("read returned " + n); 202 } catch (AsynchronousCloseException expected) { } 203 } 204 }); 205 } 206 207 /** 208 * SocketChannel shutdownInput while virtual thread blocked in read. 209 */ 210 @ParameterizedTest 211 @MethodSource("threadBuilders") 212 void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception { 213 VThreadRunner.run(builder, () -> { 214 try (var connection = new Connection()) { 215 SocketChannel sc = connection.channel1(); 216 runAfterParkedAsync(sc::shutdownInput); 217 int n = sc.read(ByteBuffer.allocate(100)); 218 assertEquals(-1, n); 219 assertTrue(sc.isOpen()); 220 } 221 }); 222 } 223 224 /** 225 * Virtual thread interrupted while blocked in SocketChannel read. 226 */ 227 @ParameterizedTest 228 @MethodSource("threadBuilders") 229 void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 230 VThreadRunner.run(builder, () -> { 231 try (var connection = new Connection()) { 232 SocketChannel sc = connection.channel1(); 233 234 // interrupt current thread when it blocks in read 235 Thread thisThread = Thread.currentThread(); 236 runAfterParkedAsync(thisThread::interrupt); 237 238 try { 239 int n = sc.read(ByteBuffer.allocate(100)); 240 fail("read returned " + n); 241 } catch (ClosedByInterruptException expected) { 242 assertTrue(Thread.interrupted()); 243 } 244 } 245 }); 246 } 247 248 /** 249 * SocketChannel close while virtual thread blocked in write. 250 */ 251 @ParameterizedTest 252 @MethodSource("threadBuilders") 253 void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 254 VThreadRunner.run(builder, () -> { 255 boolean done = false; 256 while (!done) { 257 try (var connection = new Connection()) { 258 SocketChannel sc = connection.channel1(); 259 260 // close sc when current thread blocks in write 261 runAfterParkedAsync(sc::close, true); 262 263 // write until channel is closed 264 try { 265 ByteBuffer bb = ByteBuffer.allocate(100*1024); 266 for (;;) { 267 int n = sc.write(bb); 268 assertTrue(n > 0); 269 bb.clear(); 270 } 271 } catch (AsynchronousCloseException expected) { 272 // closed when blocked in write 273 done = true; 274 } catch (ClosedChannelException e) { 275 // closed but not blocked in write, need to retry test 276 System.err.format("%s, need to retry!%n", e); 277 } 278 } 279 } 280 }); 281 } 282 283 /** 284 * SocketChannel shutdownOutput while virtual thread blocked in write. 285 */ 286 @ParameterizedTest 287 @MethodSource("threadBuilders") 288 void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception { 289 VThreadRunner.run(builder, () -> { 290 try (var connection = new Connection()) { 291 SocketChannel sc = connection.channel1(); 292 293 // shutdown output when current thread blocks in write 294 runAfterParkedAsync(sc::shutdownOutput); 295 try { 296 ByteBuffer bb = ByteBuffer.allocate(100*1024); 297 for (;;) { 298 int n = sc.write(bb); 299 assertTrue(n > 0); 300 bb.clear(); 301 } 302 } catch (ClosedChannelException e) { 303 // expected 304 } 305 assertTrue(sc.isOpen()); 306 } 307 }); 308 } 309 310 /** 311 * Virtual thread interrupted while blocked in SocketChannel write. 312 */ 313 @ParameterizedTest 314 @MethodSource("threadBuilders") 315 void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 316 VThreadRunner.run(builder, () -> { 317 boolean done = false; 318 while (!done) { 319 try (var connection = new Connection()) { 320 SocketChannel sc = connection.channel1(); 321 322 // interrupt current thread when it blocks in write 323 Thread thisThread = Thread.currentThread(); 324 runAfterParkedAsync(thisThread::interrupt, true); 325 326 // write until channel is closed 327 try { 328 ByteBuffer bb = ByteBuffer.allocate(100*1024); 329 for (;;) { 330 int n = sc.write(bb); 331 assertTrue(n > 0); 332 bb.clear(); 333 } 334 } catch (ClosedByInterruptException e) { 335 // closed when blocked in write 336 assertTrue(Thread.interrupted()); 337 done = true; 338 } catch (ClosedChannelException e) { 339 // closed but not blocked in write, need to retry test 340 System.err.format("%s, need to retry!%n", e); 341 } 342 } 343 } 344 }); 345 } 346 347 /** 348 * Virtual thread blocks in SocketChannel adaptor read. 349 */ 350 @ParameterizedTest 351 @MethodSource("threadBuilders") 352 void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception { 353 testSocketAdaptorRead(builder, 0); 354 } 355 356 /** 357 * Virtual thread blocks in SocketChannel adaptor read with timeout. 358 */ 359 @ParameterizedTest 360 @MethodSource("threadBuilders") 361 void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception { 362 testSocketAdaptorRead(builder, 60_000); 363 } 364 365 private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder, 366 int timeout) throws Exception { 367 VThreadRunner.run(builder, () -> { 368 try (var connection = new Connection()) { 369 SocketChannel sc1 = connection.channel1(); 370 SocketChannel sc2 = connection.channel2(); 371 372 // write to sc1 when currnet thread blocks reading from sc2 373 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 374 runAfterParkedAsync(() -> sc1.write(bb)); 375 376 // read from sc2 should block 377 byte[] array = new byte[100]; 378 if (timeout > 0) 379 sc2.socket().setSoTimeout(timeout); 380 int n = sc2.socket().getInputStream().read(array); 381 assertTrue(n > 0); 382 assertTrue(array[0] == 'X'); 383 } 384 }); 385 } 386 387 /** 388 * ServerSocketChannel accept, no blocking. 389 */ 390 @ParameterizedTest 391 @MethodSource("threadBuilders") 392 void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception { 393 VThreadRunner.run(builder, () -> { 394 try (var ssc = ServerSocketChannel.open()) { 395 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 396 var sc1 = SocketChannel.open(ssc.getLocalAddress()); 397 // accept should not block 398 var sc2 = ssc.accept(); 399 sc1.close(); 400 sc2.close(); 401 } 402 }); 403 } 404 405 /** 406 * Virtual thread blocks in ServerSocketChannel accept. 407 */ 408 @ParameterizedTest 409 @MethodSource("threadBuilders") 410 void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception { 411 VThreadRunner.run(builder, () -> { 412 try (var ssc = ServerSocketChannel.open()) { 413 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 414 var sc1 = SocketChannel.open(); 415 416 // connect when current thread when it blocks in accept 417 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress())); 418 419 // accept should block 420 var sc2 = ssc.accept(); 421 sc1.close(); 422 sc2.close(); 423 } 424 }); 425 } 426 427 /** 428 * SeverSocketChannel close while virtual thread blocked in accept. 429 */ 430 @ParameterizedTest 431 @MethodSource("threadBuilders") 432 void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 433 VThreadRunner.run(builder, () -> { 434 try (var ssc = ServerSocketChannel.open()) { 435 InetAddress lh = InetAddress.getLoopbackAddress(); 436 ssc.bind(new InetSocketAddress(lh, 0)); 437 runAfterParkedAsync(ssc::close); 438 try { 439 SocketChannel sc = ssc.accept(); 440 sc.close(); 441 fail("connection accepted???"); 442 } catch (AsynchronousCloseException expected) { } 443 } 444 }); 445 } 446 447 /** 448 * Virtual thread interrupted while blocked in ServerSocketChannel accept. 449 */ 450 @ParameterizedTest 451 @MethodSource("threadBuilders") 452 void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 453 VThreadRunner.run(builder, () -> { 454 try (var ssc = ServerSocketChannel.open()) { 455 InetAddress lh = InetAddress.getLoopbackAddress(); 456 ssc.bind(new InetSocketAddress(lh, 0)); 457 458 // interrupt current thread when it blocks in accept 459 Thread thisThread = Thread.currentThread(); 460 runAfterParkedAsync(thisThread::interrupt); 461 462 try { 463 SocketChannel sc = ssc.accept(); 464 sc.close(); 465 fail("connection accepted???"); 466 } catch (ClosedByInterruptException expected) { 467 assertTrue(Thread.interrupted()); 468 } 469 } 470 }); 471 } 472 473 /** 474 * Virtual thread blocks in ServerSocketChannel adaptor accept. 475 */ 476 @ParameterizedTest 477 @MethodSource("threadBuilders") 478 void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception { 479 testSocketChannelAdaptorAccept(builder, 0); 480 } 481 482 /** 483 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout. 484 */ 485 @ParameterizedTest 486 @MethodSource("threadBuilders") 487 void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception { 488 testSocketChannelAdaptorAccept(builder, 60_000); 489 } 490 491 private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder, 492 int timeout) throws Exception { 493 VThreadRunner.run(builder, () -> { 494 try (var ssc = ServerSocketChannel.open()) { 495 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 496 var sc = SocketChannel.open(); 497 498 // interrupt current thread when it blocks in accept 499 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress())); 500 501 // accept should block 502 if (timeout > 0) 503 ssc.socket().setSoTimeout(timeout); 504 Socket s = ssc.socket().accept(); 505 sc.close(); 506 s.close(); 507 } 508 }); 509 } 510 511 /** 512 * DatagramChannel receive/send, no blocking. 513 */ 514 @ParameterizedTest 515 @MethodSource("threadBuilders") 516 void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception { 517 VThreadRunner.run(builder, () -> { 518 try (DatagramChannel dc1 = DatagramChannel.open(); 519 DatagramChannel dc2 = DatagramChannel.open()) { 520 521 InetAddress lh = InetAddress.getLoopbackAddress(); 522 dc2.bind(new InetSocketAddress(lh, 0)); 523 524 // send should not block 525 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 526 int n = dc1.send(bb, dc2.getLocalAddress()); 527 assertTrue(n > 0); 528 529 // receive should not block 530 bb = ByteBuffer.allocate(10); 531 dc2.receive(bb); 532 assertTrue(bb.get(0) == 'X'); 533 } 534 }); 535 } 536 537 /** 538 * Virtual thread blocks in DatagramChannel receive. 539 */ 540 @ParameterizedTest 541 @MethodSource("threadBuilders") 542 void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception { 543 VThreadRunner.run(builder, () -> { 544 try (DatagramChannel dc1 = DatagramChannel.open(); 545 DatagramChannel dc2 = DatagramChannel.open()) { 546 547 InetAddress lh = InetAddress.getLoopbackAddress(); 548 dc2.bind(new InetSocketAddress(lh, 0)); 549 550 // send from dc1 when current thread blocked in dc2.receive 551 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 552 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress())); 553 554 // read from dc2 should block 555 ByteBuffer bb2 = ByteBuffer.allocate(10); 556 dc2.receive(bb2); 557 assertTrue(bb2.get(0) == 'X'); 558 } 559 }); 560 } 561 562 /** 563 * DatagramChannel close while virtual thread blocked in receive. 564 */ 565 @ParameterizedTest 566 @MethodSource("threadBuilders") 567 void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 568 VThreadRunner.run(builder, () -> { 569 try (DatagramChannel dc = DatagramChannel.open()) { 570 InetAddress lh = InetAddress.getLoopbackAddress(); 571 dc.bind(new InetSocketAddress(lh, 0)); 572 runAfterParkedAsync(dc::close); 573 try { 574 dc.receive(ByteBuffer.allocate(100)); 575 fail("receive returned"); 576 } catch (AsynchronousCloseException expected) { } 577 } 578 }); 579 } 580 581 /** 582 * Virtual thread interrupted while blocked in DatagramChannel receive. 583 */ 584 @ParameterizedTest 585 @MethodSource("threadBuilders") 586 void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 587 VThreadRunner.run(builder, () -> { 588 try (DatagramChannel dc = DatagramChannel.open()) { 589 InetAddress lh = InetAddress.getLoopbackAddress(); 590 dc.bind(new InetSocketAddress(lh, 0)); 591 592 // interrupt current thread when it blocks in receive 593 Thread thisThread = Thread.currentThread(); 594 runAfterParkedAsync(thisThread::interrupt); 595 596 try { 597 dc.receive(ByteBuffer.allocate(100)); 598 fail("receive returned"); 599 } catch (ClosedByInterruptException expected) { 600 assertTrue(Thread.interrupted()); 601 } 602 } 603 }); 604 } 605 606 /** 607 * Virtual thread blocks in DatagramSocket adaptor receive. 608 */ 609 @ParameterizedTest 610 @MethodSource("threadBuilders") 611 void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception { 612 testDatagramSocketAdaptorReceive(builder, 0); 613 } 614 615 /** 616 * Virtual thread blocks in DatagramSocket adaptor receive with timeout. 617 */ 618 @ParameterizedTest 619 @MethodSource("threadBuilders") 620 void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception { 621 testDatagramSocketAdaptorReceive(builder, 60_000); 622 } 623 624 private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder, 625 int timeout) throws Exception { 626 VThreadRunner.run(builder, () -> { 627 try (DatagramChannel dc1 = DatagramChannel.open(); 628 DatagramChannel dc2 = DatagramChannel.open()) { 629 630 InetAddress lh = InetAddress.getLoopbackAddress(); 631 dc2.bind(new InetSocketAddress(lh, 0)); 632 633 // send from dc1 when current thread blocks in dc2 receive 634 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 635 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress())); 636 637 // receive should block 638 byte[] array = new byte[100]; 639 DatagramPacket p = new DatagramPacket(array, 0, array.length); 640 if (timeout > 0) 641 dc2.socket().setSoTimeout(timeout); 642 dc2.socket().receive(p); 643 assertTrue(p.getLength() == 3 && array[0] == 'X'); 644 } 645 }); 646 } 647 648 /** 649 * DatagramChannel close while virtual thread blocked in adaptor receive. 650 */ 651 @ParameterizedTest 652 @MethodSource("threadBuilders") 653 void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception { 654 testDatagramSocketAdaptorReceiveAsyncClose(builder, 0); 655 } 656 657 /** 658 * DatagramChannel close while virtual thread blocked in adaptor receive 659 * with timeout. 660 */ 661 @ParameterizedTest 662 @MethodSource("threadBuilders") 663 void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception { 664 testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000); 665 } 666 667 private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder, 668 int timeout) throws Exception { 669 VThreadRunner.run(builder, () -> { 670 try (DatagramChannel dc = DatagramChannel.open()) { 671 InetAddress lh = InetAddress.getLoopbackAddress(); 672 dc.bind(new InetSocketAddress(lh, 0)); 673 674 byte[] array = new byte[100]; 675 DatagramPacket p = new DatagramPacket(array, 0, array.length); 676 if (timeout > 0) 677 dc.socket().setSoTimeout(timeout); 678 679 // close channel/socket when current thread blocks in receive 680 runAfterParkedAsync(dc::close); 681 682 assertThrows(SocketException.class, () -> dc.socket().receive(p)); 683 } 684 }); 685 } 686 687 /** 688 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive. 689 */ 690 @ParameterizedTest 691 @MethodSource("threadBuilders") 692 void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception { 693 testDatagramSocketAdaptorReceiveInterrupt(builder, 0); 694 } 695 696 /** 697 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive 698 * with timeout. 699 */ 700 @ParameterizedTest 701 @MethodSource("threadBuilders") 702 void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception { 703 testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000); 704 } 705 706 private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder, 707 int timeout) throws Exception { 708 VThreadRunner.run(builder, () -> { 709 try (DatagramChannel dc = DatagramChannel.open()) { 710 InetAddress lh = InetAddress.getLoopbackAddress(); 711 dc.bind(new InetSocketAddress(lh, 0)); 712 713 byte[] array = new byte[100]; 714 DatagramPacket p = new DatagramPacket(array, 0, array.length); 715 if (timeout > 0) 716 dc.socket().setSoTimeout(timeout); 717 718 // interrupt current thread when it blocks in receive 719 Thread thisThread = Thread.currentThread(); 720 runAfterParkedAsync(thisThread::interrupt); 721 722 try { 723 dc.socket().receive(p); 724 fail(); 725 } catch (ClosedByInterruptException expected) { 726 assertTrue(Thread.interrupted()); 727 } 728 } 729 }); 730 } 731 732 /** 733 * Pipe read/write, no blocking. 734 */ 735 @ParameterizedTest 736 @MethodSource("threadBuilders") 737 void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception { 738 VThreadRunner.run(builder, () -> { 739 Pipe p = Pipe.open(); 740 try (Pipe.SinkChannel sink = p.sink(); 741 Pipe.SourceChannel source = p.source()) { 742 743 // write should not block 744 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 745 int n = sink.write(bb); 746 assertTrue(n > 0); 747 748 // read should not block 749 bb = ByteBuffer.allocate(10); 750 n = source.read(bb); 751 assertTrue(n > 0); 752 assertTrue(bb.get(0) == 'X'); 753 } 754 }); 755 } 756 757 /** 758 * Virtual thread blocks in Pipe.SourceChannel read. 759 */ 760 @ParameterizedTest 761 @MethodSource("threadBuilders") 762 void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception { 763 VThreadRunner.run(builder, () -> { 764 Pipe p = Pipe.open(); 765 try (Pipe.SinkChannel sink = p.sink(); 766 Pipe.SourceChannel source = p.source()) { 767 768 // write from sink when current thread blocks reading from source 769 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 770 runAfterParkedAsync(() -> sink.write(bb1)); 771 772 // read should block 773 ByteBuffer bb2 = ByteBuffer.allocate(10); 774 int n = source.read(bb2); 775 assertTrue(n > 0); 776 assertTrue(bb2.get(0) == 'X'); 777 } 778 }); 779 } 780 781 /** 782 * Virtual thread blocks in Pipe.SinkChannel write. 783 */ 784 @ParameterizedTest 785 @MethodSource("threadBuilders") 786 void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception { 787 VThreadRunner.run(builder, () -> { 788 Pipe p = Pipe.open(); 789 try (Pipe.SinkChannel sink = p.sink(); 790 Pipe.SourceChannel source = p.source()) { 791 792 // read from source to EOF when current thread blocking in write 793 Thread reader = runAfterParkedAsync(() -> readToEOF(source)); 794 795 // write to sink should block 796 ByteBuffer bb = ByteBuffer.allocate(100*1024); 797 for (int i=0; i<1000; i++) { 798 int n = sink.write(bb); 799 assertTrue(n > 0); 800 bb.clear(); 801 } 802 sink.close(); 803 804 // wait for reader to finish 805 reader.join(); 806 } 807 }); 808 } 809 810 /** 811 * Pipe.SourceChannel close while virtual thread blocked in read. 812 */ 813 @ParameterizedTest 814 @MethodSource("threadBuilders") 815 void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 816 VThreadRunner.run(builder, () -> { 817 Pipe p = Pipe.open(); 818 try (Pipe.SinkChannel sink = p.sink(); 819 Pipe.SourceChannel source = p.source()) { 820 runAfterParkedAsync(source::close); 821 try { 822 int n = source.read(ByteBuffer.allocate(100)); 823 fail("read returned " + n); 824 } catch (AsynchronousCloseException expected) { } 825 } 826 }); 827 } 828 829 /** 830 * Virtual thread interrupted while blocked in Pipe.SourceChannel read. 831 */ 832 @ParameterizedTest 833 @MethodSource("threadBuilders") 834 void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 835 VThreadRunner.run(builder, () -> { 836 Pipe p = Pipe.open(); 837 try (Pipe.SinkChannel sink = p.sink(); 838 Pipe.SourceChannel source = p.source()) { 839 840 // interrupt current thread when it blocks reading from source 841 Thread thisThread = Thread.currentThread(); 842 runAfterParkedAsync(thisThread::interrupt); 843 844 try { 845 int n = source.read(ByteBuffer.allocate(100)); 846 fail("read returned " + n); 847 } catch (ClosedByInterruptException expected) { 848 assertTrue(Thread.interrupted()); 849 } 850 } 851 }); 852 } 853 854 /** 855 * Pipe.SinkChannel close while virtual thread blocked in write. 856 */ 857 @ParameterizedTest 858 @MethodSource("threadBuilders") 859 void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 860 VThreadRunner.run(builder, () -> { 861 boolean done = false; 862 while (!done) { 863 Pipe p = Pipe.open(); 864 try (Pipe.SinkChannel sink = p.sink(); 865 Pipe.SourceChannel source = p.source()) { 866 867 // close sink when current thread blocks in write 868 runAfterParkedAsync(sink::close, true); 869 870 // write until channel is closed 871 try { 872 ByteBuffer bb = ByteBuffer.allocate(100*1024); 873 for (;;) { 874 int n = sink.write(bb); 875 assertTrue(n > 0); 876 bb.clear(); 877 } 878 } catch (AsynchronousCloseException e) { 879 // closed when blocked in write 880 done = true; 881 } catch (ClosedChannelException e) { 882 // closed but not blocked in write, need to retry test 883 System.err.format("%s, need to retry!%n", e); 884 } 885 } 886 } 887 }); 888 } 889 890 /** 891 * Virtual thread interrupted while blocked in Pipe.SinkChannel write. 892 */ 893 @ParameterizedTest 894 @MethodSource("threadBuilders") 895 void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 896 VThreadRunner.run(builder, () -> { 897 boolean done = false; 898 while (!done) { 899 Pipe p = Pipe.open(); 900 try (Pipe.SinkChannel sink = p.sink(); 901 Pipe.SourceChannel source = p.source()) { 902 903 // interrupt current thread when it blocks in write 904 Thread thisThread = Thread.currentThread(); 905 runAfterParkedAsync(thisThread::interrupt, true); 906 907 // write until channel is closed 908 try { 909 ByteBuffer bb = ByteBuffer.allocate(100*1024); 910 for (;;) { 911 int n = sink.write(bb); 912 assertTrue(n > 0); 913 bb.clear(); 914 } 915 } catch (ClosedByInterruptException expected) { 916 // closed when blocked in write 917 assertTrue(Thread.interrupted()); 918 done = true; 919 } catch (ClosedChannelException e) { 920 // closed but not blocked in write, need to retry test 921 System.err.format("%s, need to retry!%n", e); 922 } 923 } 924 } 925 }); 926 } 927 928 /** 929 * Creates a loopback connection 930 */ 931 static class Connection implements Closeable { 932 private final SocketChannel sc1; 933 private final SocketChannel sc2; 934 Connection() throws IOException { 935 var lh = InetAddress.getLoopbackAddress(); 936 try (var listener = ServerSocketChannel.open()) { 937 listener.bind(new InetSocketAddress(lh, 0)); 938 SocketChannel sc1 = SocketChannel.open(); 939 SocketChannel sc2 = null; 940 try { 941 sc1.socket().connect(listener.getLocalAddress()); 942 sc2 = listener.accept(); 943 } catch (IOException ioe) { 944 sc1.close(); 945 throw ioe; 946 } 947 this.sc1 = sc1; 948 this.sc2 = sc2; 949 } 950 } 951 SocketChannel channel1() { 952 return sc1; 953 } 954 SocketChannel channel2() { 955 return sc2; 956 } 957 @Override 958 public void close() throws IOException { 959 sc1.close(); 960 sc2.close(); 961 } 962 } 963 964 /** 965 * Read from a channel until all bytes have been read or an I/O error occurs. 966 */ 967 static void readToEOF(ReadableByteChannel rbc) throws IOException { 968 ByteBuffer bb = ByteBuffer.allocate(16*1024); 969 int n; 970 while ((n = rbc.read(bb)) > 0) { 971 bb.clear(); 972 } 973 } 974 975 @FunctionalInterface 976 interface ThrowingRunnable { 977 void run() throws Exception; 978 } 979 980 /** 981 * Runs the given task asynchronously after the current virtual thread parks. 982 * @param writing if the thread will block in write 983 * @return the thread started to run the task 984 */ 985 private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) { 986 Thread target = Thread.currentThread(); 987 if (!target.isVirtual()) 988 throw new WrongThreadException(); 989 return Thread.ofPlatform().daemon().start(() -> { 990 try { 991 // wait for target thread to park 992 while (!isWaiting(target)) { 993 Thread.sleep(20); 994 } 995 996 // if the target thread is parked in write then we nudge it a few times 997 // to avoid wakeup with some bytes written 998 if (writing) { 999 for (int i = 0; i < 3; i++) { 1000 LockSupport.unpark(target); 1001 while (!isWaiting(target)) { 1002 Thread.sleep(20); 1003 } 1004 } 1005 } 1006 1007 task.run(); 1008 1009 } catch (Exception e) { 1010 e.printStackTrace(); 1011 } 1012 }); 1013 } 1014 1015 private static Thread runAfterParkedAsync(ThrowingRunnable task) { 1016 return runAfterParkedAsync(task, false); 1017 } 1018 1019 /** 1020 * Return true if the given Thread is parked. 1021 */ 1022 private static boolean isWaiting(Thread target) { 1023 Thread.State state = target.getState(); 1024 assertNotEquals(Thread.State.TERMINATED, state); 1025 return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING); 1026 } 1027 }