1 /*
   2  * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=default
  26  * @bug 8284161
  27  * @summary Test virtual threads doing blocking I/O on NIO channels
  28  * @library /test/lib
  29  * @run junit/othervm/timeout=480 BlockingChannelOps
  30  */
  31 
  32 /*
  33  * @test id=poller-modes
  34  * @requires (os.family == "linux") | (os.family == "mac")
  35  * @library /test/lib
  36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
  37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
  38  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
  39  */
  40 
  41 /*
  42  * @test id=io_uring
  43  * @requires os.family == "linux"
  44  * @library /test/lib
  45  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
  46  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
  47  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
  48  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
  49  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
  50  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
  51  */
  52 
  53 /*
  54  * @test id=no-vmcontinuations
  55  * @requires vm.continuations
  56  * @library /test/lib
  57  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
  58  */
  59 
  60 import java.io.Closeable;
  61 import java.io.IOException;
  62 import java.net.DatagramPacket;
  63 import java.net.InetAddress;
  64 import java.net.InetSocketAddress;
  65 import java.net.Socket;
  66 import java.net.SocketAddress;
  67 import java.net.SocketException;
  68 import java.nio.ByteBuffer;
  69 import java.nio.channels.AsynchronousCloseException;
  70 import java.nio.channels.ClosedByInterruptException;
  71 import java.nio.channels.ClosedChannelException;
  72 import java.nio.channels.DatagramChannel;
  73 import java.nio.channels.Pipe;
  74 import java.nio.channels.ReadableByteChannel;
  75 import java.nio.channels.ServerSocketChannel;
  76 import java.nio.channels.SocketChannel;
  77 import java.nio.channels.WritableByteChannel;
  78 import java.util.concurrent.ExecutorService;
  79 import java.util.concurrent.Executors;
  80 import java.util.concurrent.ThreadFactory;
  81 import java.util.concurrent.locks.LockSupport;
  82 import java.util.stream.Stream;
  83 
  84 import jdk.test.lib.thread.VThreadRunner;
  85 import jdk.test.lib.thread.VThreadScheduler;
  86 
  87 import org.junit.jupiter.api.BeforeAll;
  88 import org.junit.jupiter.params.ParameterizedTest;
  89 import org.junit.jupiter.params.provider.MethodSource;
  90 import static org.junit.jupiter.api.Assertions.*;
  91 
  92 class BlockingChannelOps {
  93     private static ExecutorService threadPool;
  94     private static Thread.VirtualThreadScheduler customScheduler;
  95 
  96     @BeforeAll
  97     static void setup() {
  98         ThreadFactory factory = Thread.ofPlatform().daemon().factory();
  99         threadPool = Executors.newCachedThreadPool(factory);
 100         customScheduler = new Thread.VirtualThreadScheduler() {
 101             @Override
 102             public void onStart(Thread.VirtualThreadTask task) {
 103                 threadPool.execute(task);
 104             }
 105             @Override
 106             public void onContinue(Thread.VirtualThreadTask task) {
 107                 threadPool.execute(task);
 108             }
 109         };
 110     }
 111 
 112     /**
 113      * Returns the Thread.Builder to create the virtual thread.
 114      */
 115     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
 116         if (VThreadScheduler.supportsCustomScheduler()) {
 117             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
 118         } else {
 119             return Stream.of(Thread.ofVirtual());
 120         }
 121     }
 122 
 123     /**
 124      * SocketChannel read/write, no blocking.
 125      */
 126     @ParameterizedTest
 127     @MethodSource("threadBuilders")
 128     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
 129         VThreadRunner.run(builder, () -> {
 130             try (var connection = new Connection()) {
 131                 SocketChannel sc1 = connection.channel1();
 132                 SocketChannel sc2 = connection.channel2();
 133 
 134                 // write to sc1
 135                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 136                 int n = sc1.write(bb);
 137                 assertTrue(n > 0);
 138 
 139                 // read from sc2 should not block
 140                 bb = ByteBuffer.allocate(10);
 141                 n = sc2.read(bb);
 142                 assertTrue(n > 0);
 143                 assertTrue(bb.get(0) == 'X');
 144             }
 145         });
 146     }
 147 
 148     /**
 149      * Virtual thread blocks in SocketChannel read.
 150      */
 151     @ParameterizedTest
 152     @MethodSource("threadBuilders")
 153     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
 154         VThreadRunner.run(builder, () -> {
 155             try (var connection = new Connection()) {
 156                 SocketChannel sc1 = connection.channel1();
 157                 SocketChannel sc2 = connection.channel2();
 158 
 159                 // write to sc1 when current thread blocks in sc2.read
 160                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 161                 runAfterParkedAsync(() -> sc1.write(bb1));
 162 
 163                 // read from sc2 should block
 164                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 165                 int n = sc2.read(bb2);
 166                 assertTrue(n > 0);
 167                 assertTrue(bb2.get(0) == 'X');
 168             }
 169         });
 170     }
 171 
 172     /**
 173      * Virtual thread blocks in SocketChannel write.
 174      */
 175     @ParameterizedTest
 176     @MethodSource("threadBuilders")
 177     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
 178         VThreadRunner.run(builder, () -> {
 179             try (var connection = new Connection()) {
 180                 SocketChannel sc1 = connection.channel1();
 181                 SocketChannel sc2 = connection.channel2();
 182 
 183                 // read from sc2 to EOF when current thread blocks in sc1.write
 184                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
 185 
 186                 // write to sc1 should block
 187                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 188                 for (int i=0; i<1000; i++) {
 189                     int n = sc1.write(bb);
 190                     assertTrue(n > 0);
 191                     bb.clear();
 192                 }
 193                 sc1.close();
 194 
 195                 // wait for reader to finish
 196                 reader.join();
 197             }
 198         });
 199     }
 200 
 201     /**
 202      * SocketChannel close while virtual thread blocked in read.
 203      */
 204     @ParameterizedTest
 205     @MethodSource("threadBuilders")
 206     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 207         VThreadRunner.run(builder, () -> {
 208             try (var connection = new Connection()) {
 209                 SocketChannel sc = connection.channel1();
 210                 runAfterParkedAsync(sc::close);
 211                 try {
 212                     int n = sc.read(ByteBuffer.allocate(100));
 213                     fail("read returned " + n);
 214                 } catch (AsynchronousCloseException expected) { }
 215             }
 216         });
 217     }
 218 
 219     /**
 220      * SocketChannel shutdownInput while virtual thread blocked in read.
 221      */
 222     @ParameterizedTest
 223     @MethodSource("threadBuilders")
 224     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
 225         VThreadRunner.run(builder, () -> {
 226             try (var connection = new Connection()) {
 227                 SocketChannel sc = connection.channel1();
 228                 runAfterParkedAsync(sc::shutdownInput);
 229                 int n = sc.read(ByteBuffer.allocate(100));
 230                 assertEquals(-1, n);
 231                 assertTrue(sc.isOpen());
 232             }
 233         });
 234     }
 235 
 236     /**
 237      * Virtual thread interrupted while blocked in SocketChannel read.
 238      */
 239     @ParameterizedTest
 240     @MethodSource("threadBuilders")
 241     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 242         VThreadRunner.run(builder, () -> {
 243             try (var connection = new Connection()) {
 244                 SocketChannel sc = connection.channel1();
 245 
 246                 // interrupt current thread when it blocks in read
 247                 Thread thisThread = Thread.currentThread();
 248                 runAfterParkedAsync(thisThread::interrupt);
 249 
 250                 try {
 251                     int n = sc.read(ByteBuffer.allocate(100));
 252                     fail("read returned " + n);
 253                 } catch (ClosedByInterruptException expected) {
 254                     assertTrue(Thread.interrupted());
 255                 }
 256             }
 257         });
 258     }
 259 
 260     /**
 261      * SocketChannel close while virtual thread blocked in write.
 262      */
 263     @ParameterizedTest
 264     @MethodSource("threadBuilders")
 265     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 266         VThreadRunner.run(builder, () -> {
 267             boolean done = false;
 268             while (!done) {
 269                 try (var connection = new Connection()) {
 270                     SocketChannel sc = connection.channel1();
 271 
 272                     // close sc when current thread blocks in write
 273                     runAfterParkedAsync(sc::close, true);
 274 
 275                     // write until channel is closed
 276                     try {
 277                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 278                         for (;;) {
 279                             int n = sc.write(bb);
 280                             assertTrue(n > 0);
 281                             bb.clear();
 282                         }
 283                     } catch (AsynchronousCloseException expected) {
 284                         // closed when blocked in write
 285                         done = true;
 286                     } catch (ClosedChannelException e) {
 287                         // closed but not blocked in write, need to retry test
 288                         System.err.format("%s, need to retry!%n", e);
 289                     }
 290                 }
 291             }
 292         });
 293     }
 294 
 295     /**
 296      * SocketChannel shutdownOutput while virtual thread blocked in write.
 297      */
 298     @ParameterizedTest
 299     @MethodSource("threadBuilders")
 300     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
 301         VThreadRunner.run(builder, () -> {
 302             try (var connection = new Connection()) {
 303                 SocketChannel sc = connection.channel1();
 304 
 305                 // shutdown output when current thread blocks in write
 306                 runAfterParkedAsync(sc::shutdownOutput);
 307                 try {
 308                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
 309                     for (;;) {
 310                         int n = sc.write(bb);
 311                         assertTrue(n > 0);
 312                         bb.clear();
 313                     }
 314                 } catch (ClosedChannelException e) {
 315                     // expected
 316                 }
 317                 assertTrue(sc.isOpen());
 318             }
 319         });
 320     }
 321 
 322     /**
 323      * Virtual thread interrupted while blocked in SocketChannel write.
 324      */
 325     @ParameterizedTest
 326     @MethodSource("threadBuilders")
 327     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 328         VThreadRunner.run(builder, () -> {
 329             boolean done = false;
 330             while (!done) {
 331                 try (var connection = new Connection()) {
 332                     SocketChannel sc = connection.channel1();
 333 
 334                     // interrupt current thread when it blocks in write
 335                     Thread thisThread = Thread.currentThread();
 336                     runAfterParkedAsync(thisThread::interrupt, true);
 337 
 338                     // write until channel is closed
 339                     try {
 340                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 341                         for (;;) {
 342                             int n = sc.write(bb);
 343                             assertTrue(n > 0);
 344                             bb.clear();
 345                         }
 346                     } catch (ClosedByInterruptException e) {
 347                         // closed when blocked in write
 348                         assertTrue(Thread.interrupted());
 349                         done = true;
 350                     } catch (ClosedChannelException e) {
 351                         // closed but not blocked in write, need to retry test
 352                         System.err.format("%s, need to retry!%n", e);
 353                     }
 354                 }
 355             }
 356         });
 357     }
 358 
 359     /**
 360      * Virtual thread blocks in SocketChannel adaptor read.
 361      */
 362     @ParameterizedTest
 363     @MethodSource("threadBuilders")
 364     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
 365         testSocketAdaptorRead(builder, 0);
 366     }
 367 
 368     /**
 369      * Virtual thread blocks in SocketChannel adaptor read with timeout.
 370      */
 371     @ParameterizedTest
 372     @MethodSource("threadBuilders")
 373     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
 374         testSocketAdaptorRead(builder, 60_000);
 375     }
 376 
 377     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
 378                                        int timeout) throws Exception {
 379         VThreadRunner.run(builder, () -> {
 380             try (var connection = new Connection()) {
 381                 SocketChannel sc1 = connection.channel1();
 382                 SocketChannel sc2 = connection.channel2();
 383 
 384                 // write to sc1 when currnet thread blocks reading from sc2
 385                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 386                 runAfterParkedAsync(() -> sc1.write(bb));
 387 
 388                 // read from sc2 should block
 389                 byte[] array = new byte[100];
 390                 if (timeout > 0)
 391                     sc2.socket().setSoTimeout(timeout);
 392                 int n = sc2.socket().getInputStream().read(array);
 393                 assertTrue(n > 0);
 394                 assertTrue(array[0] == 'X');
 395             }
 396         });
 397     }
 398 
 399     /**
 400      * ServerSocketChannel accept, no blocking.
 401      */
 402     @ParameterizedTest
 403     @MethodSource("threadBuilders")
 404     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
 405         VThreadRunner.run(builder, () -> {
 406             try (var ssc = ServerSocketChannel.open()) {
 407                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 408                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
 409                 // accept should not block
 410                 var sc2 = ssc.accept();
 411                 sc1.close();
 412                 sc2.close();
 413             }
 414         });
 415     }
 416 
 417     /**
 418      * Virtual thread blocks in ServerSocketChannel accept.
 419      */
 420     @ParameterizedTest
 421     @MethodSource("threadBuilders")
 422     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
 423         VThreadRunner.run(builder, () -> {
 424             try (var ssc = ServerSocketChannel.open()) {
 425                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 426                 var sc1 = SocketChannel.open();
 427 
 428                 // connect when current thread when it blocks in accept
 429                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
 430 
 431                 // accept should block
 432                 var sc2 = ssc.accept();
 433                 sc1.close();
 434                 sc2.close();
 435             }
 436         });
 437     }
 438 
 439     /**
 440      * SeverSocketChannel close while virtual thread blocked in accept.
 441      */
 442     @ParameterizedTest
 443     @MethodSource("threadBuilders")
 444     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 445         VThreadRunner.run(builder, () -> {
 446             try (var ssc = ServerSocketChannel.open()) {
 447                 InetAddress lh = InetAddress.getLoopbackAddress();
 448                 ssc.bind(new InetSocketAddress(lh, 0));
 449                 runAfterParkedAsync(ssc::close);
 450                 try {
 451                     SocketChannel sc = ssc.accept();
 452                     sc.close();
 453                     fail("connection accepted???");
 454                 } catch (AsynchronousCloseException expected) { }
 455             }
 456         });
 457     }
 458 
 459     /**
 460      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
 461      */
 462     @ParameterizedTest
 463     @MethodSource("threadBuilders")
 464     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 465         VThreadRunner.run(builder, () -> {
 466             try (var ssc = ServerSocketChannel.open()) {
 467                 InetAddress lh = InetAddress.getLoopbackAddress();
 468                 ssc.bind(new InetSocketAddress(lh, 0));
 469 
 470                 // interrupt current thread when it blocks in accept
 471                 Thread thisThread = Thread.currentThread();
 472                 runAfterParkedAsync(thisThread::interrupt);
 473 
 474                 try {
 475                     SocketChannel sc = ssc.accept();
 476                     sc.close();
 477                     fail("connection accepted???");
 478                 } catch (ClosedByInterruptException expected) {
 479                     assertTrue(Thread.interrupted());
 480                 }
 481             }
 482         });
 483     }
 484 
 485     /**
 486      * Virtual thread blocks in ServerSocketChannel adaptor accept.
 487      */
 488     @ParameterizedTest
 489     @MethodSource("threadBuilders")
 490     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
 491         testSocketChannelAdaptorAccept(builder, 0);
 492     }
 493 
 494     /**
 495      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
 496      */
 497     @ParameterizedTest
 498     @MethodSource("threadBuilders")
 499     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
 500         testSocketChannelAdaptorAccept(builder, 60_000);
 501     }
 502 
 503     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
 504                                                 int timeout) throws Exception {
 505         VThreadRunner.run(builder, () -> {
 506             try (var ssc = ServerSocketChannel.open()) {
 507                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 508                 var sc = SocketChannel.open();
 509 
 510                 // interrupt current thread when it blocks in accept
 511                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
 512 
 513                 // accept should block
 514                 if (timeout > 0)
 515                     ssc.socket().setSoTimeout(timeout);
 516                 Socket s = ssc.socket().accept();
 517                 sc.close();
 518                 s.close();
 519             }
 520         });
 521     }
 522 
 523     /**
 524      * DatagramChannel receive/send, no blocking.
 525      */
 526     @ParameterizedTest
 527     @MethodSource("threadBuilders")
 528     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
 529         VThreadRunner.run(builder, () -> {
 530             try (DatagramChannel dc1 = DatagramChannel.open();
 531                  DatagramChannel dc2 = DatagramChannel.open()) {
 532 
 533                 InetAddress lh = InetAddress.getLoopbackAddress();
 534                 dc2.bind(new InetSocketAddress(lh, 0));
 535 
 536                 // send should not block
 537                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 538                 int n = dc1.send(bb, dc2.getLocalAddress());
 539                 assertTrue(n > 0);
 540 
 541                 // receive should not block
 542                 bb = ByteBuffer.allocate(10);
 543                 dc2.receive(bb);
 544                 assertTrue(bb.get(0) == 'X');
 545             }
 546         });
 547     }
 548 
 549     /**
 550      * Virtual thread blocks in DatagramChannel receive.
 551      */
 552     @ParameterizedTest
 553     @MethodSource("threadBuilders")
 554     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
 555         VThreadRunner.run(builder, () -> {
 556             try (DatagramChannel dc1 = DatagramChannel.open();
 557                  DatagramChannel dc2 = DatagramChannel.open()) {
 558 
 559                 InetAddress lh = InetAddress.getLoopbackAddress();
 560                 dc2.bind(new InetSocketAddress(lh, 0));
 561 
 562                 // send from dc1 when current thread blocked in dc2.receive
 563                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 564                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
 565 
 566                 // read from dc2 should block
 567                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 568                 dc2.receive(bb2);
 569                 assertTrue(bb2.get(0) == 'X');
 570             }
 571         });
 572     }
 573 
 574     /**
 575      * DatagramChannel close while virtual thread blocked in receive.
 576      */
 577     @ParameterizedTest
 578     @MethodSource("threadBuilders")
 579     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 580         VThreadRunner.run(builder, () -> {
 581             try (DatagramChannel dc = DatagramChannel.open()) {
 582                 InetAddress lh = InetAddress.getLoopbackAddress();
 583                 dc.bind(new InetSocketAddress(lh, 0));
 584                 runAfterParkedAsync(dc::close);
 585                 try {
 586                     dc.receive(ByteBuffer.allocate(100));
 587                     fail("receive returned");
 588                 } catch (AsynchronousCloseException expected) { }
 589             }
 590         });
 591     }
 592 
 593     /**
 594      * Virtual thread interrupted while blocked in DatagramChannel receive.
 595      */
 596     @ParameterizedTest
 597     @MethodSource("threadBuilders")
 598     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 599         VThreadRunner.run(builder, () -> {
 600             try (DatagramChannel dc = DatagramChannel.open()) {
 601                 InetAddress lh = InetAddress.getLoopbackAddress();
 602                 dc.bind(new InetSocketAddress(lh, 0));
 603 
 604                 // interrupt current thread when it blocks in receive
 605                 Thread thisThread = Thread.currentThread();
 606                 runAfterParkedAsync(thisThread::interrupt);
 607 
 608                 try {
 609                     dc.receive(ByteBuffer.allocate(100));
 610                     fail("receive returned");
 611                 } catch (ClosedByInterruptException expected) {
 612                     assertTrue(Thread.interrupted());
 613                 }
 614             }
 615         });
 616     }
 617 
 618     /**
 619      * Virtual thread blocks in DatagramSocket adaptor receive.
 620      */
 621     @ParameterizedTest
 622     @MethodSource("threadBuilders")
 623     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
 624         testDatagramSocketAdaptorReceive(builder, 0);
 625     }
 626 
 627     /**
 628      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
 629      */
 630     @ParameterizedTest
 631     @MethodSource("threadBuilders")
 632     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
 633         testDatagramSocketAdaptorReceive(builder, 60_000);
 634     }
 635 
 636     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
 637                                                   int timeout) throws Exception {
 638         VThreadRunner.run(builder, () -> {
 639             try (DatagramChannel dc1 = DatagramChannel.open();
 640                  DatagramChannel dc2 = DatagramChannel.open()) {
 641 
 642                 InetAddress lh = InetAddress.getLoopbackAddress();
 643                 dc2.bind(new InetSocketAddress(lh, 0));
 644 
 645                 // send from dc1 when current thread blocks in dc2 receive
 646                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 647                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
 648 
 649                 // receive should block
 650                 byte[] array = new byte[100];
 651                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 652                 if (timeout > 0)
 653                     dc2.socket().setSoTimeout(timeout);
 654                 dc2.socket().receive(p);
 655                 assertTrue(p.getLength() == 3 && array[0] == 'X');
 656             }
 657         });
 658     }
 659 
 660     /**
 661      * DatagramChannel close while virtual thread blocked in adaptor receive.
 662      */
 663     @ParameterizedTest
 664     @MethodSource("threadBuilders")
 665     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
 666         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
 667     }
 668 
 669     /**
 670      * DatagramChannel close while virtual thread blocked in adaptor receive
 671      * with timeout.
 672      */
 673     @ParameterizedTest
 674     @MethodSource("threadBuilders")
 675     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
 676         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_000);
 677     }
 678 
 679     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
 680                                                             int timeout) throws Exception {
 681         VThreadRunner.run(builder, () -> {
 682             try (DatagramChannel dc = DatagramChannel.open()) {
 683                 InetAddress lh = InetAddress.getLoopbackAddress();
 684                 dc.bind(new InetSocketAddress(lh, 0));
 685 
 686                 byte[] array = new byte[100];
 687                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 688                 if (timeout > 0)
 689                     dc.socket().setSoTimeout(timeout);
 690 
 691                 // close channel/socket when current thread blocks in receive
 692                 runAfterParkedAsync(dc::close);
 693 
 694                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
 695             }
 696         });
 697     }
 698 
 699     /**
 700      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
 701      */
 702     @ParameterizedTest
 703     @MethodSource("threadBuilders")
 704     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
 705         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
 706     }
 707 
 708     /**
 709      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
 710      * with timeout.
 711      */
 712     @ParameterizedTest
 713     @MethodSource("threadBuilders")
 714     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
 715         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_000);
 716     }
 717 
 718     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
 719                                                            int timeout) throws Exception {
 720         VThreadRunner.run(builder, () -> {
 721             try (DatagramChannel dc = DatagramChannel.open()) {
 722                 InetAddress lh = InetAddress.getLoopbackAddress();
 723                 dc.bind(new InetSocketAddress(lh, 0));
 724 
 725                 byte[] array = new byte[100];
 726                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 727                 if (timeout > 0)
 728                     dc.socket().setSoTimeout(timeout);
 729 
 730                 // interrupt current thread when it blocks in receive
 731                 Thread thisThread = Thread.currentThread();
 732                 runAfterParkedAsync(thisThread::interrupt);
 733 
 734                 try {
 735                     dc.socket().receive(p);
 736                     fail();
 737                 } catch (ClosedByInterruptException expected) {
 738                     assertTrue(Thread.interrupted());
 739                 }
 740             }
 741         });
 742     }
 743 
 744     /**
 745      * Pipe read/write, no blocking.
 746      */
 747     @ParameterizedTest
 748     @MethodSource("threadBuilders")
 749     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
 750         VThreadRunner.run(builder, () -> {
 751             Pipe p = Pipe.open();
 752             try (Pipe.SinkChannel sink = p.sink();
 753                  Pipe.SourceChannel source = p.source()) {
 754 
 755                 // write should not block
 756                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 757                 int n = sink.write(bb);
 758                 assertTrue(n > 0);
 759 
 760                 // read should not block
 761                 bb = ByteBuffer.allocate(10);
 762                 n = source.read(bb);
 763                 assertTrue(n > 0);
 764                 assertTrue(bb.get(0) == 'X');
 765             }
 766         });
 767     }
 768 
 769     /**
 770      * Virtual thread blocks in Pipe.SourceChannel read.
 771      */
 772     @ParameterizedTest
 773     @MethodSource("threadBuilders")
 774     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
 775         VThreadRunner.run(builder, () -> {
 776             Pipe p = Pipe.open();
 777             try (Pipe.SinkChannel sink = p.sink();
 778                  Pipe.SourceChannel source = p.source()) {
 779 
 780                 // write from sink when current thread blocks reading from source
 781                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 782                 runAfterParkedAsync(() -> sink.write(bb1));
 783 
 784                 // read should block
 785                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 786                 int n = source.read(bb2);
 787                 assertTrue(n > 0);
 788                 assertTrue(bb2.get(0) == 'X');
 789             }
 790         });
 791     }
 792 
 793     /**
 794      * Virtual thread blocks in Pipe.SinkChannel write.
 795      */
 796     @ParameterizedTest
 797     @MethodSource("threadBuilders")
 798     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
 799         VThreadRunner.run(builder, () -> {
 800             Pipe p = Pipe.open();
 801             try (Pipe.SinkChannel sink = p.sink();
 802                  Pipe.SourceChannel source = p.source()) {
 803 
 804                 // read from source to EOF when current thread blocking in write
 805                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
 806 
 807                 // write to sink should block
 808                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 809                 for (int i=0; i<1000; i++) {
 810                     int n = sink.write(bb);
 811                     assertTrue(n > 0);
 812                     bb.clear();
 813                 }
 814                 sink.close();
 815 
 816                 // wait for reader to finish
 817                 reader.join();
 818             }
 819         });
 820     }
 821 
 822     /**
 823      * Pipe.SourceChannel close while virtual thread blocked in read.
 824      */
 825     @ParameterizedTest
 826     @MethodSource("threadBuilders")
 827     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 828         VThreadRunner.run(builder, () -> {
 829             Pipe p = Pipe.open();
 830             try (Pipe.SinkChannel sink = p.sink();
 831                  Pipe.SourceChannel source = p.source()) {
 832                 runAfterParkedAsync(source::close);
 833                 try {
 834                     int n = source.read(ByteBuffer.allocate(100));
 835                     fail("read returned " + n);
 836                 } catch (AsynchronousCloseException expected) { }
 837             }
 838         });
 839     }
 840 
 841     /**
 842      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
 843      */
 844     @ParameterizedTest
 845     @MethodSource("threadBuilders")
 846     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 847         VThreadRunner.run(builder, () -> {
 848             Pipe p = Pipe.open();
 849             try (Pipe.SinkChannel sink = p.sink();
 850                  Pipe.SourceChannel source = p.source()) {
 851 
 852                 // interrupt current thread when it blocks reading from source
 853                 Thread thisThread = Thread.currentThread();
 854                 runAfterParkedAsync(thisThread::interrupt);
 855 
 856                 try {
 857                     int n = source.read(ByteBuffer.allocate(100));
 858                     fail("read returned " + n);
 859                 } catch (ClosedByInterruptException expected) {
 860                     assertTrue(Thread.interrupted());
 861                 }
 862             }
 863         });
 864     }
 865 
 866     /**
 867      * Pipe.SinkChannel close while virtual thread blocked in write.
 868      */
 869     @ParameterizedTest
 870     @MethodSource("threadBuilders")
 871     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 872         VThreadRunner.run(builder, () -> {
 873             boolean done = false;
 874             while (!done) {
 875                 Pipe p = Pipe.open();
 876                 try (Pipe.SinkChannel sink = p.sink();
 877                      Pipe.SourceChannel source = p.source()) {
 878 
 879                     // close sink when current thread blocks in write
 880                     runAfterParkedAsync(sink::close, true);
 881 
 882                     // write until channel is closed
 883                     try {
 884                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 885                         for (;;) {
 886                             int n = sink.write(bb);
 887                             assertTrue(n > 0);
 888                             bb.clear();
 889                         }
 890                     } catch (AsynchronousCloseException e) {
 891                         // closed when blocked in write
 892                         done = true;
 893                     } catch (ClosedChannelException e) {
 894                         // closed but not blocked in write, need to retry test
 895                         System.err.format("%s, need to retry!%n", e);
 896                     }
 897                 }
 898             }
 899         });
 900     }
 901 
 902     /**
 903      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
 904      */
 905     @ParameterizedTest
 906     @MethodSource("threadBuilders")
 907     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 908         VThreadRunner.run(builder, () -> {
 909             boolean done = false;
 910             while (!done) {
 911                 Pipe p = Pipe.open();
 912                 try (Pipe.SinkChannel sink = p.sink();
 913                      Pipe.SourceChannel source = p.source()) {
 914 
 915                     // interrupt current thread when it blocks in write
 916                     Thread thisThread = Thread.currentThread();
 917                     runAfterParkedAsync(thisThread::interrupt, true);
 918 
 919                     // write until channel is closed
 920                     try {
 921                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 922                         for (;;) {
 923                             int n = sink.write(bb);
 924                             assertTrue(n > 0);
 925                             bb.clear();
 926                         }
 927                     } catch (ClosedByInterruptException expected) {
 928                         // closed when blocked in write
 929                         assertTrue(Thread.interrupted());
 930                         done = true;
 931                     } catch (ClosedChannelException e) {
 932                         // closed but not blocked in write, need to retry test
 933                         System.err.format("%s, need to retry!%n", e);
 934                     }
 935                 }
 936             }
 937         });
 938     }
 939 
 940     /**
 941      * Creates a loopback connection
 942      */
 943     static class Connection implements Closeable {
 944         private final SocketChannel sc1;
 945         private final SocketChannel sc2;
 946         Connection() throws IOException {
 947             var lh = InetAddress.getLoopbackAddress();
 948             try (var listener = ServerSocketChannel.open()) {
 949                 listener.bind(new InetSocketAddress(lh, 0));
 950                 SocketChannel sc1 = SocketChannel.open();
 951                 SocketChannel sc2 = null;
 952                 try {
 953                     sc1.socket().connect(listener.getLocalAddress());
 954                     sc2 = listener.accept();
 955                 } catch (IOException ioe) {
 956                     sc1.close();
 957                     throw ioe;
 958                 }
 959                 this.sc1 = sc1;
 960                 this.sc2 = sc2;
 961             }
 962         }
 963         SocketChannel channel1() {
 964             return sc1;
 965         }
 966         SocketChannel channel2() {
 967             return sc2;
 968         }
 969         @Override
 970         public void close() throws IOException {
 971             sc1.close();
 972             sc2.close();
 973         }
 974     }
 975 
 976     /**
 977      * Read from a channel until all bytes have been read or an I/O error occurs.
 978      */
 979     static void readToEOF(ReadableByteChannel rbc) throws IOException {
 980         ByteBuffer bb = ByteBuffer.allocate(16*1024);
 981         int n;
 982         while ((n = rbc.read(bb)) > 0) {
 983             bb.clear();
 984         }
 985     }
 986 
 987     @FunctionalInterface
 988     interface ThrowingRunnable {
 989         void run() throws Exception;
 990     }
 991 
 992     /**
 993      * Runs the given task asynchronously after the current virtual thread parks.
 994      * @param writing if the thread will block in write
 995      * @return the thread started to run the task
 996      */
 997     private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
 998         Thread target = Thread.currentThread();
 999         if (!target.isVirtual())
1000             throw new WrongThreadException();
1001         return Thread.ofPlatform().daemon().start(() -> {
1002             try {
1003                 // wait for target thread to park
1004                 while (!isWaiting(target)) {
1005                     Thread.sleep(20);
1006                 }
1007 
1008                 // if the target thread is parked in write then we nudge it a few times
1009                 // to avoid wakeup with some bytes written
1010                 if (writing) {
1011                     for (int i = 0; i < 3; i++) {
1012                         LockSupport.unpark(target);
1013                         while (!isWaiting(target)) {
1014                             Thread.sleep(20);
1015                         }
1016                     }
1017                 }
1018 
1019                 task.run();
1020 
1021             } catch (Exception e) {
1022                 e.printStackTrace();
1023             }
1024         });
1025     }
1026 
1027     private static Thread runAfterParkedAsync(ThrowingRunnable task) {
1028         return runAfterParkedAsync(task, false);
1029     }
1030 
1031     /**
1032      * Return true if the given Thread is parked.
1033      */
1034     private static boolean isWaiting(Thread target) {
1035         Thread.State state = target.getState();
1036         assertNotEquals(Thread.State.TERMINATED, state);
1037         return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
1038     }
1039 }