1 /*
   2  * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=default
  26  * @bug 8284161
  27  * @summary Test virtual threads doing blocking I/O on NIO channels
  28  * @library /test/lib
  29  * @run junit/othervm/timeout=480 BlockingChannelOps
  30  */
  31 
  32 /*
  33  * @test id=poller-modes
  34  * @requires (os.family == "linux") | (os.family == "mac")
  35  * @library /test/lib
  36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
  37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
  38  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
  39  */
  40 
  41 /*
  42  * @test id=io_uring
  43  * @requires os.family == "linux"
  44  * @library /test/lib
  45  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
  46  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
  47  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
  48  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
  49  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
  50  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
  51  */
  52 
  53 /*
  54  * @test id=no-vmcontinuations
  55  * @requires vm.continuations
  56  * @library /test/lib
  57  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
  58  */
  59 
  60 import java.io.Closeable;
  61 import java.io.IOException;
  62 import java.net.DatagramPacket;
  63 import java.net.InetAddress;
  64 import java.net.InetSocketAddress;
  65 import java.net.Socket;
  66 import java.net.SocketAddress;
  67 import java.net.SocketException;
  68 import java.nio.ByteBuffer;
  69 import java.nio.channels.AsynchronousCloseException;
  70 import java.nio.channels.ClosedByInterruptException;
  71 import java.nio.channels.ClosedChannelException;
  72 import java.nio.channels.DatagramChannel;
  73 import java.nio.channels.Pipe;
  74 import java.nio.channels.ReadableByteChannel;
  75 import java.nio.channels.ServerSocketChannel;
  76 import java.nio.channels.SocketChannel;
  77 import java.nio.channels.WritableByteChannel;
  78 import java.util.concurrent.ExecutorService;
  79 import java.util.concurrent.Executors;
  80 import java.util.concurrent.ThreadFactory;
  81 import java.util.concurrent.locks.LockSupport;
  82 import java.util.stream.Stream;
  83 
  84 import jdk.test.lib.thread.VThreadRunner;
  85 import jdk.test.lib.thread.VThreadScheduler;
  86 
  87 import org.junit.jupiter.api.BeforeAll;
  88 import org.junit.jupiter.params.ParameterizedTest;
  89 import org.junit.jupiter.params.provider.MethodSource;
  90 import static org.junit.jupiter.api.Assertions.*;
  91 
  92 class BlockingChannelOps {
  93     private static ExecutorService threadPool;
  94     private static Thread.VirtualThreadScheduler customScheduler;
  95 
  96     @BeforeAll
  97     static void setup() {
  98         ThreadFactory factory = Thread.ofPlatform().daemon().factory();
  99         threadPool = Executors.newCachedThreadPool(factory);
 100         customScheduler = (_, task) -> threadPool.execute(task);
 101     }
 102 
 103     /**
 104      * Returns the Thread.Builder to create the virtual thread.
 105      */
 106     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
 107         if (VThreadScheduler.supportsCustomScheduler()) {
 108             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
 109         } else {
 110             return Stream.of(Thread.ofVirtual());
 111         }
 112     }
 113 
 114     /**
 115      * SocketChannel read/write, no blocking.
 116      */
 117     @ParameterizedTest
 118     @MethodSource("threadBuilders")
 119     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
 120         VThreadRunner.run(builder, () -> {
 121             try (var connection = new Connection()) {
 122                 SocketChannel sc1 = connection.channel1();
 123                 SocketChannel sc2 = connection.channel2();
 124 
 125                 // write to sc1
 126                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 127                 int n = sc1.write(bb);
 128                 assertTrue(n > 0);
 129 
 130                 // read from sc2 should not block
 131                 bb = ByteBuffer.allocate(10);
 132                 n = sc2.read(bb);
 133                 assertTrue(n > 0);
 134                 assertTrue(bb.get(0) == 'X');
 135             }
 136         });
 137     }
 138 
 139     /**
 140      * Virtual thread blocks in SocketChannel read.
 141      */
 142     @ParameterizedTest
 143     @MethodSource("threadBuilders")
 144     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
 145         VThreadRunner.run(builder, () -> {
 146             try (var connection = new Connection()) {
 147                 SocketChannel sc1 = connection.channel1();
 148                 SocketChannel sc2 = connection.channel2();
 149 
 150                 // write to sc1 when current thread blocks in sc2.read
 151                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 152                 runAfterParkedAsync(() -> sc1.write(bb1));
 153 
 154                 // read from sc2 should block
 155                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 156                 int n = sc2.read(bb2);
 157                 assertTrue(n > 0);
 158                 assertTrue(bb2.get(0) == 'X');
 159             }
 160         });
 161     }
 162 
 163     /**
 164      * Virtual thread blocks in SocketChannel write.
 165      */
 166     @ParameterizedTest
 167     @MethodSource("threadBuilders")
 168     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
 169         VThreadRunner.run(builder, () -> {
 170             try (var connection = new Connection()) {
 171                 SocketChannel sc1 = connection.channel1();
 172                 SocketChannel sc2 = connection.channel2();
 173 
 174                 // read from sc2 to EOF when current thread blocks in sc1.write
 175                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
 176 
 177                 // write to sc1 should block
 178                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 179                 for (int i=0; i<1000; i++) {
 180                     int n = sc1.write(bb);
 181                     assertTrue(n > 0);
 182                     bb.clear();
 183                 }
 184                 sc1.close();
 185 
 186                 // wait for reader to finish
 187                 reader.join();
 188             }
 189         });
 190     }
 191 
 192     /**
 193      * SocketChannel close while virtual thread blocked in read.
 194      */
 195     @ParameterizedTest
 196     @MethodSource("threadBuilders")
 197     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 198         VThreadRunner.run(builder, () -> {
 199             try (var connection = new Connection()) {
 200                 SocketChannel sc = connection.channel1();
 201                 runAfterParkedAsync(sc::close);
 202                 try {
 203                     int n = sc.read(ByteBuffer.allocate(100));
 204                     fail("read returned " + n);
 205                 } catch (AsynchronousCloseException expected) { }
 206             }
 207         });
 208     }
 209 
 210     /**
 211      * SocketChannel shutdownInput while virtual thread blocked in read.
 212      */
 213     @ParameterizedTest
 214     @MethodSource("threadBuilders")
 215     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
 216         VThreadRunner.run(builder, () -> {
 217             try (var connection = new Connection()) {
 218                 SocketChannel sc = connection.channel1();
 219                 runAfterParkedAsync(sc::shutdownInput);
 220                 int n = sc.read(ByteBuffer.allocate(100));
 221                 assertEquals(-1, n);
 222                 assertTrue(sc.isOpen());
 223             }
 224         });
 225     }
 226 
 227     /**
 228      * Virtual thread interrupted while blocked in SocketChannel read.
 229      */
 230     @ParameterizedTest
 231     @MethodSource("threadBuilders")
 232     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 233         VThreadRunner.run(builder, () -> {
 234             try (var connection = new Connection()) {
 235                 SocketChannel sc = connection.channel1();
 236 
 237                 // interrupt current thread when it blocks in read
 238                 Thread thisThread = Thread.currentThread();
 239                 runAfterParkedAsync(thisThread::interrupt);
 240 
 241                 try {
 242                     int n = sc.read(ByteBuffer.allocate(100));
 243                     fail("read returned " + n);
 244                 } catch (ClosedByInterruptException expected) {
 245                     assertTrue(Thread.interrupted());
 246                 }
 247             }
 248         });
 249     }
 250 
 251     /**
 252      * SocketChannel close while virtual thread blocked in write.
 253      */
 254     @ParameterizedTest
 255     @MethodSource("threadBuilders")
 256     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 257         VThreadRunner.run(builder, () -> {
 258             boolean done = false;
 259             while (!done) {
 260                 try (var connection = new Connection()) {
 261                     SocketChannel sc = connection.channel1();
 262 
 263                     // close sc when current thread blocks in write
 264                     runAfterParkedAsync(sc::close, true);
 265 
 266                     // write until channel is closed
 267                     try {
 268                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 269                         for (;;) {
 270                             int n = sc.write(bb);
 271                             assertTrue(n > 0);
 272                             bb.clear();
 273                         }
 274                     } catch (AsynchronousCloseException expected) {
 275                         // closed when blocked in write
 276                         done = true;
 277                     } catch (ClosedChannelException e) {
 278                         // closed but not blocked in write, need to retry test
 279                         System.err.format("%s, need to retry!%n", e);
 280                     }
 281                 }
 282             }
 283         });
 284     }
 285 
 286     /**
 287      * SocketChannel shutdownOutput while virtual thread blocked in write.
 288      */
 289     @ParameterizedTest
 290     @MethodSource("threadBuilders")
 291     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
 292         VThreadRunner.run(builder, () -> {
 293             try (var connection = new Connection()) {
 294                 SocketChannel sc = connection.channel1();
 295 
 296                 // shutdown output when current thread blocks in write
 297                 runAfterParkedAsync(sc::shutdownOutput);
 298                 try {
 299                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
 300                     for (;;) {
 301                         int n = sc.write(bb);
 302                         assertTrue(n > 0);
 303                         bb.clear();
 304                     }
 305                 } catch (ClosedChannelException e) {
 306                     // expected
 307                 }
 308                 assertTrue(sc.isOpen());
 309             }
 310         });
 311     }
 312 
 313     /**
 314      * Virtual thread interrupted while blocked in SocketChannel write.
 315      */
 316     @ParameterizedTest
 317     @MethodSource("threadBuilders")
 318     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 319         VThreadRunner.run(builder, () -> {
 320             boolean done = false;
 321             while (!done) {
 322                 try (var connection = new Connection()) {
 323                     SocketChannel sc = connection.channel1();
 324 
 325                     // interrupt current thread when it blocks in write
 326                     Thread thisThread = Thread.currentThread();
 327                     runAfterParkedAsync(thisThread::interrupt, true);
 328 
 329                     // write until channel is closed
 330                     try {
 331                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 332                         for (;;) {
 333                             int n = sc.write(bb);
 334                             assertTrue(n > 0);
 335                             bb.clear();
 336                         }
 337                     } catch (ClosedByInterruptException e) {
 338                         // closed when blocked in write
 339                         assertTrue(Thread.interrupted());
 340                         done = true;
 341                     } catch (ClosedChannelException e) {
 342                         // closed but not blocked in write, need to retry test
 343                         System.err.format("%s, need to retry!%n", e);
 344                     }
 345                 }
 346             }
 347         });
 348     }
 349 
 350     /**
 351      * Virtual thread blocks in SocketChannel adaptor read.
 352      */
 353     @ParameterizedTest
 354     @MethodSource("threadBuilders")
 355     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
 356         testSocketAdaptorRead(builder, 0);
 357     }
 358 
 359     /**
 360      * Virtual thread blocks in SocketChannel adaptor read with timeout.
 361      */
 362     @ParameterizedTest
 363     @MethodSource("threadBuilders")
 364     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
 365         testSocketAdaptorRead(builder, 60_000);
 366     }
 367 
 368     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
 369                                        int timeout) throws Exception {
 370         VThreadRunner.run(builder, () -> {
 371             try (var connection = new Connection()) {
 372                 SocketChannel sc1 = connection.channel1();
 373                 SocketChannel sc2 = connection.channel2();
 374 
 375                 // write to sc1 when currnet thread blocks reading from sc2
 376                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 377                 runAfterParkedAsync(() -> sc1.write(bb));
 378 
 379                 // read from sc2 should block
 380                 byte[] array = new byte[100];
 381                 if (timeout > 0)
 382                     sc2.socket().setSoTimeout(timeout);
 383                 int n = sc2.socket().getInputStream().read(array);
 384                 assertTrue(n > 0);
 385                 assertTrue(array[0] == 'X');
 386             }
 387         });
 388     }
 389 
 390     /**
 391      * ServerSocketChannel accept, no blocking.
 392      */
 393     @ParameterizedTest
 394     @MethodSource("threadBuilders")
 395     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
 396         VThreadRunner.run(builder, () -> {
 397             try (var ssc = ServerSocketChannel.open()) {
 398                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 399                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
 400                 // accept should not block
 401                 var sc2 = ssc.accept();
 402                 sc1.close();
 403                 sc2.close();
 404             }
 405         });
 406     }
 407 
 408     /**
 409      * Virtual thread blocks in ServerSocketChannel accept.
 410      */
 411     @ParameterizedTest
 412     @MethodSource("threadBuilders")
 413     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
 414         VThreadRunner.run(builder, () -> {
 415             try (var ssc = ServerSocketChannel.open()) {
 416                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 417                 var sc1 = SocketChannel.open();
 418 
 419                 // connect when current thread when it blocks in accept
 420                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
 421 
 422                 // accept should block
 423                 var sc2 = ssc.accept();
 424                 sc1.close();
 425                 sc2.close();
 426             }
 427         });
 428     }
 429 
 430     /**
 431      * SeverSocketChannel close while virtual thread blocked in accept.
 432      */
 433     @ParameterizedTest
 434     @MethodSource("threadBuilders")
 435     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 436         VThreadRunner.run(builder, () -> {
 437             try (var ssc = ServerSocketChannel.open()) {
 438                 InetAddress lh = InetAddress.getLoopbackAddress();
 439                 ssc.bind(new InetSocketAddress(lh, 0));
 440                 runAfterParkedAsync(ssc::close);
 441                 try {
 442                     SocketChannel sc = ssc.accept();
 443                     sc.close();
 444                     fail("connection accepted???");
 445                 } catch (AsynchronousCloseException expected) { }
 446             }
 447         });
 448     }
 449 
 450     /**
 451      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
 452      */
 453     @ParameterizedTest
 454     @MethodSource("threadBuilders")
 455     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 456         VThreadRunner.run(builder, () -> {
 457             try (var ssc = ServerSocketChannel.open()) {
 458                 InetAddress lh = InetAddress.getLoopbackAddress();
 459                 ssc.bind(new InetSocketAddress(lh, 0));
 460 
 461                 // interrupt current thread when it blocks in accept
 462                 Thread thisThread = Thread.currentThread();
 463                 runAfterParkedAsync(thisThread::interrupt);
 464 
 465                 try {
 466                     SocketChannel sc = ssc.accept();
 467                     sc.close();
 468                     fail("connection accepted???");
 469                 } catch (ClosedByInterruptException expected) {
 470                     assertTrue(Thread.interrupted());
 471                 }
 472             }
 473         });
 474     }
 475 
 476     /**
 477      * Virtual thread blocks in ServerSocketChannel adaptor accept.
 478      */
 479     @ParameterizedTest
 480     @MethodSource("threadBuilders")
 481     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
 482         testSocketChannelAdaptorAccept(builder, 0);
 483     }
 484 
 485     /**
 486      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
 487      */
 488     @ParameterizedTest
 489     @MethodSource("threadBuilders")
 490     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
 491         testSocketChannelAdaptorAccept(builder, 60_000);
 492     }
 493 
 494     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
 495                                                 int timeout) throws Exception {
 496         VThreadRunner.run(builder, () -> {
 497             try (var ssc = ServerSocketChannel.open()) {
 498                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 499                 var sc = SocketChannel.open();
 500 
 501                 // interrupt current thread when it blocks in accept
 502                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
 503 
 504                 // accept should block
 505                 if (timeout > 0)
 506                     ssc.socket().setSoTimeout(timeout);
 507                 Socket s = ssc.socket().accept();
 508                 sc.close();
 509                 s.close();
 510             }
 511         });
 512     }
 513 
 514     /**
 515      * DatagramChannel receive/send, no blocking.
 516      */
 517     @ParameterizedTest
 518     @MethodSource("threadBuilders")
 519     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
 520         VThreadRunner.run(builder, () -> {
 521             try (DatagramChannel dc1 = DatagramChannel.open();
 522                  DatagramChannel dc2 = DatagramChannel.open()) {
 523 
 524                 InetAddress lh = InetAddress.getLoopbackAddress();
 525                 dc2.bind(new InetSocketAddress(lh, 0));
 526 
 527                 // send should not block
 528                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 529                 int n = dc1.send(bb, dc2.getLocalAddress());
 530                 assertTrue(n > 0);
 531 
 532                 // receive should not block
 533                 bb = ByteBuffer.allocate(10);
 534                 dc2.receive(bb);
 535                 assertTrue(bb.get(0) == 'X');
 536             }
 537         });
 538     }
 539 
 540     /**
 541      * Virtual thread blocks in DatagramChannel receive.
 542      */
 543     @ParameterizedTest
 544     @MethodSource("threadBuilders")
 545     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
 546         VThreadRunner.run(builder, () -> {
 547             try (DatagramChannel dc1 = DatagramChannel.open();
 548                  DatagramChannel dc2 = DatagramChannel.open()) {
 549 
 550                 InetAddress lh = InetAddress.getLoopbackAddress();
 551                 dc2.bind(new InetSocketAddress(lh, 0));
 552 
 553                 // send from dc1 when current thread blocked in dc2.receive
 554                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 555                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
 556 
 557                 // read from dc2 should block
 558                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 559                 dc2.receive(bb2);
 560                 assertTrue(bb2.get(0) == 'X');
 561             }
 562         });
 563     }
 564 
 565     /**
 566      * DatagramChannel close while virtual thread blocked in receive.
 567      */
 568     @ParameterizedTest
 569     @MethodSource("threadBuilders")
 570     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 571         VThreadRunner.run(builder, () -> {
 572             try (DatagramChannel dc = DatagramChannel.open()) {
 573                 InetAddress lh = InetAddress.getLoopbackAddress();
 574                 dc.bind(new InetSocketAddress(lh, 0));
 575                 runAfterParkedAsync(dc::close);
 576                 try {
 577                     dc.receive(ByteBuffer.allocate(100));
 578                     fail("receive returned");
 579                 } catch (AsynchronousCloseException expected) { }
 580             }
 581         });
 582     }
 583 
 584     /**
 585      * Virtual thread interrupted while blocked in DatagramChannel receive.
 586      */
 587     @ParameterizedTest
 588     @MethodSource("threadBuilders")
 589     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 590         VThreadRunner.run(builder, () -> {
 591             try (DatagramChannel dc = DatagramChannel.open()) {
 592                 InetAddress lh = InetAddress.getLoopbackAddress();
 593                 dc.bind(new InetSocketAddress(lh, 0));
 594 
 595                 // interrupt current thread when it blocks in receive
 596                 Thread thisThread = Thread.currentThread();
 597                 runAfterParkedAsync(thisThread::interrupt);
 598 
 599                 try {
 600                     dc.receive(ByteBuffer.allocate(100));
 601                     fail("receive returned");
 602                 } catch (ClosedByInterruptException expected) {
 603                     assertTrue(Thread.interrupted());
 604                 }
 605             }
 606         });
 607     }
 608 
 609     /**
 610      * Virtual thread blocks in DatagramSocket adaptor receive.
 611      */
 612     @ParameterizedTest
 613     @MethodSource("threadBuilders")
 614     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
 615         testDatagramSocketAdaptorReceive(builder, 0);
 616     }
 617 
 618     /**
 619      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
 620      */
 621     @ParameterizedTest
 622     @MethodSource("threadBuilders")
 623     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
 624         testDatagramSocketAdaptorReceive(builder, 60_000);
 625     }
 626 
 627     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
 628                                                   int timeout) throws Exception {
 629         VThreadRunner.run(builder, () -> {
 630             try (DatagramChannel dc1 = DatagramChannel.open();
 631                  DatagramChannel dc2 = DatagramChannel.open()) {
 632 
 633                 InetAddress lh = InetAddress.getLoopbackAddress();
 634                 dc2.bind(new InetSocketAddress(lh, 0));
 635 
 636                 // send from dc1 when current thread blocks in dc2 receive
 637                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 638                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
 639 
 640                 // receive should block
 641                 byte[] array = new byte[100];
 642                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 643                 if (timeout > 0)
 644                     dc2.socket().setSoTimeout(timeout);
 645                 dc2.socket().receive(p);
 646                 assertTrue(p.getLength() == 3 && array[0] == 'X');
 647             }
 648         });
 649     }
 650 
 651     /**
 652      * DatagramChannel close while virtual thread blocked in adaptor receive.
 653      */
 654     @ParameterizedTest
 655     @MethodSource("threadBuilders")
 656     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
 657         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
 658     }
 659 
 660     /**
 661      * DatagramChannel close while virtual thread blocked in adaptor receive
 662      * with timeout.
 663      */
 664     @ParameterizedTest
 665     @MethodSource("threadBuilders")
 666     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
 667         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
 668     }
 669 
 670     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
 671                                                             int timeout) throws Exception {
 672         VThreadRunner.run(builder, () -> {
 673             try (DatagramChannel dc = DatagramChannel.open()) {
 674                 InetAddress lh = InetAddress.getLoopbackAddress();
 675                 dc.bind(new InetSocketAddress(lh, 0));
 676 
 677                 byte[] array = new byte[100];
 678                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 679                 if (timeout > 0)
 680                     dc.socket().setSoTimeout(timeout);
 681 
 682                 // close channel/socket when current thread blocks in receive
 683                 runAfterParkedAsync(dc::close);
 684 
 685                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
 686             }
 687         });
 688     }
 689 
 690     /**
 691      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
 692      */
 693     @ParameterizedTest
 694     @MethodSource("threadBuilders")
 695     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
 696         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
 697     }
 698 
 699     /**
 700      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
 701      * with timeout.
 702      */
 703     @ParameterizedTest
 704     @MethodSource("threadBuilders")
 705     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
 706         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
 707     }
 708 
 709     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
 710                                                            int timeout) throws Exception {
 711         VThreadRunner.run(builder, () -> {
 712             try (DatagramChannel dc = DatagramChannel.open()) {
 713                 InetAddress lh = InetAddress.getLoopbackAddress();
 714                 dc.bind(new InetSocketAddress(lh, 0));
 715 
 716                 byte[] array = new byte[100];
 717                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 718                 if (timeout > 0)
 719                     dc.socket().setSoTimeout(timeout);
 720 
 721                 // interrupt current thread when it blocks in receive
 722                 Thread thisThread = Thread.currentThread();
 723                 runAfterParkedAsync(thisThread::interrupt);
 724 
 725                 try {
 726                     dc.socket().receive(p);
 727                     fail();
 728                 } catch (ClosedByInterruptException expected) {
 729                     assertTrue(Thread.interrupted());
 730                 }
 731             }
 732         });
 733     }
 734 
 735     /**
 736      * Pipe read/write, no blocking.
 737      */
 738     @ParameterizedTest
 739     @MethodSource("threadBuilders")
 740     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
 741         VThreadRunner.run(builder, () -> {
 742             Pipe p = Pipe.open();
 743             try (Pipe.SinkChannel sink = p.sink();
 744                  Pipe.SourceChannel source = p.source()) {
 745 
 746                 // write should not block
 747                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 748                 int n = sink.write(bb);
 749                 assertTrue(n > 0);
 750 
 751                 // read should not block
 752                 bb = ByteBuffer.allocate(10);
 753                 n = source.read(bb);
 754                 assertTrue(n > 0);
 755                 assertTrue(bb.get(0) == 'X');
 756             }
 757         });
 758     }
 759 
 760     /**
 761      * Virtual thread blocks in Pipe.SourceChannel read.
 762      */
 763     @ParameterizedTest
 764     @MethodSource("threadBuilders")
 765     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
 766         VThreadRunner.run(builder, () -> {
 767             Pipe p = Pipe.open();
 768             try (Pipe.SinkChannel sink = p.sink();
 769                  Pipe.SourceChannel source = p.source()) {
 770 
 771                 // write from sink when current thread blocks reading from source
 772                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 773                 runAfterParkedAsync(() -> sink.write(bb1));
 774 
 775                 // read should block
 776                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 777                 int n = source.read(bb2);
 778                 assertTrue(n > 0);
 779                 assertTrue(bb2.get(0) == 'X');
 780             }
 781         });
 782     }
 783 
 784     /**
 785      * Virtual thread blocks in Pipe.SinkChannel write.
 786      */
 787     @ParameterizedTest
 788     @MethodSource("threadBuilders")
 789     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
 790         VThreadRunner.run(builder, () -> {
 791             Pipe p = Pipe.open();
 792             try (Pipe.SinkChannel sink = p.sink();
 793                  Pipe.SourceChannel source = p.source()) {
 794 
 795                 // read from source to EOF when current thread blocking in write
 796                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
 797 
 798                 // write to sink should block
 799                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 800                 for (int i=0; i<1000; i++) {
 801                     int n = sink.write(bb);
 802                     assertTrue(n > 0);
 803                     bb.clear();
 804                 }
 805                 sink.close();
 806 
 807                 // wait for reader to finish
 808                 reader.join();
 809             }
 810         });
 811     }
 812 
 813     /**
 814      * Pipe.SourceChannel close while virtual thread blocked in read.
 815      */
 816     @ParameterizedTest
 817     @MethodSource("threadBuilders")
 818     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 819         VThreadRunner.run(builder, () -> {
 820             Pipe p = Pipe.open();
 821             try (Pipe.SinkChannel sink = p.sink();
 822                  Pipe.SourceChannel source = p.source()) {
 823                 runAfterParkedAsync(source::close);
 824                 try {
 825                     int n = source.read(ByteBuffer.allocate(100));
 826                     fail("read returned " + n);
 827                 } catch (AsynchronousCloseException expected) { }
 828             }
 829         });
 830     }
 831 
 832     /**
 833      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
 834      */
 835     @ParameterizedTest
 836     @MethodSource("threadBuilders")
 837     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 838         VThreadRunner.run(builder, () -> {
 839             Pipe p = Pipe.open();
 840             try (Pipe.SinkChannel sink = p.sink();
 841                  Pipe.SourceChannel source = p.source()) {
 842 
 843                 // interrupt current thread when it blocks reading from source
 844                 Thread thisThread = Thread.currentThread();
 845                 runAfterParkedAsync(thisThread::interrupt);
 846 
 847                 try {
 848                     int n = source.read(ByteBuffer.allocate(100));
 849                     fail("read returned " + n);
 850                 } catch (ClosedByInterruptException expected) {
 851                     assertTrue(Thread.interrupted());
 852                 }
 853             }
 854         });
 855     }
 856 
 857     /**
 858      * Pipe.SinkChannel close while virtual thread blocked in write.
 859      */
 860     @ParameterizedTest
 861     @MethodSource("threadBuilders")
 862     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 863         VThreadRunner.run(builder, () -> {
 864             boolean done = false;
 865             while (!done) {
 866                 Pipe p = Pipe.open();
 867                 try (Pipe.SinkChannel sink = p.sink();
 868                      Pipe.SourceChannel source = p.source()) {
 869 
 870                     // close sink when current thread blocks in write
 871                     runAfterParkedAsync(sink::close, true);
 872 
 873                     // write until channel is closed
 874                     try {
 875                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 876                         for (;;) {
 877                             int n = sink.write(bb);
 878                             assertTrue(n > 0);
 879                             bb.clear();
 880                         }
 881                     } catch (AsynchronousCloseException e) {
 882                         // closed when blocked in write
 883                         done = true;
 884                     } catch (ClosedChannelException e) {
 885                         // closed but not blocked in write, need to retry test
 886                         System.err.format("%s, need to retry!%n", e);
 887                     }
 888                 }
 889             }
 890         });
 891     }
 892 
 893     /**
 894      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
 895      */
 896     @ParameterizedTest
 897     @MethodSource("threadBuilders")
 898     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 899         VThreadRunner.run(builder, () -> {
 900             boolean done = false;
 901             while (!done) {
 902                 Pipe p = Pipe.open();
 903                 try (Pipe.SinkChannel sink = p.sink();
 904                      Pipe.SourceChannel source = p.source()) {
 905 
 906                     // interrupt current thread when it blocks in write
 907                     Thread thisThread = Thread.currentThread();
 908                     runAfterParkedAsync(thisThread::interrupt, true);
 909 
 910                     // write until channel is closed
 911                     try {
 912                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 913                         for (;;) {
 914                             int n = sink.write(bb);
 915                             assertTrue(n > 0);
 916                             bb.clear();
 917                         }
 918                     } catch (ClosedByInterruptException expected) {
 919                         // closed when blocked in write
 920                         assertTrue(Thread.interrupted());
 921                         done = true;
 922                     } catch (ClosedChannelException e) {
 923                         // closed but not blocked in write, need to retry test
 924                         System.err.format("%s, need to retry!%n", e);
 925                     }
 926                 }
 927             }
 928         });
 929     }
 930 
 931     /**
 932      * Creates a loopback connection
 933      */
 934     static class Connection implements Closeable {
 935         private final SocketChannel sc1;
 936         private final SocketChannel sc2;
 937         Connection() throws IOException {
 938             var lh = InetAddress.getLoopbackAddress();
 939             try (var listener = ServerSocketChannel.open()) {
 940                 listener.bind(new InetSocketAddress(lh, 0));
 941                 SocketChannel sc1 = SocketChannel.open();
 942                 SocketChannel sc2 = null;
 943                 try {
 944                     sc1.socket().connect(listener.getLocalAddress());
 945                     sc2 = listener.accept();
 946                 } catch (IOException ioe) {
 947                     sc1.close();
 948                     throw ioe;
 949                 }
 950                 this.sc1 = sc1;
 951                 this.sc2 = sc2;
 952             }
 953         }
 954         SocketChannel channel1() {
 955             return sc1;
 956         }
 957         SocketChannel channel2() {
 958             return sc2;
 959         }
 960         @Override
 961         public void close() throws IOException {
 962             sc1.close();
 963             sc2.close();
 964         }
 965     }
 966 
 967     /**
 968      * Read from a channel until all bytes have been read or an I/O error occurs.
 969      */
 970     static void readToEOF(ReadableByteChannel rbc) throws IOException {
 971         ByteBuffer bb = ByteBuffer.allocate(16*1024);
 972         int n;
 973         while ((n = rbc.read(bb)) > 0) {
 974             bb.clear();
 975         }
 976     }
 977 
 978     @FunctionalInterface
 979     interface ThrowingRunnable {
 980         void run() throws Exception;
 981     }
 982 
 983     /**
 984      * Runs the given task asynchronously after the current virtual thread parks.
 985      * @param writing if the thread will block in write
 986      * @return the thread started to run the task
 987      */
 988     private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
 989         Thread target = Thread.currentThread();
 990         if (!target.isVirtual())
 991             throw new WrongThreadException();
 992         return Thread.ofPlatform().daemon().start(() -> {
 993             try {
 994                 // wait for target thread to park
 995                 while (!isWaiting(target)) {
 996                     Thread.sleep(20);
 997                 }
 998 
 999                 // if the target thread is parked in write then we nudge it a few times
1000                 // to avoid wakeup with some bytes written
1001                 if (writing) {
1002                     for (int i = 0; i < 3; i++) {
1003                         LockSupport.unpark(target);
1004                         while (!isWaiting(target)) {
1005                             Thread.sleep(20);
1006                         }
1007                     }
1008                 }
1009 
1010                 task.run();
1011 
1012             } catch (Exception e) {
1013                 e.printStackTrace();
1014             }
1015         });
1016     }
1017 
1018     private static Thread runAfterParkedAsync(ThrowingRunnable task) {
1019         return runAfterParkedAsync(task, false);
1020     }
1021 
1022     /**
1023      * Return true if the given Thread is parked.
1024      */
1025     private static boolean isWaiting(Thread target) {
1026         Thread.State state = target.getState();
1027         assertNotEquals(Thread.State.TERMINATED, state);
1028         return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
1029     }
1030 }