< prev index next >

test/jdk/java/nio/channels/vthread/BlockingChannelOps.java

Print this page

   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=default
  26  * @bug 8284161
  27  * @summary Test virtual threads doing blocking I/O on NIO channels
  28  * @library /test/lib
  29  * @run junit/timeout=480 BlockingChannelOps
  30  */
  31 
  32 /*
  33  * @test id=poller-modes
  34  * @requires (os.family == "linux") | (os.family == "mac")
  35  * @library /test/lib
  36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
  37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps













  38  */
  39 
  40 /*
  41  * @test id=no-vmcontinuations
  42  * @requires vm.continuations
  43  * @library /test/lib
  44  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
  45  */
  46 
  47 import java.io.Closeable;
  48 import java.io.IOException;
  49 import java.net.DatagramPacket;
  50 import java.net.InetAddress;
  51 import java.net.InetSocketAddress;
  52 import java.net.Socket;
  53 import java.net.SocketAddress;
  54 import java.net.SocketException;
  55 import java.nio.ByteBuffer;
  56 import java.nio.channels.AsynchronousCloseException;
  57 import java.nio.channels.ClosedByInterruptException;
  58 import java.nio.channels.ClosedChannelException;
  59 import java.nio.channels.DatagramChannel;
  60 import java.nio.channels.Pipe;
  61 import java.nio.channels.ReadableByteChannel;
  62 import java.nio.channels.ServerSocketChannel;
  63 import java.nio.channels.SocketChannel;
  64 import java.nio.channels.WritableByteChannel;



  65 import java.util.concurrent.locks.LockSupport;

  66 
  67 import jdk.test.lib.thread.VThreadRunner;
  68 import org.junit.jupiter.api.Test;


  69 import org.junit.jupiter.params.ParameterizedTest;
  70 import org.junit.jupiter.params.provider.ValueSource;
  71 import static org.junit.jupiter.api.Assertions.*;
  72 
  73 class BlockingChannelOps {





























  74 
  75     /**
  76      * SocketChannel read/write, no blocking: bytes are written to sc1 before sc2 is read.
  77      */
  78     @Test
  79     void testSocketChannelReadWrite1() throws Exception {
  80         VThreadRunner.run(() -> {

  81             try (var connection = new Connection()) {  // Connection is declared outside this view; presumably a connected channel pair
  82                 SocketChannel sc1 = connection.channel1();
  83                 SocketChannel sc2 = connection.channel2();
  84 
  85                 // write to sc1
  86                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
  87                 int n = sc1.write(bb);
  88                 assertTrue(n > 0);
  89 
  90                 // read from sc2 should not block, the write above has already completed
  91                 bb = ByteBuffer.allocate(10);
  92                 n = sc2.read(bb);
  93                 assertTrue(n > 0);
  94                 assertTrue(bb.get(0) == 'X');  // absolute get, the buffer has not been flipped
  95             }
  96         });
  97     }
  98 
  99     /**
 100      * Virtual thread blocks in SocketChannel read until bytes are written to the other channel.
 101      */
 102     @Test
 103     void testSocketChannelRead() throws Exception {
 104         VThreadRunner.run(() -> {

 105             try (var connection = new Connection()) {
 106                 SocketChannel sc1 = connection.channel1();
 107                 SocketChannel sc2 = connection.channel2();
 108 
 109                 // write to sc1 when current thread blocks in sc2.read
 110                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 111                 runAfterParkedAsync(() -> sc1.write(bb1));  // helper declared outside this view; presumably waits for this thread to park first
 112 
 113                 // read from sc2 should block until the scheduled write happens
 114                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 115                 int n = sc2.read(bb2);
 116                 assertTrue(n > 0);
 117                 assertTrue(bb2.get(0) == 'X');  // absolute get, the buffer has not been flipped
 118             }
 119         });
 120     }
 121 
 122     /**
 123      * Virtual thread blocks in SocketChannel write while another thread drains the other channel.
 124      */
 125     @Test
 126     void testSocketChannelWrite() throws Exception {
 127         VThreadRunner.run(() -> {

 128             try (var connection = new Connection()) {
 129                 SocketChannel sc1 = connection.channel1();
 130                 SocketChannel sc2 = connection.channel2();
 131 
 132                 // read from sc2 to EOF when current thread blocks in sc1.write
 133                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));  // readToEOF is declared outside this view
 134 
 135                 // write to sc1 should block
 136                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 137                 for (int i=0; i<1000; i++) {  // 1000 writes of up to 100KB; presumably far more than the socket buffers hold
 138                     int n = sc1.write(bb);
 139                     assertTrue(n > 0);
 140                     bb.clear();  // reset position/limit so the same buffer is written again
 141                 }
 142                 sc1.close();  // close before joining; presumably this is what lets the reader reach EOF
 143 
 144                 // wait for reader to finish
 145                 reader.join();
 146             }
 147         });
 148     }
 149 
 150     /**
 151      * SocketChannel close while virtual thread blocked in read, read should throw AsynchronousCloseException.
 152      */
 153     @Test
 154     void testSocketChannelReadAsyncClose() throws Exception {
 155         VThreadRunner.run(() -> {

 156             try (var connection = new Connection()) {
 157                 SocketChannel sc = connection.channel1();
 158                 runAfterParkedAsync(sc::close);  // close sc from another thread; helper is declared outside this view
 159                 try {
 160                     int n = sc.read(ByteBuffer.allocate(100));
 161                     fail("read returned " + n);  // nothing is written in this test, so read must not return normally
 162                 } catch (AsynchronousCloseException expected) { }
 163             }
 164         });
 165     }
 166 
 167     /**
 168      * SocketChannel shutdownInput while virtual thread blocked in read, read should return EOF.
 169      */
 170     @Test
 171     void testSocketChannelReadAsyncShutdownInput() throws Exception {
 172         VThreadRunner.run(() -> {

 173             try (var connection = new Connection()) {
 174                 SocketChannel sc = connection.channel1();
 175                 runAfterParkedAsync(sc::shutdownInput);  // shut down input from another thread; helper is declared outside this view
 176                 int n = sc.read(ByteBuffer.allocate(100));
 177                 assertEquals(-1, n);  // read reports end-of-stream rather than throwing
 178                 assertTrue(sc.isOpen());  // shutting down input must not close the channel
 179             }
 180         });
 181     }
 182 
 183     /**
 184      * Virtual thread interrupted while blocked in SocketChannel read, read should throw ClosedByInterruptException.
 185      */
 186     @Test
 187     void testSocketChannelReadInterrupt() throws Exception {
 188         VThreadRunner.run(() -> {

 189             try (var connection = new Connection()) {
 190                 SocketChannel sc = connection.channel1();
 191 
 192                 // interrupt current thread when it blocks in read
 193                 Thread thisThread = Thread.currentThread();
 194                 runAfterParkedAsync(thisThread::interrupt);
 195 
 196                 try {
 197                     int n = sc.read(ByteBuffer.allocate(100));
 198                     fail("read returned " + n);
 199                 } catch (ClosedByInterruptException expected) {
 200                     assertTrue(Thread.interrupted());  // interrupt status must still be set; this call also clears it
 201                 }
 202             }
 203         });
 204     }
 205 
 206     /**
 207      * SocketChannel close while virtual thread blocked in write, retried until the close lands during a blocked write.
 208      */
 209     @Test
 210     void testSocketChannelWriteAsyncClose() throws Exception {
 211         VThreadRunner.run(() -> {

 212             boolean done = false;
 213             while (!done) {
 214                 try (var connection = new Connection()) {
 215                     SocketChannel sc = connection.channel1();
 216 
 217                     // close sc when current thread blocks in write
 218                     runAfterParkedAsync(sc::close, true);  // NOTE(review): meaning of the second argument is not visible here, confirm against the helper
 219 
 220                     // write until channel is closed, nothing reads from the other end
 221                     try {
 222                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 223                         for (;;) {
 224                             int n = sc.write(bb);
 225                             assertTrue(n > 0);
 226                             bb.clear();  // reset position/limit so the same buffer is written again
 227                         }
 228                     } catch (AsynchronousCloseException expected) {
 229                         // closed when blocked in write
 230                         done = true;
 231                     } catch (ClosedChannelException e) {
 232                         // closed but not blocked in write, need to retry test with a new connection
 233                         System.err.format("%s, need to retry!%n", e);
 234                     }
 235                 }
 236             }
 237         });
 238     }
 239 
 240 
 241     /**
 242      * SocketChannel shutdownOutput while virtual thread blocked in write, channel should remain open.
 243      */
 244     @Test
 245     void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
 246         VThreadRunner.run(() -> {

 247             try (var connection = new Connection()) {
 248                 SocketChannel sc = connection.channel1();
 249 
 250                 // shutdown output when current thread blocks in write
 251                 runAfterParkedAsync(sc::shutdownOutput);
 252                 try {
 253                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
 254                     for (;;) {  // only exits by exception, nothing reads from the other end
 255                         int n = sc.write(bb);
 256                         assertTrue(n > 0);
 257                         bb.clear();
 258                     }
 259                 } catch (ClosedChannelException e) {
 260                     // expected, write fails once output has been shut down
 261                 }
 262                 assertTrue(sc.isOpen());  // shutting down output must not close the channel
 263             }
 264         });
 265     }
 266 
 267     /**
 268      * Virtual thread interrupted while blocked in SocketChannel write, retried until the interrupt lands during a blocked write.
 269      */
 270     @Test
 271     void testSocketChannelWriteInterrupt() throws Exception {
 272         VThreadRunner.run(() -> {

 273             boolean done = false;
 274             while (!done) {
 275                 try (var connection = new Connection()) {
 276                     SocketChannel sc = connection.channel1();
 277 
 278                     // interrupt current thread when it blocks in write
 279                     Thread thisThread = Thread.currentThread();
 280                     runAfterParkedAsync(thisThread::interrupt, true);  // NOTE(review): meaning of the second argument is not visible here, confirm against the helper
 281 
 282                     // write until channel is closed, nothing reads from the other end
 283                     try {
 284                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 285                         for (;;) {
 286                             int n = sc.write(bb);
 287                             assertTrue(n > 0);
 288                             bb.clear();
 289                         }
 290                     } catch (ClosedByInterruptException e) {
 291                         // closed when blocked in write; interrupted() checks and clears the interrupt status
 292                         assertTrue(Thread.interrupted());
 293                         done = true;
 294                     } catch (ClosedChannelException e) {
 295                         // closed but not blocked in write, need to retry test with a new connection
 296                         System.err.format("%s, need to retry!%n", e);
 297                     }
 298                 }
 299             }
 300         });
 301     }
 302 
 303     /**
 304      * Virtual thread blocks in SocketChannel adaptor read, with and without a read timeout.
 305      */
 306     @ParameterizedTest
 307     @ValueSource(ints = { 0, 60_000 })
 308     void testSocketAdaptorRead(int timeout) throws Exception {
 309         VThreadRunner.run(() -> {














 310             try (var connection = new Connection()) {
 311                 SocketChannel sc1 = connection.channel1();
 312                 SocketChannel sc2 = connection.channel2();
 313 
 314                 // write to sc1 when current thread blocks reading from sc2
 315                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 316                 runAfterParkedAsync(() -> sc1.write(bb));
 317 
 318                 // read from sc2 should block
 319                 byte[] array = new byte[100];
 320                 if (timeout > 0)  // timeout == 0 leaves the socket's SO_TIMEOUT untouched (untimed read)
 321                     sc2.socket().setSoTimeout(timeout);
 322                 int n = sc2.socket().getInputStream().read(array);
 323                 assertTrue(n > 0);
 324                 assertTrue(array[0] == 'X');
 325             }
 326         });
 327     }
 328 
 329     /**
 330      * ServerSocketChannel accept, no blocking: the connection is established before accept is called.
 331      */
 332     @Test
 333     void testServerSocketChannelAccept1() throws Exception {
 334         VThreadRunner.run(() -> {

 335             try (var ssc = ServerSocketChannel.open()) {
 336                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));  // port 0: system-assigned port on loopback
 337                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
 338                 // accept should not block, sc1 is already connected
 339                 var sc2 = ssc.accept();
 340                 sc1.close();
 341                 sc2.close();
 342             }
 343         });
 344     }
 345 
 346     /**
 347      * Virtual thread blocks in ServerSocketChannel accept until a connection is made.
 348      */
 349     @Test
 350     void testServerSocketChannelAccept2() throws Exception {
 351         VThreadRunner.run(() -> {

 352             try (var ssc = ServerSocketChannel.open()) {
 353                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 354                 var sc1 = SocketChannel.open();  // opened but not yet connected
 355 
 356                 // connect sc1 when current thread blocks in accept
 357                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
 358 
 359                 // accept should block
 360                 var sc2 = ssc.accept();
 361                 sc1.close();
 362                 sc2.close();
 363             }
 364         });
 365     }
 366 
 367     /**
 368      * ServerSocketChannel close while virtual thread blocked in accept.
 369      */
 370     @Test
 371     void testServerSocketChannelAcceptAsyncClose() throws Exception {
 372         VThreadRunner.run(() -> {

 373             try (var ssc = ServerSocketChannel.open()) {
 374                 InetAddress lh = InetAddress.getLoopbackAddress();
 375                 ssc.bind(new InetSocketAddress(lh, 0));
 376                 runAfterParkedAsync(ssc::close);  // close ssc from another thread; helper is declared outside this view
 377                 try {
 378                     SocketChannel sc = ssc.accept();
 379                     sc.close();
 380                     fail("connection accepted???");  // nothing connects in this test
 381                 } catch (AsynchronousCloseException expected) { }
 382             }
 383         });
 384     }
 385 
 386     /**
 387      * Virtual thread interrupted while blocked in ServerSocketChannel accept, accept should throw ClosedByInterruptException.
 388      */
 389     @Test
 390     void testServerSocketChannelAcceptInterrupt() throws Exception {
 391         VThreadRunner.run(() -> {

 392             try (var ssc = ServerSocketChannel.open()) {
 393                 InetAddress lh = InetAddress.getLoopbackAddress();
 394                 ssc.bind(new InetSocketAddress(lh, 0));
 395 
 396                 // interrupt current thread when it blocks in accept
 397                 Thread thisThread = Thread.currentThread();
 398                 runAfterParkedAsync(thisThread::interrupt);
 399 
 400                 try {
 401                     SocketChannel sc = ssc.accept();
 402                     sc.close();
 403                     fail("connection accepted???");  // nothing connects in this test
 404                 } catch (ClosedByInterruptException expected) {
 405                     assertTrue(Thread.interrupted());  // interrupt status must still be set; this call also clears it
 406                 }
 407             }
 408         });
 409     }
 410 
 411     /**
 412      * Virtual thread blocks in ServerSocketChannel adaptor accept, with and without an accept timeout.
 413      */
 414     @ParameterizedTest
 415     @ValueSource(ints = { 0, 60_000 })
 416     void testSocketChannelAdaptorAccept(int timeout) throws Exception {
 417         VThreadRunner.run(() -> {














 418             try (var ssc = ServerSocketChannel.open()) {
 419                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 420                 var sc = SocketChannel.open();
 421 
 422                 // connect sc when current thread blocks in accept
 423                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
 424 
 425                 // accept should block
 426                 if (timeout > 0)  // timeout == 0 leaves the socket's SO_TIMEOUT untouched (untimed accept)
 427                     ssc.socket().setSoTimeout(timeout);
 428                 Socket s = ssc.socket().accept();
 429                 sc.close();
 430                 s.close();
 431             }
 432         });
 433     }
 434 
 435     /**
 436      * DatagramChannel receive/send, no blocking: the datagram is sent before receive is called.
 437      */
 438     @Test
 439     void testDatagramChannelSendReceive1() throws Exception {
 440         VThreadRunner.run(() -> {

 441             try (DatagramChannel dc1 = DatagramChannel.open();
 442                  DatagramChannel dc2 = DatagramChannel.open()) {
 443 
 444                 InetAddress lh = InetAddress.getLoopbackAddress();
 445                 dc2.bind(new InetSocketAddress(lh, 0));  // port 0: system-assigned port on loopback
 446 
 447                 // send should not block
 448                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 449                 int n = dc1.send(bb, dc2.getLocalAddress());
 450                 assertTrue(n > 0);
 451 
 452                 // receive should not block, the send above has already completed
 453                 bb = ByteBuffer.allocate(10);
 454                 dc2.receive(bb);
 455                 assertTrue(bb.get(0) == 'X');  // absolute get, the buffer has not been flipped
 456             }
 457         });
 458     }
 459 
 460     /**
 461      * Virtual thread blocks in DatagramChannel receive until a datagram is sent to it.
 462      */
 463     @Test
 464     void testDatagramChannelSendReceive2() throws Exception {
 465         VThreadRunner.run(() -> {

 466             try (DatagramChannel dc1 = DatagramChannel.open();
 467                  DatagramChannel dc2 = DatagramChannel.open()) {
 468 
 469                 InetAddress lh = InetAddress.getLoopbackAddress();
 470                 dc2.bind(new InetSocketAddress(lh, 0));
 471 
 472                 // send from dc1 when current thread blocked in dc2.receive
 473                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 474                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
 475 
 476                 // receive on dc2 should block
 477                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 478                 dc2.receive(bb2);
 479                 assertTrue(bb2.get(0) == 'X');  // absolute get, the buffer has not been flipped
 480             }
 481         });
 482     }
 483 
 484     /**
 485      * DatagramChannel close while virtual thread blocked in receive, receive should throw AsynchronousCloseException.
 486      */
 487     @Test
 488     void testDatagramChannelReceiveAsyncClose() throws Exception {
 489         VThreadRunner.run(() -> {

 490             try (DatagramChannel dc = DatagramChannel.open()) {
 491                 InetAddress lh = InetAddress.getLoopbackAddress();
 492                 dc.bind(new InetSocketAddress(lh, 0));
 493                 runAfterParkedAsync(dc::close);  // close dc from another thread; helper is declared outside this view
 494                 try {
 495                     dc.receive(ByteBuffer.allocate(100));
 496                     fail("receive returned");  // nothing is sent in this test
 497                 } catch (AsynchronousCloseException expected) { }
 498             }
 499         });
 500     }
 501 
 502     /**
 503      * Virtual thread interrupted while blocked in DatagramChannel receive, receive should throw ClosedByInterruptException.
 504      */
 505     @Test
 506     void testDatagramChannelReceiveInterrupt() throws Exception {
 507         VThreadRunner.run(() -> {

 508             try (DatagramChannel dc = DatagramChannel.open()) {
 509                 InetAddress lh = InetAddress.getLoopbackAddress();
 510                 dc.bind(new InetSocketAddress(lh, 0));
 511 
 512                 // interrupt current thread when it blocks in receive
 513                 Thread thisThread = Thread.currentThread();
 514                 runAfterParkedAsync(thisThread::interrupt);
 515 
 516                 try {
 517                     dc.receive(ByteBuffer.allocate(100));
 518                     fail("receive returned");  // nothing is sent in this test
 519                 } catch (ClosedByInterruptException expected) {
 520                     assertTrue(Thread.interrupted());  // interrupt status must still be set; this call also clears it
 521                 }
 522             }
 523         });
 524     }
 525 
 526     /**
 527      * Virtual thread blocks in DatagramSocket adaptor receive, with and without a receive timeout.
 528      */
 529     @ParameterizedTest
 530     @ValueSource(ints = { 0, 60_000 })
 531     void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
 532         VThreadRunner.run(() -> {














 533             try (DatagramChannel dc1 = DatagramChannel.open();
 534                  DatagramChannel dc2 = DatagramChannel.open()) {
 535 
 536                 InetAddress lh = InetAddress.getLoopbackAddress();
 537                 dc2.bind(new InetSocketAddress(lh, 0));
 538 
 539                 // send from dc1 when current thread blocks in dc2 receive
 540                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 541                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
 542 
 543                 // receive should block
 544                 byte[] array = new byte[100];
 545                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 546                 if (timeout > 0)  // timeout == 0 leaves the socket's SO_TIMEOUT untouched (untimed receive)
 547                     dc2.socket().setSoTimeout(timeout);
 548                 dc2.socket().receive(p);
 549                 assertTrue(p.getLength() == 3 && array[0] == 'X');  // the payload sent above is the 3 bytes of "XXX"
 550             }
 551         });
 552     }
 553 
 554     /**
 555      * DatagramChannel close while virtual thread blocked in adaptor receive, receive should throw SocketException.
 556      */
 557     @ParameterizedTest
 558     @ValueSource(ints = { 0, 60_000 })
 559     void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
 560         VThreadRunner.run(() -> {















 561             try (DatagramChannel dc = DatagramChannel.open()) {
 562                 InetAddress lh = InetAddress.getLoopbackAddress();
 563                 dc.bind(new InetSocketAddress(lh, 0));
 564 
 565                 byte[] array = new byte[100];
 566                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 567                 if (timeout > 0)  // timeout == 0 leaves the socket's SO_TIMEOUT untouched (untimed receive)
 568                     dc.socket().setSoTimeout(timeout);
 569 
 570                 // close channel/socket when current thread blocks in receive
 571                 runAfterParkedAsync(dc::close);
 572 
 573                 assertThrows(SocketException.class, () -> dc.socket().receive(p));  // nothing is sent in this test
 574             }
 575         });
 576     }
 577 
 578     /**
 579      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive, with and without a receive timeout.
 580      */
 581     @ParameterizedTest
 582     @ValueSource(ints = { 0, 60_000 })
 583     void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
 584         VThreadRunner.run(() -> {















 585             try (DatagramChannel dc = DatagramChannel.open()) {
 586                 InetAddress lh = InetAddress.getLoopbackAddress();
 587                 dc.bind(new InetSocketAddress(lh, 0));
 588 
 589                 byte[] array = new byte[100];
 590                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 591                 if (timeout > 0)  // timeout == 0 leaves the socket's SO_TIMEOUT untouched (untimed receive)
 592                     dc.socket().setSoTimeout(timeout);
 593 
 594                 // interrupt current thread when it blocks in receive
 595                 Thread thisThread = Thread.currentThread();
 596                 runAfterParkedAsync(thisThread::interrupt);
 597 
 598                 try {
 599                     dc.socket().receive(p);
 600                     fail();  // nothing is sent in this test, so receive must not return normally
 601                 } catch (ClosedByInterruptException expected) {
 602                     assertTrue(Thread.interrupted());  // interrupt status must still be set; this call also clears it
 603                 }
 604             }
 605         });
 606     }
 607 
 608     /**
 609      * Pipe read/write, no blocking: bytes are written to the sink before the source is read.
 610      */
 611     @Test
 612     void testPipeReadWrite1() throws Exception {
 613         VThreadRunner.run(() -> {

 614             Pipe p = Pipe.open();
 615             try (Pipe.SinkChannel sink = p.sink();
 616                  Pipe.SourceChannel source = p.source()) {
 617 
 618                 // write should not block
 619                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 620                 int n = sink.write(bb);
 621                 assertTrue(n > 0);
 622 
 623                 // read should not block, the write above has already completed
 624                 bb = ByteBuffer.allocate(10);
 625                 n = source.read(bb);
 626                 assertTrue(n > 0);
 627                 assertTrue(bb.get(0) == 'X');  // absolute get, the buffer has not been flipped
 628             }
 629         });
 630     }
 631 
 632     /**
 633      * Virtual thread blocks in Pipe.SourceChannel read until bytes are written to the sink.
 634      */
 635     @Test
 636     void testPipeReadWrite2() throws Exception {
 637         VThreadRunner.run(() -> {

 638             Pipe p = Pipe.open();
 639             try (Pipe.SinkChannel sink = p.sink();
 640                  Pipe.SourceChannel source = p.source()) {
 641 
 642                 // write from sink when current thread blocks reading from source
 643                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 644                 runAfterParkedAsync(() -> sink.write(bb1));  // helper declared outside this view; presumably waits for this thread to park first
 645 
 646                 // read should block until the scheduled write happens
 647                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 648                 int n = source.read(bb2);
 649                 assertTrue(n > 0);
 650                 assertTrue(bb2.get(0) == 'X');  // absolute get, the buffer has not been flipped
 651             }
 652         });
 653     }
 654 
 655     /**
 656      * Virtual thread blocks in Pipe.SinkChannel write while another thread drains the source.
 657      */
 658     @Test
 659     void testPipeReadWrite3() throws Exception {
 660         VThreadRunner.run(() -> {

 661             Pipe p = Pipe.open();
 662             try (Pipe.SinkChannel sink = p.sink();
 663                  Pipe.SourceChannel source = p.source()) {
 664 
 665                 // read from source to EOF when current thread blocks in write
 666                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));  // readToEOF is declared outside this view
 667 
 668                 // write to sink should block
 669                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 670                 for (int i=0; i<1000; i++) {  // 1000 writes of up to 100KB; presumably far more than the pipe buffers
 671                     int n = sink.write(bb);
 672                     assertTrue(n > 0);
 673                     bb.clear();  // reset position/limit so the same buffer is written again
 674                 }
 675                 sink.close();  // close before joining; presumably this is what lets the reader reach EOF
 676 
 677                 // wait for reader to finish
 678                 reader.join();
 679             }
 680         });
 681     }
 682 
 683     /**
 684      * Pipe.SourceChannel close while virtual thread blocked in read, read should throw AsynchronousCloseException.
 685      */
 686     @Test
 687     void testPipeReadAsyncClose() throws Exception {
 688         VThreadRunner.run(() -> {

 689             Pipe p = Pipe.open();
 690             try (Pipe.SinkChannel sink = p.sink();
 691                  Pipe.SourceChannel source = p.source()) {
 692                 runAfterParkedAsync(source::close);  // close source from another thread; helper is declared outside this view
 693                 try {
 694                     int n = source.read(ByteBuffer.allocate(100));
 695                     fail("read returned " + n);  // nothing is written to the sink in this test
 696                 } catch (AsynchronousCloseException expected) { }
 697             }
 698         });
 699     }
 700 
 701     /**
 702      * Virtual thread interrupted while blocked in Pipe.SourceChannel read, read should throw ClosedByInterruptException.
 703      */
 704     @Test
 705     void testPipeReadInterrupt() throws Exception {
 706         VThreadRunner.run(() -> {

 707             Pipe p = Pipe.open();
 708             try (Pipe.SinkChannel sink = p.sink();
 709                  Pipe.SourceChannel source = p.source()) {
 710 
 711                 // interrupt current thread when it blocks reading from source
 712                 Thread thisThread = Thread.currentThread();
 713                 runAfterParkedAsync(thisThread::interrupt);
 714 
 715                 try {
 716                     int n = source.read(ByteBuffer.allocate(100));
 717                     fail("read returned " + n);  // nothing is written to the sink in this test
 718                 } catch (ClosedByInterruptException expected) {
 719                     assertTrue(Thread.interrupted());  // interrupt status must still be set; this call also clears it
 720                 }
 721             }
 722         });
 723     }
 724 
 725     /**
 726      * Pipe.SinkChannel close while virtual thread blocked in write, retried until the close lands during a blocked write.
 727      */
 728     @Test
 729     void testPipeWriteAsyncClose() throws Exception {
 730         VThreadRunner.run(() -> {

 731             boolean done = false;
 732             while (!done) {
 733                 Pipe p = Pipe.open();
 734                 try (Pipe.SinkChannel sink = p.sink();
 735                      Pipe.SourceChannel source = p.source()) {
 736 
 737                     // close sink when current thread blocks in write
 738                     runAfterParkedAsync(sink::close, true);  // NOTE(review): meaning of the second argument is not visible here, confirm against the helper
 739 
 740                     // write until channel is closed, nothing reads from the source
 741                     try {
 742                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 743                         for (;;) {
 744                             int n = sink.write(bb);
 745                             assertTrue(n > 0);
 746                             bb.clear();  // reset position/limit so the same buffer is written again
 747                         }
 748                     } catch (AsynchronousCloseException e) {
 749                         // closed when blocked in write
 750                         done = true;
 751                     } catch (ClosedChannelException e) {
 752                         // closed but not blocked in write, need to retry test with a new pipe
 753                         System.err.format("%s, need to retry!%n", e);
 754                     }
 755                 }
 756             }
 757         });
 758     }
 759 
 760     /**
 761      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
 762      */
 763     @Test
 764     void testPipeWriteInterrupt() throws Exception {
 765         VThreadRunner.run(() -> {

 766             boolean done = false;
 767             while (!done) {
 768                 Pipe p = Pipe.open();
 769                 try (Pipe.SinkChannel sink = p.sink();
 770                      Pipe.SourceChannel source = p.source()) {
 771 
 772                     // interrupt current thread when it blocks in write
 773                     Thread thisThread = Thread.currentThread();
 774                     runAfterParkedAsync(thisThread::interrupt, true);
 775 
 776                     // write until channel is closed
 777                     try {
 778                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 779                         for (;;) {
 780                             int n = sink.write(bb);
 781                             assertTrue(n > 0);
 782                             bb.clear();
 783                         }
 784                     } catch (ClosedByInterruptException expected) {
 785                         // closed when blocked in write

   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=default
  26  * @bug 8284161
  27  * @summary Test virtual threads doing blocking I/O on NIO channels
  28  * @library /test/lib
  29  * @run junit/othervm/timeout=480 BlockingChannelOps
  30  */
  31 
  32 /*
  33  * @test id=poller-modes
  34  * @requires (os.family == "linux") | (os.family == "mac")
  35  * @library /test/lib
  36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
  37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
  38  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
  39  */
  40 
  41 /*
  42  * @test id=io_uring
  43  * @requires os.family == "linux"
  44  * @library /test/lib
  45  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
  46  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
  47  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
  48  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
  49  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
  50  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
  51  */
  52 
  53 /*
  54  * @test id=no-vmcontinuations
  55  * @requires vm.continuations
  56  * @library /test/lib
  57  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
  58  */
  59 
  60 import java.io.Closeable;
  61 import java.io.IOException;
  62 import java.net.DatagramPacket;
  63 import java.net.InetAddress;
  64 import java.net.InetSocketAddress;
  65 import java.net.Socket;
  66 import java.net.SocketAddress;
  67 import java.net.SocketException;
  68 import java.nio.ByteBuffer;
  69 import java.nio.channels.AsynchronousCloseException;
  70 import java.nio.channels.ClosedByInterruptException;
  71 import java.nio.channels.ClosedChannelException;
  72 import java.nio.channels.DatagramChannel;
  73 import java.nio.channels.Pipe;
  74 import java.nio.channels.ReadableByteChannel;
  75 import java.nio.channels.ServerSocketChannel;
  76 import java.nio.channels.SocketChannel;
  77 import java.nio.channels.WritableByteChannel;
  78 import java.util.concurrent.ExecutorService;
  79 import java.util.concurrent.Executors;
  80 import java.util.concurrent.ThreadFactory;
  81 import java.util.concurrent.locks.LockSupport;
  82 import java.util.stream.Stream;
  83 
  84 import jdk.test.lib.thread.VThreadRunner;
  85 import jdk.test.lib.thread.VThreadScheduler;
  86 
  87 import org.junit.jupiter.api.BeforeAll;
  88 import org.junit.jupiter.params.ParameterizedTest;
  89 import org.junit.jupiter.params.provider.MethodSource;
  90 import static org.junit.jupiter.api.Assertions.*;
  91 
  92 class BlockingChannelOps {
  93     private static ExecutorService threadPool; // assigned in setup(); executes the tasks handed to customScheduler
  94     private static Thread.VirtualThreadScheduler customScheduler; // assigned in setup(); used by threadBuilders()
  95 
  96     @BeforeAll
  97     static void setup() {
             // One-time initialisation of the two static fields used by threadBuilders().
             // The pool is backed by daemon platform threads created by this factory.
  98         ThreadFactory factory = Thread.ofPlatform().daemon().factory();
  99         threadPool = Executors.newCachedThreadPool(factory);
             // Custom scheduler: both callbacks simply submit the virtual thread task
             // to the cached thread pool created above.
 100         customScheduler = new Thread.VirtualThreadScheduler() {
 101             @Override
 102             public void onStart(Thread.VirtualThreadTask task) {
 103                 threadPool.execute(task);
 104             }
 105             @Override
 106             public void onContinue(Thread.VirtualThreadTask task) {
 107                 threadPool.execute(task);
 108             }
 109         };
 110     }
 111 
 112     /**
 113      * Returns the Thread.Builder to create the virtual thread.
          *
          * <p>Referenced by name from the {@code @MethodSource("threadBuilders")}
          * annotations on the parameterized tests in this class. The stream always
          * contains the default {@code Thread.ofVirtual()} builder; when
          * {@code VThreadScheduler.supportsCustomScheduler()} returns true it also
          * contains a builder configured with {@code customScheduler}.
          *
          * @return the builders that each parameterized test is run with
 114      */
 115     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
 116         if (VThreadScheduler.supportsCustomScheduler()) {
 117             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
 118         } else {
 119             return Stream.of(Thread.ofVirtual());
 120         }
 121     }
 122 
 123     /**
 124      * SocketChannel read/write, no blocking.
          *
          * <p>Writes "XXX" to the first channel of a {@code Connection} and then reads
          * from the second channel, asserting that at least one byte arrives and that
          * the first byte is 'X'. The body is executed via {@code VThreadRunner.run}
          * with the given builder.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 125      */
 126     @ParameterizedTest
 127     @MethodSource("threadBuilders")
 128     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
 129         VThreadRunner.run(builder, () -> {
                 // NOTE(review): Connection is declared outside this excerpt; presumably
                 // channel1()/channel2() are the two ends of one connection - confirm.
 130             try (var connection = new Connection()) {
 131                 SocketChannel sc1 = connection.channel1();
 132                 SocketChannel sc2 = connection.channel2();
 133 
 134                 // write to sc1
 135                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 136                 int n = sc1.write(bb);
 137                 assertTrue(n > 0);
 138 
 139                 // read from sc2 should not block
 140                 bb = ByteBuffer.allocate(10);
 141                 n = sc2.read(bb);
 142                 assertTrue(n > 0);
 143                 assertTrue(bb.get(0) == 'X');
 144             }
 145         });
 146     }
 147 
 148     /**
 149      * Virtual thread blocks in SocketChannel read.
          *
          * <p>Arms an asynchronous write of "XXX" to sc1 before calling
          * {@code sc2.read}, then asserts that the read returns data whose first byte
          * is 'X'.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 150      */
 151     @ParameterizedTest
 152     @MethodSource("threadBuilders")
 153     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
 154         VThreadRunner.run(builder, () -> {
 155             try (var connection = new Connection()) {
 156                 SocketChannel sc1 = connection.channel1();
 157                 SocketChannel sc2 = connection.channel2();
 158 
 159                 // write to sc1 when current thread blocks in sc2.read
                     // NOTE(review): runAfterParkedAsync is defined outside this excerpt;
                     // presumably it runs the task once this thread has parked - confirm.
 160                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 161                 runAfterParkedAsync(() -> sc1.write(bb1));
 162 
 163                 // read from sc2 should block
 164                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 165                 int n = sc2.read(bb2);
 166                 assertTrue(n > 0);
 167                 assertTrue(bb2.get(0) == 'X');
 168             }
 169         });
 170     }
 171 
 172     /**
 173      * Virtual thread blocks in SocketChannel write.
          *
          * <p>Writes a 100KB buffer to sc1 1000 times, asserting that every write makes
          * progress, while an asynchronously started reader drains sc2 to EOF. sc1 is
          * closed after the last write and the reader thread is then joined.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 174      */
 175     @ParameterizedTest
 176     @MethodSource("threadBuilders")
 177     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
 178         VThreadRunner.run(builder, () -> {
 179             try (var connection = new Connection()) {
 180                 SocketChannel sc1 = connection.channel1();
 181                 SocketChannel sc2 = connection.channel2();
 182 
 183                 // read from sc2 to EOF when current thread blocks in sc1.write
 184                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
 185 
 186                 // write to sc1 should block
 187                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 188                 for (int i=0; i<1000; i++) {
 189                     int n = sc1.write(bb);
 190                     assertTrue(n > 0);
                         // reset position/limit so the same buffer is written again
 191                     bb.clear();
 192                 }
                     // closing the writing end before joining the reader that reads to EOF
 193                 sc1.close();
 194 
 195                 // wait for reader to finish
 196                 reader.join();
 197             }
 198         });
 199     }
 200 
 201     /**
 202      * SocketChannel close while virtual thread blocked in read.
          *
          * <p>Arms an asynchronous {@code sc.close()} and then reads from the channel;
          * the test fails if the read returns and passes when it throws
          * {@code AsynchronousCloseException}.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 203      */
 204     @ParameterizedTest
 205     @MethodSource("threadBuilders")
 206     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 207         VThreadRunner.run(builder, () -> {
 208             try (var connection = new Connection()) {
 209                 SocketChannel sc = connection.channel1();
 210                 runAfterParkedAsync(sc::close);
 211                 try {
 212                     int n = sc.read(ByteBuffer.allocate(100));
 213                     fail("read returned " + n);
 214                 } catch (AsynchronousCloseException expected) { }
 215             }
 216         });
 217     }
 218 
 219     /**
 220      * SocketChannel shutdownInput while virtual thread blocked in read.
          *
          * <p>Arms an asynchronous {@code sc.shutdownInput()} and asserts that the
          * read returns -1 while the channel itself remains open.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 221      */
 222     @ParameterizedTest
 223     @MethodSource("threadBuilders")
 224     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
 225         VThreadRunner.run(builder, () -> {
 226             try (var connection = new Connection()) {
 227                 SocketChannel sc = connection.channel1();
 228                 runAfterParkedAsync(sc::shutdownInput);
 229                 int n = sc.read(ByteBuffer.allocate(100));
 230                 assertEquals(-1, n);
 231                 assertTrue(sc.isOpen());
 232             }
 233         });
 234     }
 235 
 236     /**
 237      * Virtual thread interrupted while blocked in SocketChannel read.
          *
          * <p>The test fails if the read returns; it expects
          * {@code ClosedByInterruptException} and then checks, via
          * {@code Thread.interrupted()}, that the interrupt status was set (which also
          * clears it).
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 238      */
 239     @ParameterizedTest
 240     @MethodSource("threadBuilders")
 241     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 242         VThreadRunner.run(builder, () -> {
 243             try (var connection = new Connection()) {
 244                 SocketChannel sc = connection.channel1();
 245 
 246                 // interrupt current thread when it blocks in read
 247                 Thread thisThread = Thread.currentThread();
 248                 runAfterParkedAsync(thisThread::interrupt);
 249 
 250                 try {
 251                     int n = sc.read(ByteBuffer.allocate(100));
 252                     fail("read returned " + n);
 253                 } catch (ClosedByInterruptException expected) {
 254                     assertTrue(Thread.interrupted());
 255                 }
 256             }
 257         });
 258     }
 259 
 260     /**
 261      * SocketChannel close while virtual thread blocked in write.
          *
          * <p>The body loops with a fresh {@code Connection} per attempt until a write
          * fails with {@code AsynchronousCloseException}. A plain
          * {@code ClosedChannelException} is reported on System.err and the attempt is
          * repeated.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 262      */
 263     @ParameterizedTest
 264     @MethodSource("threadBuilders")
 265     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 266         VThreadRunner.run(builder, () -> {
 267             boolean done = false;
 268             while (!done) {
 269                 try (var connection = new Connection()) {
 270                     SocketChannel sc = connection.channel1();
 271 
 272                     // close sc when current thread blocks in write
                         // NOTE(review): the meaning of the second argument (true) is defined
                         // by runAfterParkedAsync, which is outside this excerpt - confirm.
 273                     runAfterParkedAsync(sc::close, true);
 274 
 275                     // write until channel is closed
 276                     try {
 277                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 278                         for (;;) {
 279                             int n = sc.write(bb);
 280                             assertTrue(n > 0);
 281                             bb.clear();
 282                         }
 283                     } catch (AsynchronousCloseException expected) {
 284                         // closed when blocked in write
 285                         done = true;
 286                     } catch (ClosedChannelException e) {
 287                         // closed but not blocked in write, need to retry test
 288                         System.err.format("%s, need to retry!%n", e);
 289                     }
 290                 }
 291             }
 292         });
 293     }
 294 

 295     /**
 296      * SocketChannel shutdownOutput while virtual thread blocked in write.
          *
          * <p>Writes 100KB buffers in an endless loop until a write throws
          * {@code ClosedChannelException}, then asserts that the channel is still open.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 297      */
 298     @ParameterizedTest
 299     @MethodSource("threadBuilders")
 300     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
 301         VThreadRunner.run(builder, () -> {
 302             try (var connection = new Connection()) {
 303                 SocketChannel sc = connection.channel1();
 304 
 305                 // shutdown output when current thread blocks in write
 306                 runAfterParkedAsync(sc::shutdownOutput);
 307                 try {
 308                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
 309                     for (;;) {
 310                         int n = sc.write(bb);
 311                         assertTrue(n > 0);
 312                         bb.clear();
 313                     }
 314                 } catch (ClosedChannelException e) {
 315                     // expected
 316                 }
                     // only the output side was shut down; the channel itself must stay open
 317                 assertTrue(sc.isOpen());
 318             }
 319         });
 320     }
 321 
 322     /**
 323      * Virtual thread interrupted while blocked in SocketChannel write.
          *
          * <p>The body loops with a fresh {@code Connection} per attempt until a write
          * fails with {@code ClosedByInterruptException}, at which point the interrupt
          * status is asserted (and cleared) via {@code Thread.interrupted()}. A plain
          * {@code ClosedChannelException} is reported on System.err and the attempt is
          * repeated.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 324      */
 325     @ParameterizedTest
 326     @MethodSource("threadBuilders")
 327     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 328         VThreadRunner.run(builder, () -> {
 329             boolean done = false;
 330             while (!done) {
 331                 try (var connection = new Connection()) {
 332                     SocketChannel sc = connection.channel1();
 333 
 334                     // interrupt current thread when it blocks in write
                         // NOTE(review): the meaning of the second argument (true) is defined
                         // by runAfterParkedAsync, which is outside this excerpt - confirm.
 335                     Thread thisThread = Thread.currentThread();
 336                     runAfterParkedAsync(thisThread::interrupt, true);
 337 
 338                     // write until channel is closed
 339                     try {
 340                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 341                         for (;;) {
 342                             int n = sc.write(bb);
 343                             assertTrue(n > 0);
 344                             bb.clear();
 345                         }
 346                     } catch (ClosedByInterruptException e) {
 347                         // closed when blocked in write
 348                         assertTrue(Thread.interrupted());
 349                         done = true;
 350                     } catch (ClosedChannelException e) {
 351                         // closed but not blocked in write, need to retry test
 352                         System.err.format("%s, need to retry!%n", e);
 353                     }
 354                 }
 355             }
 356         });
 357     }
 358 
 359     /**
 360      * Virtual thread blocks in SocketChannel adaptor read.
          *
          * <p>Delegates to {@code testSocketAdaptorRead} with a timeout of 0, for which
          * the helper does not call {@code setSoTimeout}.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 361      */
 362     @ParameterizedTest
 363     @MethodSource("threadBuilders")
 364     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
 365         testSocketAdaptorRead(builder, 0);
 366     }
 367 
 368     /**
 369      * Virtual thread blocks in SocketChannel adaptor read with timeout.
          *
          * <p>Delegates to {@code testSocketAdaptorRead} with a timeout of 60_000,
          * which the helper passes to {@code setSoTimeout} before reading.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 370      */
 371     @ParameterizedTest
 372     @MethodSource("threadBuilders")
 373     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
 374         testSocketAdaptorRead(builder, 60_000);
 375     }
 376 
 377     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
 378                                        int timeout) throws Exception {
             // Shared body of testSocketAdaptorRead1/2: reads through the socket adaptor's
             // InputStream (sc2.socket().getInputStream()) instead of the channel API.
             // The timeout is only applied, via setSoTimeout, when it is greater than 0.
 379         VThreadRunner.run(builder, () -> {
 380             try (var connection = new Connection()) {
 381                 SocketChannel sc1 = connection.channel1();
 382                 SocketChannel sc2 = connection.channel2();
 383 
 384                 // write to sc1 when current thread blocks reading from sc2
 385                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 386                 runAfterParkedAsync(() -> sc1.write(bb));
 387 
 388                 // read from sc2 should block
 389                 byte[] array = new byte[100];
 390                 if (timeout > 0)
 391                     sc2.socket().setSoTimeout(timeout);
 392                 int n = sc2.socket().getInputStream().read(array);
 393                 assertTrue(n > 0);
 394                 assertTrue(array[0] == 'X');
 395             }
 396         });
 397     }
 398 
 399     /**
 400      * ServerSocketChannel accept, no blocking.
          *
          * <p>Binds a listener to an ephemeral loopback port, opens a client connection
          * to it first, and only then calls {@code accept}. Both channels are closed
          * afterwards.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 401      */
 402     @ParameterizedTest
 403     @MethodSource("threadBuilders")
 404     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
 405         VThreadRunner.run(builder, () -> {
 406             try (var ssc = ServerSocketChannel.open()) {
 407                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                     // connect before accepting so that a connection is already pending
 408                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
 409                 // accept should not block
 410                 var sc2 = ssc.accept();
 411                 sc1.close();
 412                 sc2.close();
 413             }
 414         });
 415     }
 416 
 417     /**
 418      * Virtual thread blocks in ServerSocketChannel accept.
          *
          * <p>Opens an unconnected client channel and arms an asynchronous
          * {@code connect} to the listener before calling {@code accept}. Both channels
          * are closed afterwards.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 419      */
 420     @ParameterizedTest
 421     @MethodSource("threadBuilders")
 422     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
 423         VThreadRunner.run(builder, () -> {
 424             try (var ssc = ServerSocketChannel.open()) {
 425                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 426                 var sc1 = SocketChannel.open();
 427 
 428                 // connect sc1 when current thread blocks in accept
 429                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
 430 
 431                 // accept should block
 432                 var sc2 = ssc.accept();
 433                 sc1.close();
 434                 sc2.close();
 435             }
 436         });
 437     }
 438 
 439     /**
 440      * ServerSocketChannel close while virtual thread blocked in accept.
          *
          * <p>Arms an asynchronous {@code ssc.close()} and then calls {@code accept};
          * the test fails if a connection is returned and passes when
          * {@code AsynchronousCloseException} is thrown.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 441      */
 442     @ParameterizedTest
 443     @MethodSource("threadBuilders")
 444     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 445         VThreadRunner.run(builder, () -> {
 446             try (var ssc = ServerSocketChannel.open()) {
 447                 InetAddress lh = InetAddress.getLoopbackAddress();
 448                 ssc.bind(new InetSocketAddress(lh, 0));
 449                 runAfterParkedAsync(ssc::close);
 450                 try {
 451                     SocketChannel sc = ssc.accept();
                         // not expected to get here: release the channel, then fail the test
 452                     sc.close();
 453                     fail("connection accepted???");
 454                 } catch (AsynchronousCloseException expected) { }
 455             }
 456         });
 457     }
 458 
 459     /**
 460      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
          *
          * <p>The test fails if {@code accept} returns a connection; it expects
          * {@code ClosedByInterruptException} and then checks, via
          * {@code Thread.interrupted()}, that the interrupt status was set.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 461      */
 462     @ParameterizedTest
 463     @MethodSource("threadBuilders")
 464     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 465         VThreadRunner.run(builder, () -> {
 466             try (var ssc = ServerSocketChannel.open()) {
 467                 InetAddress lh = InetAddress.getLoopbackAddress();
 468                 ssc.bind(new InetSocketAddress(lh, 0));
 469 
 470                 // interrupt current thread when it blocks in accept
 471                 Thread thisThread = Thread.currentThread();
 472                 runAfterParkedAsync(thisThread::interrupt);
 473 
 474                 try {
 475                     SocketChannel sc = ssc.accept();
                         // not expected to get here: release the channel, then fail the test
 476                     sc.close();
 477                     fail("connection accepted???");
 478                 } catch (ClosedByInterruptException expected) {
 479                     assertTrue(Thread.interrupted());
 480                 }
 481             }
 482         });
 483     }
 484 
 485     /**
 486      * Virtual thread blocks in ServerSocketChannel adaptor accept.
          *
          * <p>Delegates to {@code testSocketChannelAdaptorAccept} with a timeout of 0,
          * for which the helper does not call {@code setSoTimeout}.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 487      */
 488     @ParameterizedTest
 489     @MethodSource("threadBuilders")
 490     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
 491         testSocketChannelAdaptorAccept(builder, 0);
 492     }
 493 
 494     /**
 495      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
          *
          * <p>Delegates to {@code testSocketChannelAdaptorAccept} with a timeout of
          * 60_000, which the helper passes to {@code setSoTimeout} before accepting.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 496      */
 497     @ParameterizedTest
 498     @MethodSource("threadBuilders")
 499     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
 500         testSocketChannelAdaptorAccept(builder, 60_000);
 501     }
 502 
 503     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
 504                                                 int timeout) throws Exception {
             // Shared body of testSocketChannelAdaptorAccept1/2: accepts through the
             // ServerSocket adaptor (ssc.socket().accept()) instead of the channel API.
             // The timeout is only applied, via setSoTimeout, when it is greater than 0.
 505         VThreadRunner.run(builder, () -> {
 506             try (var ssc = ServerSocketChannel.open()) {
 507                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 508                 var sc = SocketChannel.open();
 509 
 510                 // connect sc to the listener when current thread blocks in accept
 511                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
 512 
 513                 // accept should block
 514                 if (timeout > 0)
 515                     ssc.socket().setSoTimeout(timeout);
 516                 Socket s = ssc.socket().accept();
 517                 sc.close();
 518                 s.close();
 519             }
 520         });
 521     }
 522 
 523     /**
 524      * DatagramChannel receive/send, no blocking.
          *
          * <p>Binds dc2 to an ephemeral loopback port, sends "XXX" to it from dc1, and
          * then receives on dc2, asserting that the first byte is 'X'.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 525      */
 526     @ParameterizedTest
 527     @MethodSource("threadBuilders")
 528     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
 529         VThreadRunner.run(builder, () -> {
 530             try (DatagramChannel dc1 = DatagramChannel.open();
 531                  DatagramChannel dc2 = DatagramChannel.open()) {
 532 
 533                 InetAddress lh = InetAddress.getLoopbackAddress();
 534                 dc2.bind(new InetSocketAddress(lh, 0));
 535 
 536                 // send should not block
 537                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 538                 int n = dc1.send(bb, dc2.getLocalAddress());
 539                 assertTrue(n > 0);
 540 
 541                 // receive should not block
 542                 bb = ByteBuffer.allocate(10);
 543                 dc2.receive(bb);
 544                 assertTrue(bb.get(0) == 'X');
 545             }
 546         });
 547     }
 548 
 549     /**
 550      * Virtual thread blocks in DatagramChannel receive.
          *
          * <p>Arms an asynchronous send of "XXX" from dc1 to dc2 before calling
          * {@code dc2.receive}, then asserts that the first received byte is 'X'.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 551      */
 552     @ParameterizedTest
 553     @MethodSource("threadBuilders")
 554     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
 555         VThreadRunner.run(builder, () -> {
 556             try (DatagramChannel dc1 = DatagramChannel.open();
 557                  DatagramChannel dc2 = DatagramChannel.open()) {
 558 
 559                 InetAddress lh = InetAddress.getLoopbackAddress();
 560                 dc2.bind(new InetSocketAddress(lh, 0));
 561 
 562                 // send from dc1 when current thread blocked in dc2.receive
 563                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 564                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
 565 
 566                 // read from dc2 should block
 567                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 568                 dc2.receive(bb2);
 569                 assertTrue(bb2.get(0) == 'X');
 570             }
 571         });
 572     }
 573 
 574     /**
 575      * DatagramChannel close while virtual thread blocked in receive.
          *
          * <p>Arms an asynchronous {@code dc.close()} and then calls {@code receive};
          * the test fails if the receive returns and passes when
          * {@code AsynchronousCloseException} is thrown.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 576      */
 577     @ParameterizedTest
 578     @MethodSource("threadBuilders")
 579     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 580         VThreadRunner.run(builder, () -> {
 581             try (DatagramChannel dc = DatagramChannel.open()) {
 582                 InetAddress lh = InetAddress.getLoopbackAddress();
 583                 dc.bind(new InetSocketAddress(lh, 0));
 584                 runAfterParkedAsync(dc::close);
 585                 try {
 586                     dc.receive(ByteBuffer.allocate(100));
 587                     fail("receive returned");
 588                 } catch (AsynchronousCloseException expected) { }
 589             }
 590         });
 591     }
 592 
 593     /**
 594      * Virtual thread interrupted while blocked in DatagramChannel receive.
          *
          * <p>The test fails if {@code receive} returns; it expects
          * {@code ClosedByInterruptException} and then checks, via
          * {@code Thread.interrupted()}, that the interrupt status was set.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 595      */
 596     @ParameterizedTest
 597     @MethodSource("threadBuilders")
 598     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 599         VThreadRunner.run(builder, () -> {
 600             try (DatagramChannel dc = DatagramChannel.open()) {
 601                 InetAddress lh = InetAddress.getLoopbackAddress();
 602                 dc.bind(new InetSocketAddress(lh, 0));
 603 
 604                 // interrupt current thread when it blocks in receive
 605                 Thread thisThread = Thread.currentThread();
 606                 runAfterParkedAsync(thisThread::interrupt);
 607 
 608                 try {
 609                     dc.receive(ByteBuffer.allocate(100));
 610                     fail("receive returned");
 611                 } catch (ClosedByInterruptException expected) {
 612                     assertTrue(Thread.interrupted());
 613                 }
 614             }
 615         });
 616     }
 617 
 618     /**
 619      * Virtual thread blocks in DatagramSocket adaptor receive.
          *
          * <p>Delegates to {@code testDatagramSocketAdaptorReceive} with a timeout of
          * 0, for which the helper does not call {@code setSoTimeout}.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 620      */
 621     @ParameterizedTest
 622     @MethodSource("threadBuilders")
 623     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
 624         testDatagramSocketAdaptorReceive(builder, 0);
 625     }
 626 
 627     /**
 628      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
          *
          * <p>Delegates to {@code testDatagramSocketAdaptorReceive} with a timeout of
          * 60_000, which the helper passes to {@code setSoTimeout} before receiving.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 629      */
 630     @ParameterizedTest
 631     @MethodSource("threadBuilders")
 632     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
 633         testDatagramSocketAdaptorReceive(builder, 60_000);
 634     }
 635 
 636     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
 637                                                   int timeout) throws Exception {
             // Shared body of testDatagramSocketAdaptorReceive1/2: receives through the
             // DatagramSocket adaptor (dc2.socket().receive(p)) instead of the channel API.
             // The timeout is only applied, via setSoTimeout, when it is greater than 0.
 638         VThreadRunner.run(builder, () -> {
 639             try (DatagramChannel dc1 = DatagramChannel.open();
 640                  DatagramChannel dc2 = DatagramChannel.open()) {
 641 
 642                 InetAddress lh = InetAddress.getLoopbackAddress();
 643                 dc2.bind(new InetSocketAddress(lh, 0));
 644 
 645                 // send from dc1 when current thread blocks in dc2 receive
 646                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 647                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
 648 
 649                 // receive should block
 650                 byte[] array = new byte[100];
 651                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 652                 if (timeout > 0)
 653                     dc2.socket().setSoTimeout(timeout);
 654                 dc2.socket().receive(p);
                     // the whole 3-byte "XXX" payload must arrive in the one packet
 655                 assertTrue(p.getLength() == 3 && array[0] == 'X');
 656             }
 657         });
 658     }
 659 
 660     /**
 661      * DatagramChannel close while virtual thread blocked in adaptor receive.
          *
          * <p>Delegates to {@code testDatagramSocketAdaptorReceiveAsyncClose} with a
          * timeout of 0, for which the helper does not call {@code setSoTimeout}.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 662      */
 663     @ParameterizedTest
 664     @MethodSource("threadBuilders")
 665     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
 666         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
 667     }
 668 
 669     /**
 670      * DatagramChannel close while virtual thread blocked in adaptor receive
 671      * with timeout.
          *
          * <p>Delegates to {@code testDatagramSocketAdaptorReceiveAsyncClose} with a
          * timeout of 60_000, which the helper passes to {@code setSoTimeout}.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 672      */
 673     @ParameterizedTest
 674     @MethodSource("threadBuilders")
 675     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
 676         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_000);
 677     }
 678 
 679     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
 680                                                             int timeout) throws Exception {
             // Shared body of testDatagramSocketAdaptorReceiveAsyncClose1/2: arms an
             // asynchronous dc.close() and asserts that the adaptor receive throws
             // SocketException. The timeout is only applied when it is greater than 0.
 681         VThreadRunner.run(builder, () -> {
 682             try (DatagramChannel dc = DatagramChannel.open()) {
 683                 InetAddress lh = InetAddress.getLoopbackAddress();
 684                 dc.bind(new InetSocketAddress(lh, 0));
 685 
 686                 byte[] array = new byte[100];
 687                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 688                 if (timeout > 0)
 689                     dc.socket().setSoTimeout(timeout);
 690 
 691                 // close channel/socket when current thread blocks in receive
 692                 runAfterParkedAsync(dc::close);
 693 
 694                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
 695             }
 696         });
 697     }
 698 
 699     /**
 700      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
          *
          * <p>Delegates to {@code testDatagramSocketAdaptorReceiveInterrupt} with a
          * timeout of 0, for which the helper does not call {@code setSoTimeout}.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 701      */
 702     @ParameterizedTest
 703     @MethodSource("threadBuilders")
 704     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
 705         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
 706     }
 707 
 708     /**
 709      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
 710      * with timeout.
          *
          * <p>Delegates to {@code testDatagramSocketAdaptorReceiveInterrupt} with a
          * timeout of 60_000, which the helper passes to {@code setSoTimeout}.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 711      */
 712     @ParameterizedTest
 713     @MethodSource("threadBuilders")
 714     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
 715         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_000);
 716     }
 717 
 718     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
 719                                                            int timeout) throws Exception {
             // Shared body of testDatagramSocketAdaptorReceiveInterrupt1/2: arms an
             // asynchronous interrupt of this thread and expects the adaptor receive to
             // throw ClosedByInterruptException with the interrupt status set.
             // The timeout is only applied, via setSoTimeout, when it is greater than 0.
 720         VThreadRunner.run(builder, () -> {
 721             try (DatagramChannel dc = DatagramChannel.open()) {
 722                 InetAddress lh = InetAddress.getLoopbackAddress();
 723                 dc.bind(new InetSocketAddress(lh, 0));
 724 
 725                 byte[] array = new byte[100];
 726                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 727                 if (timeout > 0)
 728                     dc.socket().setSoTimeout(timeout);
 729 
 730                 // interrupt current thread when it blocks in receive
 731                 Thread thisThread = Thread.currentThread();
 732                 runAfterParkedAsync(thisThread::interrupt);
 733 
 734                 try {
 735                     dc.socket().receive(p);
 736                     fail();
 737                 } catch (ClosedByInterruptException expected) {
 738                     assertTrue(Thread.interrupted());
 739                 }
 740             }
 741         });
 742     }
 743 
 744     /**
 745      * Pipe read/write, no blocking.
          *
          * <p>Writes "XXX" to the pipe's sink and then reads from its source, asserting
          * that at least one byte arrives and that the first byte is 'X'. Both ends are
          * closed by the try-with-resources statement.
          *
          * @param builder the virtual thread builder supplied by {@code threadBuilders()}
 746      */
 747     @ParameterizedTest
 748     @MethodSource("threadBuilders")
 749     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
 750         VThreadRunner.run(builder, () -> {
 751             Pipe p = Pipe.open();
 752             try (Pipe.SinkChannel sink = p.sink();
 753                  Pipe.SourceChannel source = p.source()) {
 754 
 755                 // write should not block
 756                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 757                 int n = sink.write(bb);
 758                 assertTrue(n > 0);
 759 
 760                 // read should not block
 761                 bb = ByteBuffer.allocate(10);
 762                 n = source.read(bb);
 763                 assertTrue(n > 0);
 764                 assertTrue(bb.get(0) == 'X');
 765             }
 766         });
 767     }
 768 
 769     /**
 770      * A virtual thread blocks reading from a Pipe.SourceChannel until bytes are written.
 771      */
 772     @ParameterizedTest
 773     @MethodSource("threadBuilders")
 774     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
 775         VThreadRunner.run(builder, () -> {
 776             Pipe pipe = Pipe.open();
 777             try (Pipe.SinkChannel sink = pipe.sink();
 778                  Pipe.SourceChannel source = pipe.source()) {
 779 
 780                 // the sink is written to once this thread is blocked reading the source
 781                 ByteBuffer payload = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 782                 runAfterParkedAsync(() -> sink.write(payload));
 783 
 784                 // nothing has been written yet, so this read should block
 785                 ByteBuffer received = ByteBuffer.allocate(10);
 786                 int count = source.read(received);
 787                 assertTrue(count > 0);
 788                 assertTrue(received.get(0) == 'X');
 789             }
 790         });
 791     }
 792 
 793     /**
 794      * A virtual thread blocks writing to a Pipe.SinkChannel while a reader drains it.
 795      */
 796     @ParameterizedTest
 797     @MethodSource("threadBuilders")
 798     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
 799         VThreadRunner.run(builder, () -> {
 800             Pipe pipe = Pipe.open();
 801             try (Pipe.SinkChannel sink = pipe.sink();
 802                  Pipe.SourceChannel source = pipe.source()) {
 803 
 804                 // once this thread is blocked in write, read the source through to EOF
 805                 Thread drainer = runAfterParkedAsync(() -> readToEOF(source));
 806 
 807                 // 1000 writes of a 100k buffer; writing to the sink should block
 808                 ByteBuffer chunk = ByteBuffer.allocate(100*1024);
 809                 for (int remaining = 1000; remaining > 0; remaining--) {
 810                     int written = sink.write(chunk);
 811                     assertTrue(written > 0);
 812                     chunk.clear();
 813                 }
 814                 sink.close();
 815 
 816                 // sink is closed, now wait for the draining thread to terminate
 817                 drainer.join();
 818             }
 819         });
 820     }
 821 
 822     /**
 823      * Pipe.SourceChannel closed asynchronously while a virtual thread is blocked in read.
 824      */
 825     @ParameterizedTest
 826     @MethodSource("threadBuilders")
 827     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 828         VThreadRunner.run(builder, () -> {
 829             Pipe pipe = Pipe.open();
 830             try (Pipe.SinkChannel sink = pipe.sink();
 831                  Pipe.SourceChannel source = pipe.source()) {
 832                 // close the source once this thread is blocked reading from it
 833                 runAfterParkedAsync(source::close);
 834                 try {
 835                     fail("read returned " + source.read(ByteBuffer.allocate(100)));
 836                 } catch (AsynchronousCloseException expected) { }
 837             }
 838         });
 839     }
 840 
 841     /**
 842      * A virtual thread blocked in Pipe.SourceChannel read is interrupted.
 843      */
 844     @ParameterizedTest
 845     @MethodSource("threadBuilders")
 846     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 847         VThreadRunner.run(builder, () -> {
 848             Pipe pipe = Pipe.open();
 849             try (Pipe.SinkChannel sink = pipe.sink();
 850                  Pipe.SourceChannel source = pipe.source()) {
 851 
 852                 // have this thread interrupted once it is blocked reading the source
 853                 Thread target = Thread.currentThread();
 854                 runAfterParkedAsync(target::interrupt);
 855 
 856                 try {
 857                     fail("read returned " + source.read(ByteBuffer.allocate(100)));
 858                 } catch (ClosedByInterruptException expected) {
 859                     // the interrupt status is expected to still be set here
 860                     assertTrue(Thread.interrupted());
 861                 }
 862             }
 863         });
 864     }
 865 
 866     /**
 867      * Pipe.SinkChannel closed asynchronously while a virtual thread is blocked in write.
 868      */
 869     @ParameterizedTest
 870     @MethodSource("threadBuilders")
 871     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 872         VThreadRunner.run(builder, () -> {
 873             boolean closedWhileBlocked = false;
 874             do {
 875                 Pipe pipe = Pipe.open();
 876                 try (Pipe.SinkChannel sink = pipe.sink();
 877                      Pipe.SourceChannel source = pipe.source()) {
 878 
 879                     // schedule the close for when this thread is blocked in write
 880                     runAfterParkedAsync(sink::close, true);
 881 
 882                     // the source is never read here; keep writing until the sink is closed
 883                     try {
 884                         ByteBuffer chunk = ByteBuffer.allocate(100*1024);
 885                         while (true) {
 886                             int written = sink.write(chunk);
 887                             assertTrue(written > 0);
 888                             chunk.clear();
 889                         }
 890                     } catch (AsynchronousCloseException e) {
 891                         // the close arrived during a blocked write, the test is complete
 892                         closedWhileBlocked = true;
 893                     } catch (ClosedChannelException e) {
 894                         // closed, but not while blocked in write: go around again
 895                         System.err.format("%s, need to retry!%n", e);
 896                     }
 897                 }
 898             } while (!closedWhileBlocked);
 899         });
 900     }
 901 
 902     /**
 903      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
 904      */
 905     @ParameterizedTest
 906     @MethodSource("threadBuilders")
 907     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 908         VThreadRunner.run(builder, () -> {
 909             boolean done = false;
 910             while (!done) {
 911                 Pipe p = Pipe.open();
 912                 try (Pipe.SinkChannel sink = p.sink();
 913                      Pipe.SourceChannel source = p.source()) {
 914 
 915                     // interrupt current thread when it blocks in write
 916                     Thread thisThread = Thread.currentThread();
 917                     runAfterParkedAsync(thisThread::interrupt, true);
 918 
 919                     // write until channel is closed
 920                     try {
 921                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 922                         for (;;) {
 923                             int n = sink.write(bb);
 924                             assertTrue(n > 0);
 925                             bb.clear();
 926                         }
 927                     } catch (ClosedByInterruptException expected) {
 928                         // closed when blocked in write
< prev index next >