1 /* 2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=default 26 * @bug 8284161 27 * @summary Test virtual threads doing blocking I/O on NIO channels 28 * @library /test/lib 29 * @run junit/othervm/timeout=480 BlockingChannelOps 30 */ 31 32 /* 33 * @test id=poller-modes 34 * @requires (os.family == "linux") | (os.family == "mac") 35 * @library /test/lib 36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps 37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps 38 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps 39 */ 40 41 /* 42 * @test id=io_uring 43 * @requires os.family == "linux" 44 * @library /test/lib 45 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps 46 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps 47 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps 48 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps 49 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps 50 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps 51 */ 52 53 /* 54 * @test id=no-vmcontinuations 55 * @requires vm.continuations 56 * @library /test/lib 57 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps 58 */ 59 60 import java.io.Closeable; 61 import java.io.IOException; 62 import java.net.DatagramPacket; 63 import java.net.InetAddress; 64 import java.net.InetSocketAddress; 65 import java.net.Socket; 66 import java.net.SocketAddress; 67 import java.net.SocketException; 68 import java.nio.ByteBuffer; 69 import java.nio.channels.AsynchronousCloseException; 70 import java.nio.channels.ClosedByInterruptException; 71 import java.nio.channels.ClosedChannelException; 72 import java.nio.channels.DatagramChannel; 73 import java.nio.channels.Pipe; 74 import java.nio.channels.ReadableByteChannel; 75 import java.nio.channels.ServerSocketChannel; 76 import java.nio.channels.SocketChannel; 77 import java.nio.channels.WritableByteChannel; 78 import java.util.concurrent.ExecutorService; 79 import java.util.concurrent.Executors; 80 import java.util.concurrent.ThreadFactory; 81 import java.util.concurrent.locks.LockSupport; 82 import java.util.stream.Stream; 83 84 import jdk.test.lib.thread.VThreadRunner; 85 import jdk.test.lib.thread.VThreadScheduler; 86 87 import org.junit.jupiter.api.BeforeAll; 88 import org.junit.jupiter.params.ParameterizedTest; 89 import org.junit.jupiter.params.provider.MethodSource; 90 import static org.junit.jupiter.api.Assertions.*; 91 92 class BlockingChannelOps { 93 private static ExecutorService threadPool; 94 private static Thread.VirtualThreadScheduler customScheduler; 95 96 @BeforeAll 97 static void setup() { 98 ThreadFactory factory = Thread.ofPlatform().daemon().factory(); 99 threadPool = Executors.newCachedThreadPool(factory); 100 customScheduler = (_, task) -> threadPool.execute(task); 101 } 102 103 /** 104 * Returns the Thread.Builder to create the virtual thread. 105 */ 106 static Stream<Thread.Builder.OfVirtual> threadBuilders() { 107 if (VThreadScheduler.supportsCustomScheduler()) { 108 return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler)); 109 } else { 110 return Stream.of(Thread.ofVirtual()); 111 } 112 } 113 114 /** 115 * SocketChannel read/write, no blocking. 116 */ 117 @ParameterizedTest 118 @MethodSource("threadBuilders") 119 void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception { 120 VThreadRunner.run(builder, () -> { 121 try (var connection = new Connection()) { 122 SocketChannel sc1 = connection.channel1(); 123 SocketChannel sc2 = connection.channel2(); 124 125 // write to sc1 126 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 127 int n = sc1.write(bb); 128 assertTrue(n > 0); 129 130 // read from sc2 should not block 131 bb = ByteBuffer.allocate(10); 132 n = sc2.read(bb); 133 assertTrue(n > 0); 134 assertTrue(bb.get(0) == 'X'); 135 } 136 }); 137 } 138 139 /** 140 * Virtual thread blocks in SocketChannel read. 141 */ 142 @ParameterizedTest 143 @MethodSource("threadBuilders") 144 void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception { 145 VThreadRunner.run(builder, () -> { 146 try (var connection = new Connection()) { 147 SocketChannel sc1 = connection.channel1(); 148 SocketChannel sc2 = connection.channel2(); 149 150 // write to sc1 when current thread blocks in sc2.read 151 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 152 runAfterParkedAsync(() -> sc1.write(bb1)); 153 154 // read from sc2 should block 155 ByteBuffer bb2 = ByteBuffer.allocate(10); 156 int n = sc2.read(bb2); 157 assertTrue(n > 0); 158 assertTrue(bb2.get(0) == 'X'); 159 } 160 }); 161 } 162 163 /** 164 * Virtual thread blocks in SocketChannel write. 165 */ 166 @ParameterizedTest 167 @MethodSource("threadBuilders") 168 void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception { 169 VThreadRunner.run(builder, () -> { 170 try (var connection = new Connection()) { 171 SocketChannel sc1 = connection.channel1(); 172 SocketChannel sc2 = connection.channel2(); 173 174 // read from sc2 to EOF when current thread blocks in sc1.write 175 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2)); 176 177 // write to sc1 should block 178 ByteBuffer bb = ByteBuffer.allocate(100*1024); 179 for (int i=0; i<1000; i++) { 180 int n = sc1.write(bb); 181 assertTrue(n > 0); 182 bb.clear(); 183 } 184 sc1.close(); 185 186 // wait for reader to finish 187 reader.join(); 188 } 189 }); 190 } 191 192 /** 193 * SocketChannel close while virtual thread blocked in read. 194 */ 195 @ParameterizedTest 196 @MethodSource("threadBuilders") 197 void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 198 VThreadRunner.run(builder, () -> { 199 try (var connection = new Connection()) { 200 SocketChannel sc = connection.channel1(); 201 runAfterParkedAsync(sc::close); 202 try { 203 int n = sc.read(ByteBuffer.allocate(100)); 204 fail("read returned " + n); 205 } catch (AsynchronousCloseException expected) { } 206 } 207 }); 208 } 209 210 /** 211 * SocketChannel shutdownInput while virtual thread blocked in read. 212 */ 213 @ParameterizedTest 214 @MethodSource("threadBuilders") 215 void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception { 216 VThreadRunner.run(builder, () -> { 217 try (var connection = new Connection()) { 218 SocketChannel sc = connection.channel1(); 219 runAfterParkedAsync(sc::shutdownInput); 220 int n = sc.read(ByteBuffer.allocate(100)); 221 assertEquals(-1, n); 222 assertTrue(sc.isOpen()); 223 } 224 }); 225 } 226 227 /** 228 * Virtual thread interrupted while blocked in SocketChannel read. 229 */ 230 @ParameterizedTest 231 @MethodSource("threadBuilders") 232 void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 233 VThreadRunner.run(builder, () -> { 234 try (var connection = new Connection()) { 235 SocketChannel sc = connection.channel1(); 236 237 // interrupt current thread when it blocks in read 238 Thread thisThread = Thread.currentThread(); 239 runAfterParkedAsync(thisThread::interrupt); 240 241 try { 242 int n = sc.read(ByteBuffer.allocate(100)); 243 fail("read returned " + n); 244 } catch (ClosedByInterruptException expected) { 245 assertTrue(Thread.interrupted()); 246 } 247 } 248 }); 249 } 250 251 /** 252 * SocketChannel close while virtual thread blocked in write. 253 */ 254 @ParameterizedTest 255 @MethodSource("threadBuilders") 256 void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 257 VThreadRunner.run(builder, () -> { 258 boolean done = false; 259 while (!done) { 260 try (var connection = new Connection()) { 261 SocketChannel sc = connection.channel1(); 262 263 // close sc when current thread blocks in write 264 runAfterParkedAsync(sc::close, true); 265 266 // write until channel is closed 267 try { 268 ByteBuffer bb = ByteBuffer.allocate(100*1024); 269 for (;;) { 270 int n = sc.write(bb); 271 assertTrue(n > 0); 272 bb.clear(); 273 } 274 } catch (AsynchronousCloseException expected) { 275 // closed when blocked in write 276 done = true; 277 } catch (ClosedChannelException e) { 278 // closed but not blocked in write, need to retry test 279 System.err.format("%s, need to retry!%n", e); 280 } 281 } 282 } 283 }); 284 } 285 286 /** 287 * SocketChannel shutdownOutput while virtual thread blocked in write. 288 */ 289 @ParameterizedTest 290 @MethodSource("threadBuilders") 291 void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception { 292 VThreadRunner.run(builder, () -> { 293 try (var connection = new Connection()) { 294 SocketChannel sc = connection.channel1(); 295 296 // shutdown output when current thread blocks in write 297 runAfterParkedAsync(sc::shutdownOutput); 298 try { 299 ByteBuffer bb = ByteBuffer.allocate(100*1024); 300 for (;;) { 301 int n = sc.write(bb); 302 assertTrue(n > 0); 303 bb.clear(); 304 } 305 } catch (ClosedChannelException e) { 306 // expected 307 } 308 assertTrue(sc.isOpen()); 309 } 310 }); 311 } 312 313 /** 314 * Virtual thread interrupted while blocked in SocketChannel write. 315 */ 316 @ParameterizedTest 317 @MethodSource("threadBuilders") 318 void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 319 VThreadRunner.run(builder, () -> { 320 boolean done = false; 321 while (!done) { 322 try (var connection = new Connection()) { 323 SocketChannel sc = connection.channel1(); 324 325 // interrupt current thread when it blocks in write 326 Thread thisThread = Thread.currentThread(); 327 runAfterParkedAsync(thisThread::interrupt, true); 328 329 // write until channel is closed 330 try { 331 ByteBuffer bb = ByteBuffer.allocate(100*1024); 332 for (;;) { 333 int n = sc.write(bb); 334 assertTrue(n > 0); 335 bb.clear(); 336 } 337 } catch (ClosedByInterruptException e) { 338 // closed when blocked in write 339 assertTrue(Thread.interrupted()); 340 done = true; 341 } catch (ClosedChannelException e) { 342 // closed but not blocked in write, need to retry test 343 System.err.format("%s, need to retry!%n", e); 344 } 345 } 346 } 347 }); 348 } 349 350 /** 351 * Virtual thread blocks in SocketChannel adaptor read. 352 */ 353 @ParameterizedTest 354 @MethodSource("threadBuilders") 355 void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception { 356 testSocketAdaptorRead(builder, 0); 357 } 358 359 /** 360 * Virtual thread blocks in SocketChannel adaptor read with timeout. 361 */ 362 @ParameterizedTest 363 @MethodSource("threadBuilders") 364 void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception { 365 testSocketAdaptorRead(builder, 60_000); 366 } 367 368 private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder, 369 int timeout) throws Exception { 370 VThreadRunner.run(builder, () -> { 371 try (var connection = new Connection()) { 372 SocketChannel sc1 = connection.channel1(); 373 SocketChannel sc2 = connection.channel2(); 374 375 // write to sc1 when currnet thread blocks reading from sc2 376 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 377 runAfterParkedAsync(() -> sc1.write(bb)); 378 379 // read from sc2 should block 380 byte[] array = new byte[100]; 381 if (timeout > 0) 382 sc2.socket().setSoTimeout(timeout); 383 int n = sc2.socket().getInputStream().read(array); 384 assertTrue(n > 0); 385 assertTrue(array[0] == 'X'); 386 } 387 }); 388 } 389 390 /** 391 * ServerSocketChannel accept, no blocking. 392 */ 393 @ParameterizedTest 394 @MethodSource("threadBuilders") 395 void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception { 396 VThreadRunner.run(builder, () -> { 397 try (var ssc = ServerSocketChannel.open()) { 398 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 399 var sc1 = SocketChannel.open(ssc.getLocalAddress()); 400 // accept should not block 401 var sc2 = ssc.accept(); 402 sc1.close(); 403 sc2.close(); 404 } 405 }); 406 } 407 408 /** 409 * Virtual thread blocks in ServerSocketChannel accept. 410 */ 411 @ParameterizedTest 412 @MethodSource("threadBuilders") 413 void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception { 414 VThreadRunner.run(builder, () -> { 415 try (var ssc = ServerSocketChannel.open()) { 416 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 417 var sc1 = SocketChannel.open(); 418 419 // connect when current thread when it blocks in accept 420 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress())); 421 422 // accept should block 423 var sc2 = ssc.accept(); 424 sc1.close(); 425 sc2.close(); 426 } 427 }); 428 } 429 430 /** 431 * SeverSocketChannel close while virtual thread blocked in accept. 432 */ 433 @ParameterizedTest 434 @MethodSource("threadBuilders") 435 void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 436 VThreadRunner.run(builder, () -> { 437 try (var ssc = ServerSocketChannel.open()) { 438 InetAddress lh = InetAddress.getLoopbackAddress(); 439 ssc.bind(new InetSocketAddress(lh, 0)); 440 runAfterParkedAsync(ssc::close); 441 try { 442 SocketChannel sc = ssc.accept(); 443 sc.close(); 444 fail("connection accepted???"); 445 } catch (AsynchronousCloseException expected) { } 446 } 447 }); 448 } 449 450 /** 451 * Virtual thread interrupted while blocked in ServerSocketChannel accept. 452 */ 453 @ParameterizedTest 454 @MethodSource("threadBuilders") 455 void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 456 VThreadRunner.run(builder, () -> { 457 try (var ssc = ServerSocketChannel.open()) { 458 InetAddress lh = InetAddress.getLoopbackAddress(); 459 ssc.bind(new InetSocketAddress(lh, 0)); 460 461 // interrupt current thread when it blocks in accept 462 Thread thisThread = Thread.currentThread(); 463 runAfterParkedAsync(thisThread::interrupt); 464 465 try { 466 SocketChannel sc = ssc.accept(); 467 sc.close(); 468 fail("connection accepted???"); 469 } catch (ClosedByInterruptException expected) { 470 assertTrue(Thread.interrupted()); 471 } 472 } 473 }); 474 } 475 476 /** 477 * Virtual thread blocks in ServerSocketChannel adaptor accept. 478 */ 479 @ParameterizedTest 480 @MethodSource("threadBuilders") 481 void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception { 482 testSocketChannelAdaptorAccept(builder, 0); 483 } 484 485 /** 486 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout. 487 */ 488 @ParameterizedTest 489 @MethodSource("threadBuilders") 490 void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception { 491 testSocketChannelAdaptorAccept(builder, 60_000); 492 } 493 494 private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder, 495 int timeout) throws Exception { 496 VThreadRunner.run(builder, () -> { 497 try (var ssc = ServerSocketChannel.open()) { 498 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 499 var sc = SocketChannel.open(); 500 501 // interrupt current thread when it blocks in accept 502 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress())); 503 504 // accept should block 505 if (timeout > 0) 506 ssc.socket().setSoTimeout(timeout); 507 Socket s = ssc.socket().accept(); 508 sc.close(); 509 s.close(); 510 } 511 }); 512 } 513 514 /** 515 * DatagramChannel receive/send, no blocking. 516 */ 517 @ParameterizedTest 518 @MethodSource("threadBuilders") 519 void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception { 520 VThreadRunner.run(builder, () -> { 521 try (DatagramChannel dc1 = DatagramChannel.open(); 522 DatagramChannel dc2 = DatagramChannel.open()) { 523 524 InetAddress lh = InetAddress.getLoopbackAddress(); 525 dc2.bind(new InetSocketAddress(lh, 0)); 526 527 // send should not block 528 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 529 int n = dc1.send(bb, dc2.getLocalAddress()); 530 assertTrue(n > 0); 531 532 // receive should not block 533 bb = ByteBuffer.allocate(10); 534 dc2.receive(bb); 535 assertTrue(bb.get(0) == 'X'); 536 } 537 }); 538 } 539 540 /** 541 * Virtual thread blocks in DatagramChannel receive. 542 */ 543 @ParameterizedTest 544 @MethodSource("threadBuilders") 545 void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception { 546 VThreadRunner.run(builder, () -> { 547 try (DatagramChannel dc1 = DatagramChannel.open(); 548 DatagramChannel dc2 = DatagramChannel.open()) { 549 550 InetAddress lh = InetAddress.getLoopbackAddress(); 551 dc2.bind(new InetSocketAddress(lh, 0)); 552 553 // send from dc1 when current thread blocked in dc2.receive 554 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 555 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress())); 556 557 // read from dc2 should block 558 ByteBuffer bb2 = ByteBuffer.allocate(10); 559 dc2.receive(bb2); 560 assertTrue(bb2.get(0) == 'X'); 561 } 562 }); 563 } 564 565 /** 566 * DatagramChannel close while virtual thread blocked in receive. 567 */ 568 @ParameterizedTest 569 @MethodSource("threadBuilders") 570 void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 571 VThreadRunner.run(builder, () -> { 572 try (DatagramChannel dc = DatagramChannel.open()) { 573 InetAddress lh = InetAddress.getLoopbackAddress(); 574 dc.bind(new InetSocketAddress(lh, 0)); 575 runAfterParkedAsync(dc::close); 576 try { 577 dc.receive(ByteBuffer.allocate(100)); 578 fail("receive returned"); 579 } catch (AsynchronousCloseException expected) { } 580 } 581 }); 582 } 583 584 /** 585 * Virtual thread interrupted while blocked in DatagramChannel receive. 586 */ 587 @ParameterizedTest 588 @MethodSource("threadBuilders") 589 void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 590 VThreadRunner.run(builder, () -> { 591 try (DatagramChannel dc = DatagramChannel.open()) { 592 InetAddress lh = InetAddress.getLoopbackAddress(); 593 dc.bind(new InetSocketAddress(lh, 0)); 594 595 // interrupt current thread when it blocks in receive 596 Thread thisThread = Thread.currentThread(); 597 runAfterParkedAsync(thisThread::interrupt); 598 599 try { 600 dc.receive(ByteBuffer.allocate(100)); 601 fail("receive returned"); 602 } catch (ClosedByInterruptException expected) { 603 assertTrue(Thread.interrupted()); 604 } 605 } 606 }); 607 } 608 609 /** 610 * Virtual thread blocks in DatagramSocket adaptor receive. 611 */ 612 @ParameterizedTest 613 @MethodSource("threadBuilders") 614 void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception { 615 testDatagramSocketAdaptorReceive(builder, 0); 616 } 617 618 /** 619 * Virtual thread blocks in DatagramSocket adaptor receive with timeout. 620 */ 621 @ParameterizedTest 622 @MethodSource("threadBuilders") 623 void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception { 624 testDatagramSocketAdaptorReceive(builder, 60_000); 625 } 626 627 private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder, 628 int timeout) throws Exception { 629 VThreadRunner.run(builder, () -> { 630 try (DatagramChannel dc1 = DatagramChannel.open(); 631 DatagramChannel dc2 = DatagramChannel.open()) { 632 633 InetAddress lh = InetAddress.getLoopbackAddress(); 634 dc2.bind(new InetSocketAddress(lh, 0)); 635 636 // send from dc1 when current thread blocks in dc2 receive 637 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 638 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress())); 639 640 // receive should block 641 byte[] array = new byte[100]; 642 DatagramPacket p = new DatagramPacket(array, 0, array.length); 643 if (timeout > 0) 644 dc2.socket().setSoTimeout(timeout); 645 dc2.socket().receive(p); 646 assertTrue(p.getLength() == 3 && array[0] == 'X'); 647 } 648 }); 649 } 650 651 /** 652 * DatagramChannel close while virtual thread blocked in adaptor receive. 653 */ 654 @ParameterizedTest 655 @MethodSource("threadBuilders") 656 void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception { 657 testDatagramSocketAdaptorReceiveAsyncClose(builder, 0); 658 } 659 660 /** 661 * DatagramChannel close while virtual thread blocked in adaptor receive 662 * with timeout. 663 */ 664 @ParameterizedTest 665 @MethodSource("threadBuilders") 666 void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception { 667 testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000); 668 } 669 670 private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder, 671 int timeout) throws Exception { 672 VThreadRunner.run(builder, () -> { 673 try (DatagramChannel dc = DatagramChannel.open()) { 674 InetAddress lh = InetAddress.getLoopbackAddress(); 675 dc.bind(new InetSocketAddress(lh, 0)); 676 677 byte[] array = new byte[100]; 678 DatagramPacket p = new DatagramPacket(array, 0, array.length); 679 if (timeout > 0) 680 dc.socket().setSoTimeout(timeout); 681 682 // close channel/socket when current thread blocks in receive 683 runAfterParkedAsync(dc::close); 684 685 assertThrows(SocketException.class, () -> dc.socket().receive(p)); 686 } 687 }); 688 } 689 690 /** 691 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive. 692 */ 693 @ParameterizedTest 694 @MethodSource("threadBuilders") 695 void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception { 696 testDatagramSocketAdaptorReceiveInterrupt(builder, 0); 697 } 698 699 /** 700 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive 701 * with timeout. 702 */ 703 @ParameterizedTest 704 @MethodSource("threadBuilders") 705 void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception { 706 testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000); 707 } 708 709 private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder, 710 int timeout) throws Exception { 711 VThreadRunner.run(builder, () -> { 712 try (DatagramChannel dc = DatagramChannel.open()) { 713 InetAddress lh = InetAddress.getLoopbackAddress(); 714 dc.bind(new InetSocketAddress(lh, 0)); 715 716 byte[] array = new byte[100]; 717 DatagramPacket p = new DatagramPacket(array, 0, array.length); 718 if (timeout > 0) 719 dc.socket().setSoTimeout(timeout); 720 721 // interrupt current thread when it blocks in receive 722 Thread thisThread = Thread.currentThread(); 723 runAfterParkedAsync(thisThread::interrupt); 724 725 try { 726 dc.socket().receive(p); 727 fail(); 728 } catch (ClosedByInterruptException expected) { 729 assertTrue(Thread.interrupted()); 730 } 731 } 732 }); 733 } 734 735 /** 736 * Pipe read/write, no blocking. 737 */ 738 @ParameterizedTest 739 @MethodSource("threadBuilders") 740 void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception { 741 VThreadRunner.run(builder, () -> { 742 Pipe p = Pipe.open(); 743 try (Pipe.SinkChannel sink = p.sink(); 744 Pipe.SourceChannel source = p.source()) { 745 746 // write should not block 747 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 748 int n = sink.write(bb); 749 assertTrue(n > 0); 750 751 // read should not block 752 bb = ByteBuffer.allocate(10); 753 n = source.read(bb); 754 assertTrue(n > 0); 755 assertTrue(bb.get(0) == 'X'); 756 } 757 }); 758 } 759 760 /** 761 * Virtual thread blocks in Pipe.SourceChannel read. 762 */ 763 @ParameterizedTest 764 @MethodSource("threadBuilders") 765 void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception { 766 VThreadRunner.run(builder, () -> { 767 Pipe p = Pipe.open(); 768 try (Pipe.SinkChannel sink = p.sink(); 769 Pipe.SourceChannel source = p.source()) { 770 771 // write from sink when current thread blocks reading from source 772 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8")); 773 runAfterParkedAsync(() -> sink.write(bb1)); 774 775 // read should block 776 ByteBuffer bb2 = ByteBuffer.allocate(10); 777 int n = source.read(bb2); 778 assertTrue(n > 0); 779 assertTrue(bb2.get(0) == 'X'); 780 } 781 }); 782 } 783 784 /** 785 * Virtual thread blocks in Pipe.SinkChannel write. 786 */ 787 @ParameterizedTest 788 @MethodSource("threadBuilders") 789 void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception { 790 VThreadRunner.run(builder, () -> { 791 Pipe p = Pipe.open(); 792 try (Pipe.SinkChannel sink = p.sink(); 793 Pipe.SourceChannel source = p.source()) { 794 795 // read from source to EOF when current thread blocking in write 796 Thread reader = runAfterParkedAsync(() -> readToEOF(source)); 797 798 // write to sink should block 799 ByteBuffer bb = ByteBuffer.allocate(100*1024); 800 for (int i=0; i<1000; i++) { 801 int n = sink.write(bb); 802 assertTrue(n > 0); 803 bb.clear(); 804 } 805 sink.close(); 806 807 // wait for reader to finish 808 reader.join(); 809 } 810 }); 811 } 812 813 /** 814 * Pipe.SourceChannel close while virtual thread blocked in read. 815 */ 816 @ParameterizedTest 817 @MethodSource("threadBuilders") 818 void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 819 VThreadRunner.run(builder, () -> { 820 Pipe p = Pipe.open(); 821 try (Pipe.SinkChannel sink = p.sink(); 822 Pipe.SourceChannel source = p.source()) { 823 runAfterParkedAsync(source::close); 824 try { 825 int n = source.read(ByteBuffer.allocate(100)); 826 fail("read returned " + n); 827 } catch (AsynchronousCloseException expected) { } 828 } 829 }); 830 } 831 832 /** 833 * Virtual thread interrupted while blocked in Pipe.SourceChannel read. 834 */ 835 @ParameterizedTest 836 @MethodSource("threadBuilders") 837 void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 838 VThreadRunner.run(builder, () -> { 839 Pipe p = Pipe.open(); 840 try (Pipe.SinkChannel sink = p.sink(); 841 Pipe.SourceChannel source = p.source()) { 842 843 // interrupt current thread when it blocks reading from source 844 Thread thisThread = Thread.currentThread(); 845 runAfterParkedAsync(thisThread::interrupt); 846 847 try { 848 int n = source.read(ByteBuffer.allocate(100)); 849 fail("read returned " + n); 850 } catch (ClosedByInterruptException expected) { 851 assertTrue(Thread.interrupted()); 852 } 853 } 854 }); 855 } 856 857 /** 858 * Pipe.SinkChannel close while virtual thread blocked in write. 859 */ 860 @ParameterizedTest 861 @MethodSource("threadBuilders") 862 void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception { 863 VThreadRunner.run(builder, () -> { 864 boolean done = false; 865 while (!done) { 866 Pipe p = Pipe.open(); 867 try (Pipe.SinkChannel sink = p.sink(); 868 Pipe.SourceChannel source = p.source()) { 869 870 // close sink when current thread blocks in write 871 runAfterParkedAsync(sink::close, true); 872 873 // write until channel is closed 874 try { 875 ByteBuffer bb = ByteBuffer.allocate(100*1024); 876 for (;;) { 877 int n = sink.write(bb); 878 assertTrue(n > 0); 879 bb.clear(); 880 } 881 } catch (AsynchronousCloseException e) { 882 // closed when blocked in write 883 done = true; 884 } catch (ClosedChannelException e) { 885 // closed but not blocked in write, need to retry test 886 System.err.format("%s, need to retry!%n", e); 887 } 888 } 889 } 890 }); 891 } 892 893 /** 894 * Virtual thread interrupted while blocked in Pipe.SinkChannel write. 895 */ 896 @ParameterizedTest 897 @MethodSource("threadBuilders") 898 void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception { 899 VThreadRunner.run(builder, () -> { 900 boolean done = false; 901 while (!done) { 902 Pipe p = Pipe.open(); 903 try (Pipe.SinkChannel sink = p.sink(); 904 Pipe.SourceChannel source = p.source()) { 905 906 // interrupt current thread when it blocks in write 907 Thread thisThread = Thread.currentThread(); 908 runAfterParkedAsync(thisThread::interrupt, true); 909 910 // write until channel is closed 911 try { 912 ByteBuffer bb = ByteBuffer.allocate(100*1024); 913 for (;;) { 914 int n = sink.write(bb); 915 assertTrue(n > 0); 916 bb.clear(); 917 } 918 } catch (ClosedByInterruptException expected) { 919 // closed when blocked in write 920 assertTrue(Thread.interrupted()); 921 done = true; 922 } catch (ClosedChannelException e) { 923 // closed but not blocked in write, need to retry test 924 System.err.format("%s, need to retry!%n", e); 925 } 926 } 927 } 928 }); 929 } 930 931 /** 932 * Creates a loopback connection 933 */ 934 static class Connection implements Closeable { 935 private final SocketChannel sc1; 936 private final SocketChannel sc2; 937 Connection() throws IOException { 938 var lh = InetAddress.getLoopbackAddress(); 939 try (var listener = ServerSocketChannel.open()) { 940 listener.bind(new InetSocketAddress(lh, 0)); 941 SocketChannel sc1 = SocketChannel.open(); 942 SocketChannel sc2 = null; 943 try { 944 sc1.socket().connect(listener.getLocalAddress()); 945 sc2 = listener.accept(); 946 } catch (IOException ioe) { 947 sc1.close(); 948 throw ioe; 949 } 950 this.sc1 = sc1; 951 this.sc2 = sc2; 952 } 953 } 954 SocketChannel channel1() { 955 return sc1; 956 } 957 SocketChannel channel2() { 958 return sc2; 959 } 960 @Override 961 public void close() throws IOException { 962 sc1.close(); 963 sc2.close(); 964 } 965 } 966 967 /** 968 * Read from a channel until all bytes have been read or an I/O error occurs. 969 */ 970 static void readToEOF(ReadableByteChannel rbc) throws IOException { 971 ByteBuffer bb = ByteBuffer.allocate(16*1024); 972 int n; 973 while ((n = rbc.read(bb)) > 0) { 974 bb.clear(); 975 } 976 } 977 978 @FunctionalInterface 979 interface ThrowingRunnable { 980 void run() throws Exception; 981 } 982 983 /** 984 * Runs the given task asynchronously after the current virtual thread parks. 985 * @param writing if the thread will block in write 986 * @return the thread started to run the task 987 */ 988 private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) { 989 Thread target = Thread.currentThread(); 990 if (!target.isVirtual()) 991 throw new WrongThreadException(); 992 return Thread.ofPlatform().daemon().start(() -> { 993 try { 994 // wait for target thread to park 995 while (!isWaiting(target)) { 996 Thread.sleep(20); 997 } 998 999 // if the target thread is parked in write then we nudge it a few times 1000 // to avoid wakeup with some bytes written 1001 if (writing) { 1002 for (int i = 0; i < 3; i++) { 1003 LockSupport.unpark(target); 1004 while (!isWaiting(target)) { 1005 Thread.sleep(20); 1006 } 1007 } 1008 } 1009 1010 task.run(); 1011 1012 } catch (Exception e) { 1013 e.printStackTrace(); 1014 } 1015 }); 1016 } 1017 1018 private static Thread runAfterParkedAsync(ThrowingRunnable task) { 1019 return runAfterParkedAsync(task, false); 1020 } 1021 1022 /** 1023 * Return true if the given Thread is parked. 1024 */ 1025 private static boolean isWaiting(Thread target) { 1026 Thread.State state = target.getState(); 1027 assertNotEquals(Thread.State.TERMINATED, state); 1028 return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING); 1029 } 1030 }