1 /*
   2  * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=default
  26  * @bug 8284161
  27  * @summary Test virtual threads doing blocking I/O on NIO channels
  28  * @library /test/lib
  29  * @run junit/othervm/timeout=480 BlockingChannelOps
  30  */
  31 
  32 /*
  33  * @test id=poller-modes
  34  * @requires (os.family == "linux") | (os.family == "mac")
  35  * @library /test/lib
  36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
  37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
  38  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
  39  */
  40 
  41 /*
  42  * @test id=io_uring
  43  * @requires os.family == "linux"
  44  * @library /test/lib
  45  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
  46  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
  47  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
  48  */
  49 
  50 /*
  51  * @test id=no-vmcontinuations
  52  * @requires vm.continuations
  53  * @library /test/lib
  54  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
  55  */
  56 
  57 import java.io.Closeable;
  58 import java.io.IOException;
  59 import java.net.DatagramPacket;
  60 import java.net.InetAddress;
  61 import java.net.InetSocketAddress;
  62 import java.net.Socket;
  63 import java.net.SocketAddress;
  64 import java.net.SocketException;
  65 import java.nio.ByteBuffer;
  66 import java.nio.channels.AsynchronousCloseException;
  67 import java.nio.channels.ClosedByInterruptException;
  68 import java.nio.channels.ClosedChannelException;
  69 import java.nio.channels.DatagramChannel;
  70 import java.nio.channels.Pipe;
  71 import java.nio.channels.ReadableByteChannel;
  72 import java.nio.channels.ServerSocketChannel;
  73 import java.nio.channels.SocketChannel;
  74 import java.nio.channels.WritableByteChannel;
  75 import java.util.concurrent.ExecutorService;
  76 import java.util.concurrent.Executors;
  77 import java.util.concurrent.ThreadFactory;
  78 import java.util.concurrent.locks.LockSupport;
  79 import java.util.stream.Stream;
  80 
  81 import jdk.test.lib.thread.VThreadRunner;
  82 import jdk.test.lib.thread.VThreadScheduler;
  83 
  84 import org.junit.jupiter.api.BeforeAll;
  85 import org.junit.jupiter.params.ParameterizedTest;
  86 import org.junit.jupiter.params.provider.MethodSource;
  87 import static org.junit.jupiter.api.Assertions.*;
  88 
  89 class BlockingChannelOps {
  90     private static ExecutorService threadPool;
  91     private static Thread.VirtualThreadScheduler customScheduler;
  92 
  93     @BeforeAll
  94     static void setup() {
  95         ThreadFactory factory = Thread.ofPlatform().daemon().factory();
  96         threadPool = Executors.newCachedThreadPool(factory);
  97         customScheduler = (_, task) -> threadPool.execute(task);
  98     }
  99 
 100     /**
 101      * Returns the Thread.Builder to create the virtual thread.
 102      */
 103     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
 104         if (VThreadScheduler.supportsCustomScheduler()) {
 105             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
 106         } else {
 107             return Stream.of(Thread.ofVirtual());
 108         }
 109     }
 110 
 111     /**
 112      * SocketChannel read/write, no blocking.
 113      */
 114     @ParameterizedTest
 115     @MethodSource("threadBuilders")
 116     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
 117         VThreadRunner.run(builder, () -> {
 118             try (var connection = new Connection()) {
 119                 SocketChannel sc1 = connection.channel1();
 120                 SocketChannel sc2 = connection.channel2();
 121 
 122                 // write to sc1
 123                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 124                 int n = sc1.write(bb);
 125                 assertTrue(n > 0);
 126 
 127                 // read from sc2 should not block
 128                 bb = ByteBuffer.allocate(10);
 129                 n = sc2.read(bb);
 130                 assertTrue(n > 0);
 131                 assertTrue(bb.get(0) == 'X');
 132             }
 133         });
 134     }
 135 
 136     /**
 137      * Virtual thread blocks in SocketChannel read.
 138      */
 139     @ParameterizedTest
 140     @MethodSource("threadBuilders")
 141     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
 142         VThreadRunner.run(builder, () -> {
 143             try (var connection = new Connection()) {
 144                 SocketChannel sc1 = connection.channel1();
 145                 SocketChannel sc2 = connection.channel2();
 146 
 147                 // write to sc1 when current thread blocks in sc2.read
 148                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 149                 runAfterParkedAsync(() -> sc1.write(bb1));
 150 
 151                 // read from sc2 should block
 152                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 153                 int n = sc2.read(bb2);
 154                 assertTrue(n > 0);
 155                 assertTrue(bb2.get(0) == 'X');
 156             }
 157         });
 158     }
 159 
 160     /**
 161      * Virtual thread blocks in SocketChannel write.
 162      */
 163     @ParameterizedTest
 164     @MethodSource("threadBuilders")
 165     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
 166         VThreadRunner.run(builder, () -> {
 167             try (var connection = new Connection()) {
 168                 SocketChannel sc1 = connection.channel1();
 169                 SocketChannel sc2 = connection.channel2();
 170 
 171                 // read from sc2 to EOF when current thread blocks in sc1.write
 172                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
 173 
 174                 // write to sc1 should block
 175                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 176                 for (int i=0; i<1000; i++) {
 177                     int n = sc1.write(bb);
 178                     assertTrue(n > 0);
 179                     bb.clear();
 180                 }
 181                 sc1.close();
 182 
 183                 // wait for reader to finish
 184                 reader.join();
 185             }
 186         });
 187     }
 188 
 189     /**
 190      * SocketChannel close while virtual thread blocked in read.
 191      */
 192     @ParameterizedTest
 193     @MethodSource("threadBuilders")
 194     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 195         VThreadRunner.run(builder, () -> {
 196             try (var connection = new Connection()) {
 197                 SocketChannel sc = connection.channel1();
 198                 runAfterParkedAsync(sc::close);
 199                 try {
 200                     int n = sc.read(ByteBuffer.allocate(100));
 201                     fail("read returned " + n);
 202                 } catch (AsynchronousCloseException expected) { }
 203             }
 204         });
 205     }
 206 
 207     /**
 208      * SocketChannel shutdownInput while virtual thread blocked in read.
 209      */
 210     @ParameterizedTest
 211     @MethodSource("threadBuilders")
 212     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
 213         VThreadRunner.run(builder, () -> {
 214             try (var connection = new Connection()) {
 215                 SocketChannel sc = connection.channel1();
 216                 runAfterParkedAsync(sc::shutdownInput);
 217                 int n = sc.read(ByteBuffer.allocate(100));
 218                 assertEquals(-1, n);
 219                 assertTrue(sc.isOpen());
 220             }
 221         });
 222     }
 223 
 224     /**
 225      * Virtual thread interrupted while blocked in SocketChannel read.
 226      */
 227     @ParameterizedTest
 228     @MethodSource("threadBuilders")
 229     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 230         VThreadRunner.run(builder, () -> {
 231             try (var connection = new Connection()) {
 232                 SocketChannel sc = connection.channel1();
 233 
 234                 // interrupt current thread when it blocks in read
 235                 Thread thisThread = Thread.currentThread();
 236                 runAfterParkedAsync(thisThread::interrupt);
 237 
 238                 try {
 239                     int n = sc.read(ByteBuffer.allocate(100));
 240                     fail("read returned " + n);
 241                 } catch (ClosedByInterruptException expected) {
 242                     assertTrue(Thread.interrupted());
 243                 }
 244             }
 245         });
 246     }
 247 
 248     /**
 249      * SocketChannel close while virtual thread blocked in write.
 250      */
 251     @ParameterizedTest
 252     @MethodSource("threadBuilders")
 253     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 254         VThreadRunner.run(builder, () -> {
 255             boolean done = false;
 256             while (!done) {
 257                 try (var connection = new Connection()) {
 258                     SocketChannel sc = connection.channel1();
 259 
 260                     // close sc when current thread blocks in write
 261                     runAfterParkedAsync(sc::close, true);
 262 
 263                     // write until channel is closed
 264                     try {
 265                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 266                         for (;;) {
 267                             int n = sc.write(bb);
 268                             assertTrue(n > 0);
 269                             bb.clear();
 270                         }
 271                     } catch (AsynchronousCloseException expected) {
 272                         // closed when blocked in write
 273                         done = true;
 274                     } catch (ClosedChannelException e) {
 275                         // closed but not blocked in write, need to retry test
 276                         System.err.format("%s, need to retry!%n", e);
 277                     }
 278                 }
 279             }
 280         });
 281     }
 282 
 283     /**
 284      * SocketChannel shutdownOutput while virtual thread blocked in write.
 285      */
 286     @ParameterizedTest
 287     @MethodSource("threadBuilders")
 288     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
 289         VThreadRunner.run(builder, () -> {
 290             try (var connection = new Connection()) {
 291                 SocketChannel sc = connection.channel1();
 292 
 293                 // shutdown output when current thread blocks in write
 294                 runAfterParkedAsync(sc::shutdownOutput);
 295                 try {
 296                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
 297                     for (;;) {
 298                         int n = sc.write(bb);
 299                         assertTrue(n > 0);
 300                         bb.clear();
 301                     }
 302                 } catch (ClosedChannelException e) {
 303                     // expected
 304                 }
 305                 assertTrue(sc.isOpen());
 306             }
 307         });
 308     }
 309 
 310     /**
 311      * Virtual thread interrupted while blocked in SocketChannel write.
 312      */
 313     @ParameterizedTest
 314     @MethodSource("threadBuilders")
 315     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 316         VThreadRunner.run(builder, () -> {
 317             boolean done = false;
 318             while (!done) {
 319                 try (var connection = new Connection()) {
 320                     SocketChannel sc = connection.channel1();
 321 
 322                     // interrupt current thread when it blocks in write
 323                     Thread thisThread = Thread.currentThread();
 324                     runAfterParkedAsync(thisThread::interrupt, true);
 325 
 326                     // write until channel is closed
 327                     try {
 328                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 329                         for (;;) {
 330                             int n = sc.write(bb);
 331                             assertTrue(n > 0);
 332                             bb.clear();
 333                         }
 334                     } catch (ClosedByInterruptException e) {
 335                         // closed when blocked in write
 336                         assertTrue(Thread.interrupted());
 337                         done = true;
 338                     } catch (ClosedChannelException e) {
 339                         // closed but not blocked in write, need to retry test
 340                         System.err.format("%s, need to retry!%n", e);
 341                     }
 342                 }
 343             }
 344         });
 345     }
 346 
 347     /**
 348      * Virtual thread blocks in SocketChannel adaptor read.
 349      */
 350     @ParameterizedTest
 351     @MethodSource("threadBuilders")
 352     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
 353         testSocketAdaptorRead(builder, 0);
 354     }
 355 
 356     /**
 357      * Virtual thread blocks in SocketChannel adaptor read with timeout.
 358      */
 359     @ParameterizedTest
 360     @MethodSource("threadBuilders")
 361     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
 362         testSocketAdaptorRead(builder, 60_000);
 363     }
 364 
 365     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
 366                                        int timeout) throws Exception {
 367         VThreadRunner.run(builder, () -> {
 368             try (var connection = new Connection()) {
 369                 SocketChannel sc1 = connection.channel1();
 370                 SocketChannel sc2 = connection.channel2();
 371 
 372                 // write to sc1 when currnet thread blocks reading from sc2
 373                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 374                 runAfterParkedAsync(() -> sc1.write(bb));
 375 
 376                 // read from sc2 should block
 377                 byte[] array = new byte[100];
 378                 if (timeout > 0)
 379                     sc2.socket().setSoTimeout(timeout);
 380                 int n = sc2.socket().getInputStream().read(array);
 381                 assertTrue(n > 0);
 382                 assertTrue(array[0] == 'X');
 383             }
 384         });
 385     }
 386 
 387     /**
 388      * ServerSocketChannel accept, no blocking.
 389      */
 390     @ParameterizedTest
 391     @MethodSource("threadBuilders")
 392     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
 393         VThreadRunner.run(builder, () -> {
 394             try (var ssc = ServerSocketChannel.open()) {
 395                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 396                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
 397                 // accept should not block
 398                 var sc2 = ssc.accept();
 399                 sc1.close();
 400                 sc2.close();
 401             }
 402         });
 403     }
 404 
 405     /**
 406      * Virtual thread blocks in ServerSocketChannel accept.
 407      */
 408     @ParameterizedTest
 409     @MethodSource("threadBuilders")
 410     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
 411         VThreadRunner.run(builder, () -> {
 412             try (var ssc = ServerSocketChannel.open()) {
 413                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 414                 var sc1 = SocketChannel.open();
 415 
 416                 // connect when current thread when it blocks in accept
 417                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
 418 
 419                 // accept should block
 420                 var sc2 = ssc.accept();
 421                 sc1.close();
 422                 sc2.close();
 423             }
 424         });
 425     }
 426 
 427     /**
 428      * SeverSocketChannel close while virtual thread blocked in accept.
 429      */
 430     @ParameterizedTest
 431     @MethodSource("threadBuilders")
 432     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 433         VThreadRunner.run(builder, () -> {
 434             try (var ssc = ServerSocketChannel.open()) {
 435                 InetAddress lh = InetAddress.getLoopbackAddress();
 436                 ssc.bind(new InetSocketAddress(lh, 0));
 437                 runAfterParkedAsync(ssc::close);
 438                 try {
 439                     SocketChannel sc = ssc.accept();
 440                     sc.close();
 441                     fail("connection accepted???");
 442                 } catch (AsynchronousCloseException expected) { }
 443             }
 444         });
 445     }
 446 
 447     /**
 448      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
 449      */
 450     @ParameterizedTest
 451     @MethodSource("threadBuilders")
 452     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 453         VThreadRunner.run(builder, () -> {
 454             try (var ssc = ServerSocketChannel.open()) {
 455                 InetAddress lh = InetAddress.getLoopbackAddress();
 456                 ssc.bind(new InetSocketAddress(lh, 0));
 457 
 458                 // interrupt current thread when it blocks in accept
 459                 Thread thisThread = Thread.currentThread();
 460                 runAfterParkedAsync(thisThread::interrupt);
 461 
 462                 try {
 463                     SocketChannel sc = ssc.accept();
 464                     sc.close();
 465                     fail("connection accepted???");
 466                 } catch (ClosedByInterruptException expected) {
 467                     assertTrue(Thread.interrupted());
 468                 }
 469             }
 470         });
 471     }
 472 
 473     /**
 474      * Virtual thread blocks in ServerSocketChannel adaptor accept.
 475      */
 476     @ParameterizedTest
 477     @MethodSource("threadBuilders")
 478     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
 479         testSocketChannelAdaptorAccept(builder, 0);
 480     }
 481 
 482     /**
 483      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
 484      */
 485     @ParameterizedTest
 486     @MethodSource("threadBuilders")
 487     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
 488         testSocketChannelAdaptorAccept(builder, 60_000);
 489     }
 490 
 491     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
 492                                                 int timeout) throws Exception {
 493         VThreadRunner.run(builder, () -> {
 494             try (var ssc = ServerSocketChannel.open()) {
 495                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 496                 var sc = SocketChannel.open();
 497 
 498                 // interrupt current thread when it blocks in accept
 499                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
 500 
 501                 // accept should block
 502                 if (timeout > 0)
 503                     ssc.socket().setSoTimeout(timeout);
 504                 Socket s = ssc.socket().accept();
 505                 sc.close();
 506                 s.close();
 507             }
 508         });
 509     }
 510 
 511     /**
 512      * DatagramChannel receive/send, no blocking.
 513      */
 514     @ParameterizedTest
 515     @MethodSource("threadBuilders")
 516     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
 517         VThreadRunner.run(builder, () -> {
 518             try (DatagramChannel dc1 = DatagramChannel.open();
 519                  DatagramChannel dc2 = DatagramChannel.open()) {
 520 
 521                 InetAddress lh = InetAddress.getLoopbackAddress();
 522                 dc2.bind(new InetSocketAddress(lh, 0));
 523 
 524                 // send should not block
 525                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 526                 int n = dc1.send(bb, dc2.getLocalAddress());
 527                 assertTrue(n > 0);
 528 
 529                 // receive should not block
 530                 bb = ByteBuffer.allocate(10);
 531                 dc2.receive(bb);
 532                 assertTrue(bb.get(0) == 'X');
 533             }
 534         });
 535     }
 536 
 537     /**
 538      * Virtual thread blocks in DatagramChannel receive.
 539      */
 540     @ParameterizedTest
 541     @MethodSource("threadBuilders")
 542     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
 543         VThreadRunner.run(builder, () -> {
 544             try (DatagramChannel dc1 = DatagramChannel.open();
 545                  DatagramChannel dc2 = DatagramChannel.open()) {
 546 
 547                 InetAddress lh = InetAddress.getLoopbackAddress();
 548                 dc2.bind(new InetSocketAddress(lh, 0));
 549 
 550                 // send from dc1 when current thread blocked in dc2.receive
 551                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 552                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
 553 
 554                 // read from dc2 should block
 555                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 556                 dc2.receive(bb2);
 557                 assertTrue(bb2.get(0) == 'X');
 558             }
 559         });
 560     }
 561 
 562     /**
 563      * DatagramChannel close while virtual thread blocked in receive.
 564      */
 565     @ParameterizedTest
 566     @MethodSource("threadBuilders")
 567     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 568         VThreadRunner.run(builder, () -> {
 569             try (DatagramChannel dc = DatagramChannel.open()) {
 570                 InetAddress lh = InetAddress.getLoopbackAddress();
 571                 dc.bind(new InetSocketAddress(lh, 0));
 572                 runAfterParkedAsync(dc::close);
 573                 try {
 574                     dc.receive(ByteBuffer.allocate(100));
 575                     fail("receive returned");
 576                 } catch (AsynchronousCloseException expected) { }
 577             }
 578         });
 579     }
 580 
 581     /**
 582      * Virtual thread interrupted while blocked in DatagramChannel receive.
 583      */
 584     @ParameterizedTest
 585     @MethodSource("threadBuilders")
 586     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 587         VThreadRunner.run(builder, () -> {
 588             try (DatagramChannel dc = DatagramChannel.open()) {
 589                 InetAddress lh = InetAddress.getLoopbackAddress();
 590                 dc.bind(new InetSocketAddress(lh, 0));
 591 
 592                 // interrupt current thread when it blocks in receive
 593                 Thread thisThread = Thread.currentThread();
 594                 runAfterParkedAsync(thisThread::interrupt);
 595 
 596                 try {
 597                     dc.receive(ByteBuffer.allocate(100));
 598                     fail("receive returned");
 599                 } catch (ClosedByInterruptException expected) {
 600                     assertTrue(Thread.interrupted());
 601                 }
 602             }
 603         });
 604     }
 605 
 606     /**
 607      * Virtual thread blocks in DatagramSocket adaptor receive.
 608      */
 609     @ParameterizedTest
 610     @MethodSource("threadBuilders")
 611     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
 612         testDatagramSocketAdaptorReceive(builder, 0);
 613     }
 614 
 615     /**
 616      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
 617      */
 618     @ParameterizedTest
 619     @MethodSource("threadBuilders")
 620     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
 621         testDatagramSocketAdaptorReceive(builder, 60_000);
 622     }
 623 
 624     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
 625                                                   int timeout) throws Exception {
 626         VThreadRunner.run(builder, () -> {
 627             try (DatagramChannel dc1 = DatagramChannel.open();
 628                  DatagramChannel dc2 = DatagramChannel.open()) {
 629 
 630                 InetAddress lh = InetAddress.getLoopbackAddress();
 631                 dc2.bind(new InetSocketAddress(lh, 0));
 632 
 633                 // send from dc1 when current thread blocks in dc2 receive
 634                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 635                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
 636 
 637                 // receive should block
 638                 byte[] array = new byte[100];
 639                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 640                 if (timeout > 0)
 641                     dc2.socket().setSoTimeout(timeout);
 642                 dc2.socket().receive(p);
 643                 assertTrue(p.getLength() == 3 && array[0] == 'X');
 644             }
 645         });
 646     }
 647 
 648     /**
 649      * DatagramChannel close while virtual thread blocked in adaptor receive.
 650      */
 651     @ParameterizedTest
 652     @MethodSource("threadBuilders")
 653     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
 654         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
 655     }
 656 
 657     /**
 658      * DatagramChannel close while virtual thread blocked in adaptor receive
 659      * with timeout.
 660      */
 661     @ParameterizedTest
 662     @MethodSource("threadBuilders")
 663     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
 664         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
 665     }
 666 
 667     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
 668                                                             int timeout) throws Exception {
 669         VThreadRunner.run(builder, () -> {
 670             try (DatagramChannel dc = DatagramChannel.open()) {
 671                 InetAddress lh = InetAddress.getLoopbackAddress();
 672                 dc.bind(new InetSocketAddress(lh, 0));
 673 
 674                 byte[] array = new byte[100];
 675                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 676                 if (timeout > 0)
 677                     dc.socket().setSoTimeout(timeout);
 678 
 679                 // close channel/socket when current thread blocks in receive
 680                 runAfterParkedAsync(dc::close);
 681 
 682                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
 683             }
 684         });
 685     }
 686 
 687     /**
 688      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
 689      */
 690     @ParameterizedTest
 691     @MethodSource("threadBuilders")
 692     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
 693         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
 694     }
 695 
 696     /**
 697      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
 698      * with timeout.
 699      */
 700     @ParameterizedTest
 701     @MethodSource("threadBuilders")
 702     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
 703         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
 704     }
 705 
 706     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
 707                                                            int timeout) throws Exception {
 708         VThreadRunner.run(builder, () -> {
 709             try (DatagramChannel dc = DatagramChannel.open()) {
 710                 InetAddress lh = InetAddress.getLoopbackAddress();
 711                 dc.bind(new InetSocketAddress(lh, 0));
 712 
 713                 byte[] array = new byte[100];
 714                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 715                 if (timeout > 0)
 716                     dc.socket().setSoTimeout(timeout);
 717 
 718                 // interrupt current thread when it blocks in receive
 719                 Thread thisThread = Thread.currentThread();
 720                 runAfterParkedAsync(thisThread::interrupt);
 721 
 722                 try {
 723                     dc.socket().receive(p);
 724                     fail();
 725                 } catch (ClosedByInterruptException expected) {
 726                     assertTrue(Thread.interrupted());
 727                 }
 728             }
 729         });
 730     }
 731 
 732     /**
 733      * Pipe read/write, no blocking.
 734      */
 735     @ParameterizedTest
 736     @MethodSource("threadBuilders")
 737     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
 738         VThreadRunner.run(builder, () -> {
 739             Pipe p = Pipe.open();
 740             try (Pipe.SinkChannel sink = p.sink();
 741                  Pipe.SourceChannel source = p.source()) {
 742 
 743                 // write should not block
 744                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 745                 int n = sink.write(bb);
 746                 assertTrue(n > 0);
 747 
 748                 // read should not block
 749                 bb = ByteBuffer.allocate(10);
 750                 n = source.read(bb);
 751                 assertTrue(n > 0);
 752                 assertTrue(bb.get(0) == 'X');
 753             }
 754         });
 755     }
 756 
 757     /**
 758      * Virtual thread blocks in Pipe.SourceChannel read.
 759      */
 760     @ParameterizedTest
 761     @MethodSource("threadBuilders")
 762     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
 763         VThreadRunner.run(builder, () -> {
 764             Pipe p = Pipe.open();
 765             try (Pipe.SinkChannel sink = p.sink();
 766                  Pipe.SourceChannel source = p.source()) {
 767 
 768                 // write from sink when current thread blocks reading from source
 769                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 770                 runAfterParkedAsync(() -> sink.write(bb1));
 771 
 772                 // read should block
 773                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 774                 int n = source.read(bb2);
 775                 assertTrue(n > 0);
 776                 assertTrue(bb2.get(0) == 'X');
 777             }
 778         });
 779     }
 780 
 781     /**
 782      * Virtual thread blocks in Pipe.SinkChannel write.
 783      */
 784     @ParameterizedTest
 785     @MethodSource("threadBuilders")
 786     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
 787         VThreadRunner.run(builder, () -> {
 788             Pipe p = Pipe.open();
 789             try (Pipe.SinkChannel sink = p.sink();
 790                  Pipe.SourceChannel source = p.source()) {
 791 
 792                 // read from source to EOF when current thread blocking in write
 793                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
 794 
 795                 // write to sink should block
 796                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 797                 for (int i=0; i<1000; i++) {
 798                     int n = sink.write(bb);
 799                     assertTrue(n > 0);
 800                     bb.clear();
 801                 }
 802                 sink.close();
 803 
 804                 // wait for reader to finish
 805                 reader.join();
 806             }
 807         });
 808     }
 809 
 810     /**
 811      * Pipe.SourceChannel close while virtual thread blocked in read.
 812      */
 813     @ParameterizedTest
 814     @MethodSource("threadBuilders")
 815     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 816         VThreadRunner.run(builder, () -> {
 817             Pipe p = Pipe.open();
 818             try (Pipe.SinkChannel sink = p.sink();
 819                  Pipe.SourceChannel source = p.source()) {
 820                 runAfterParkedAsync(source::close);
 821                 try {
 822                     int n = source.read(ByteBuffer.allocate(100));
 823                     fail("read returned " + n);
 824                 } catch (AsynchronousCloseException expected) { }
 825             }
 826         });
 827     }
 828 
 829     /**
 830      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
 831      */
 832     @ParameterizedTest
 833     @MethodSource("threadBuilders")
 834     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 835         VThreadRunner.run(builder, () -> {
 836             Pipe p = Pipe.open();
 837             try (Pipe.SinkChannel sink = p.sink();
 838                  Pipe.SourceChannel source = p.source()) {
 839 
 840                 // interrupt current thread when it blocks reading from source
 841                 Thread thisThread = Thread.currentThread();
 842                 runAfterParkedAsync(thisThread::interrupt);
 843 
 844                 try {
 845                     int n = source.read(ByteBuffer.allocate(100));
 846                     fail("read returned " + n);
 847                 } catch (ClosedByInterruptException expected) {
 848                     assertTrue(Thread.interrupted());
 849                 }
 850             }
 851         });
 852     }
 853 
 854     /**
 855      * Pipe.SinkChannel close while virtual thread blocked in write.
 856      */
 857     @ParameterizedTest
 858     @MethodSource("threadBuilders")
 859     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 860         VThreadRunner.run(builder, () -> {
 861             boolean done = false;
 862             while (!done) {
 863                 Pipe p = Pipe.open();
 864                 try (Pipe.SinkChannel sink = p.sink();
 865                      Pipe.SourceChannel source = p.source()) {
 866 
 867                     // close sink when current thread blocks in write
 868                     runAfterParkedAsync(sink::close, true);
 869 
 870                     // write until channel is closed
 871                     try {
 872                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 873                         for (;;) {
 874                             int n = sink.write(bb);
 875                             assertTrue(n > 0);
 876                             bb.clear();
 877                         }
 878                     } catch (AsynchronousCloseException e) {
 879                         // closed when blocked in write
 880                         done = true;
 881                     } catch (ClosedChannelException e) {
 882                         // closed but not blocked in write, need to retry test
 883                         System.err.format("%s, need to retry!%n", e);
 884                     }
 885                 }
 886             }
 887         });
 888     }
 889 
 890     /**
 891      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
 892      */
 893     @ParameterizedTest
 894     @MethodSource("threadBuilders")
 895     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 896         VThreadRunner.run(builder, () -> {
 897             boolean done = false;
 898             while (!done) {
 899                 Pipe p = Pipe.open();
 900                 try (Pipe.SinkChannel sink = p.sink();
 901                      Pipe.SourceChannel source = p.source()) {
 902 
 903                     // interrupt current thread when it blocks in write
 904                     Thread thisThread = Thread.currentThread();
 905                     runAfterParkedAsync(thisThread::interrupt, true);
 906 
 907                     // write until channel is closed
 908                     try {
 909                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 910                         for (;;) {
 911                             int n = sink.write(bb);
 912                             assertTrue(n > 0);
 913                             bb.clear();
 914                         }
 915                     } catch (ClosedByInterruptException expected) {
 916                         // closed when blocked in write
 917                         assertTrue(Thread.interrupted());
 918                         done = true;
 919                     } catch (ClosedChannelException e) {
 920                         // closed but not blocked in write, need to retry test
 921                         System.err.format("%s, need to retry!%n", e);
 922                     }
 923                 }
 924             }
 925         });
 926     }
 927 
 928     /**
 929      * Creates a loopback connection
 930      */
 931     static class Connection implements Closeable {
 932         private final SocketChannel sc1;
 933         private final SocketChannel sc2;
 934         Connection() throws IOException {
 935             var lh = InetAddress.getLoopbackAddress();
 936             try (var listener = ServerSocketChannel.open()) {
 937                 listener.bind(new InetSocketAddress(lh, 0));
 938                 SocketChannel sc1 = SocketChannel.open();
 939                 SocketChannel sc2 = null;
 940                 try {
 941                     sc1.socket().connect(listener.getLocalAddress());
 942                     sc2 = listener.accept();
 943                 } catch (IOException ioe) {
 944                     sc1.close();
 945                     throw ioe;
 946                 }
 947                 this.sc1 = sc1;
 948                 this.sc2 = sc2;
 949             }
 950         }
 951         SocketChannel channel1() {
 952             return sc1;
 953         }
 954         SocketChannel channel2() {
 955             return sc2;
 956         }
 957         @Override
 958         public void close() throws IOException {
 959             sc1.close();
 960             sc2.close();
 961         }
 962     }
 963 
 964     /**
 965      * Read from a channel until all bytes have been read or an I/O error occurs.
 966      */
 967     static void readToEOF(ReadableByteChannel rbc) throws IOException {
 968         ByteBuffer bb = ByteBuffer.allocate(16*1024);
 969         int n;
 970         while ((n = rbc.read(bb)) > 0) {
 971             bb.clear();
 972         }
 973     }
 974 
    /**
     * A task with no arguments and no result whose {@code run} method is declared
     * to throw {@code Exception}, so lambdas and method references that throw
     * checked exceptions can be used without wrapping.
     */
    @FunctionalInterface
    interface ThrowingRunnable {
        /**
         * Runs the task.
         *
         * @throws Exception if the task fails
         */
        void run() throws Exception;
    }
 979 
 980     /**
 981      * Runs the given task asynchronously after the current virtual thread parks.
 982      * @param writing if the thread will block in write
 983      * @return the thread started to run the task
 984      */
 985     private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
 986         Thread target = Thread.currentThread();
 987         if (!target.isVirtual())
 988             throw new WrongThreadException();
 989         return Thread.ofPlatform().daemon().start(() -> {
 990             try {
 991                 // wait for target thread to park
 992                 while (!isWaiting(target)) {
 993                     Thread.sleep(20);
 994                 }
 995 
 996                 // if the target thread is parked in write then we nudge it a few times
 997                 // to avoid wakeup with some bytes written
 998                 if (writing) {
 999                     for (int i = 0; i < 3; i++) {
1000                         LockSupport.unpark(target);
1001                         while (!isWaiting(target)) {
1002                             Thread.sleep(20);
1003                         }
1004                     }
1005                 }
1006 
1007                 task.run();
1008 
1009             } catch (Exception e) {
1010                 e.printStackTrace();
1011             }
1012         });
1013     }
1014 
    /**
     * Runs the given task asynchronously after the current virtual thread parks.
     * Equivalent to {@code runAfterParkedAsync(task, false)}.
     *
     * @param task the task to run
     * @return the thread started to run the task
     */
    private static Thread runAfterParkedAsync(ThrowingRunnable task) {
        return runAfterParkedAsync(task, false);
    }
1018 
1019     /**
1020      * Return true if the given Thread is parked.
1021      */
1022     private static boolean isWaiting(Thread target) {
1023         Thread.State state = target.getState();
1024         assertNotEquals(Thread.State.TERMINATED, state);
1025         return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
1026     }
1027 }