< prev index next >

test/jdk/java/nio/channels/vthread/BlockingChannelOps.java

Print this page

   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=default
  26  * @bug 8284161
  27  * @summary Test virtual threads doing blocking I/O on NIO channels
  28  * @library /test/lib
  29  * @run junit/timeout=480 BlockingChannelOps
  30  */
  31 
  32 /*
  33  * @test id=poller-modes
  34  * @requires (os.family == "linux") | (os.family == "mac")
  35  * @library /test/lib
  36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
  37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps










  38  */
  39 
  40 /*
  41  * @test id=no-vmcontinuations
  42  * @requires vm.continuations
  43  * @library /test/lib
  44  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
  45  */
  46 
  47 import java.io.Closeable;
  48 import java.io.IOException;
  49 import java.net.DatagramPacket;
  50 import java.net.InetAddress;
  51 import java.net.InetSocketAddress;
  52 import java.net.Socket;
  53 import java.net.SocketAddress;
  54 import java.net.SocketException;
  55 import java.nio.ByteBuffer;
  56 import java.nio.channels.AsynchronousCloseException;
  57 import java.nio.channels.ClosedByInterruptException;
  58 import java.nio.channels.ClosedChannelException;
  59 import java.nio.channels.DatagramChannel;
  60 import java.nio.channels.Pipe;
  61 import java.nio.channels.ReadableByteChannel;
  62 import java.nio.channels.ServerSocketChannel;
  63 import java.nio.channels.SocketChannel;
  64 import java.nio.channels.WritableByteChannel;



  65 import java.util.concurrent.locks.LockSupport;

  66 
  67 import jdk.test.lib.thread.VThreadRunner;
  68 import org.junit.jupiter.api.Test;




  69 import static org.junit.jupiter.api.Assertions.*;
  70 
  71 class BlockingChannelOps {




















  72 
  73     /**
  74      * SocketChannel read/write, no blocking.
          * Writes "XXX" to one channel of a Connection and then reads from the
          * other channel, checking that the first byte received is 'X'.
  75      */
  76     @Test
  77     void testSocketChannelReadWrite1() throws Exception {
  78         VThreadRunner.run(() -> {

  79             try (var connection = new Connection()) {
                     // NOTE(review): Connection is declared outside this excerpt;
                     // presumably channel1()/channel2() are the two ends of one
                     // connection - confirm.
  80                 SocketChannel sc1 = connection.channel1();
  81                 SocketChannel sc2 = connection.channel2();
  82 
  83                 // write to sc1
  84                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
  85                 int n = sc1.write(bb);
  86                 assertTrue(n > 0);
  87 
  88                 // read from sc2 should not block
  89                 bb = ByteBuffer.allocate(10);
  90                 n = sc2.read(bb);
  91                 assertTrue(n > 0);
  92                 assertTrue(bb.get(0) == 'X');
  93             }
  94         });
  95     }
  96 
  97     /**
  98      * Virtual thread blocks in SocketChannel read.
          * A write of "XXX" to sc1 is handed to runAfterParkedAsync before the
          * current thread reads from sc2; the read must return at least one byte
          * and the first byte must be 'X'.
  99      */
 100     @Test
 101     void testSocketChannelRead() throws Exception {
 102         VThreadRunner.run(() -> {

 103             try (var connection = new Connection()) {
 104                 SocketChannel sc1 = connection.channel1();
 105                 SocketChannel sc2 = connection.channel2();
 106 
 107                 // write to sc1 when current thread blocks in sc2.read
                     // NOTE(review): runAfterParkedAsync is defined outside this excerpt;
                     // by its name it presumably waits for this thread to park - confirm.
 108                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 109                 runAfterParkedAsync(() -> sc1.write(bb1));
 110 
 111                 // read from sc2 should block
 112                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 113                 int n = sc2.read(bb2);
 114                 assertTrue(n > 0);
 115                 assertTrue(bb2.get(0) == 'X');
 116             }
 117         });
 118     }
 119 
 120     /**
 121      * Virtual thread blocks in SocketChannel write.
          * Writes 1000 buffers of 100KB to sc1 while a reader thread, started
          * through runAfterParkedAsync, reads sc2 to EOF. Every write must
          * transfer at least one byte.
 122      */
 123     @Test
 124     void testSocketChannelWrite() throws Exception {
 125         VThreadRunner.run(() -> {

 126             try (var connection = new Connection()) {
 127                 SocketChannel sc1 = connection.channel1();
 128                 SocketChannel sc2 = connection.channel2();
 129 
 130                 // read from sc2 to EOF when current thread blocks in sc1.write
 131                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
 132 
 133                 // write to sc1 should block
 134                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 135                 for (int i=0; i<1000; i++) {
 136                     int n = sc1.write(bb);
 137                     assertTrue(n > 0);
                         // reset position/limit so the same buffer is written again
 138                     bb.clear();
 139                 }
                     // close the writing end; presumably this is what lets readToEOF
                     // (defined outside this excerpt) observe EOF - confirm
 140                 sc1.close();
 141 
 142                 // wait for reader to finish
 143                 reader.join();
 144             }
 145         });
 146     }
 147 
 148     /**
 149      * SocketChannel close while virtual thread blocked in read.
          * The read is expected to fail with AsynchronousCloseException; a normal
          * return from read fails the test.
 150      */
 151     @Test
 152     void testSocketChannelReadAsyncClose() throws Exception {
 153         VThreadRunner.run(() -> {

 154             try (var connection = new Connection()) {
 155                 SocketChannel sc = connection.channel1();
                     // schedule the close before entering the read below
 156                 runAfterParkedAsync(sc::close);
 157                 try {
 158                     int n = sc.read(ByteBuffer.allocate(100));
 159                     fail("read returned " + n);
 160                 } catch (AsynchronousCloseException expected) { }
 161             }
 162         });
 163     }
 164 
 165     /**
 166      * SocketChannel shutdownInput while virtual thread blocked in read.
          * The read is expected to return -1 (end of stream) rather than throw,
          * and the channel must still be open afterwards.
 167      */
 168     @Test
 169     void testSocketChannelReadAsyncShutdownInput() throws Exception {
 170         VThreadRunner.run(() -> {

 171             try (var connection = new Connection()) {
 172                 SocketChannel sc = connection.channel1();
                     // schedule the input shutdown before entering the read below
 173                 runAfterParkedAsync(sc::shutdownInput);
 174                 int n = sc.read(ByteBuffer.allocate(100));
 175                 assertEquals(-1, n);
 176                 assertTrue(sc.isOpen());
 177             }
 178         });
 179     }
 180 
 181     /**
 182      * Virtual thread interrupted while blocked in SocketChannel read.
          * The read is expected to fail with ClosedByInterruptException and the
          * thread's interrupt status must be set when the exception is caught.
 183      */
 184     @Test
 185     void testSocketChannelReadInterrupt() throws Exception {
 186         VThreadRunner.run(() -> {

 187             try (var connection = new Connection()) {
 188                 SocketChannel sc = connection.channel1();
 189 
 190                 // interrupt current thread when it blocks in read
 191                 Thread thisThread = Thread.currentThread();
 192                 runAfterParkedAsync(thisThread::interrupt);
 193 
 194                 try {
 195                     int n = sc.read(ByteBuffer.allocate(100));
 196                     fail("read returned " + n);
 197                 } catch (ClosedByInterruptException expected) {
                         // Thread.interrupted() both checks and clears the interrupt status
 198                     assertTrue(Thread.interrupted());
 199                 }
 200             }
 201         });
 202     }
 203 
 204     /**
 205      * SocketChannel close while virtual thread blocked in write.
          * Writes 100KB buffers in a loop until the channel is closed. The
          * scenario is repeated with a fresh Connection until the close is
          * reported as AsynchronousCloseException.
 206      */
 207     @Test
 208     void testSocketChannelWriteAsyncClose() throws Exception {
 209         VThreadRunner.run(() -> {

 210             boolean done = false;
 211             while (!done) {
 212                 try (var connection = new Connection()) {
 213                     SocketChannel sc = connection.channel1();
 214 
 215                     // close sc when current thread blocks in write
                         // NOTE(review): the meaning of the boolean argument is not visible
                         // in this excerpt - confirm against runAfterParkedAsync.
 216                     runAfterParkedAsync(sc::close, true);
 217 
 218                     // write until channel is closed
 219                     try {
 220                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 221                         for (;;) {
 222                             int n = sc.write(bb);
 223                             assertTrue(n > 0);
 224                             bb.clear();
 225                         }
                         // AsynchronousCloseException is a subclass of ClosedChannelException,
                         // so it has to be caught first
 226                     } catch (AsynchronousCloseException expected) {
 227                         // closed when blocked in write
 228                         done = true;
 229                     } catch (ClosedChannelException e) {
 230                         // closed but not blocked in write, need to retry test
 231                         System.err.format("%s, need to retry!%n", e);
 232                     }
 233                 }
 234             }
 235         });
 236     }
 237 
 238 
 239     /**
 240      * SocketChannel shutdownOutput while virtual thread blocked in write.
          * Writes 100KB buffers in a loop until a ClosedChannelException is
          * thrown, then checks that the channel itself is still open.
 241      */
 242     @Test
 243     void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
 244         VThreadRunner.run(() -> {

 245             try (var connection = new Connection()) {
 246                 SocketChannel sc = connection.channel1();
 247 
 248                 // shutdown output when current thread blocks in write
 249                 runAfterParkedAsync(sc::shutdownOutput);
 250                 try {
 251                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
 252                     for (;;) {
 253                         int n = sc.write(bb);
 254                         assertTrue(n > 0);
 255                         bb.clear();
 256                     }
 257                 } catch (ClosedChannelException e) {
 258                     // expected
 259                 }
                     // only the output side was shut down, the channel must remain open
 260                 assertTrue(sc.isOpen());
 261             }
 262         });
 263     }
 264 
 265     /**
 266      * Virtual thread interrupted while blocked in SocketChannel write.
          * Writes 100KB buffers in a loop until the channel is closed. The
          * scenario is repeated with a fresh Connection until the write fails
          * with ClosedByInterruptException and the interrupt status is set.
 267      */
 268     @Test
 269     void testSocketChannelWriteInterrupt() throws Exception {
 270         VThreadRunner.run(() -> {

 271             boolean done = false;
 272             while (!done) {
 273                 try (var connection = new Connection()) {
 274                     SocketChannel sc = connection.channel1();
 275 
 276                     // interrupt current thread when it blocks in write
                         // NOTE(review): the meaning of the boolean argument is not visible
                         // in this excerpt - confirm against runAfterParkedAsync.
 277                     Thread thisThread = Thread.currentThread();
 278                     runAfterParkedAsync(thisThread::interrupt, true);
 279 
 280                     // write until channel is closed
 281                     try {
 282                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 283                         for (;;) {
 284                             int n = sc.write(bb);
 285                             assertTrue(n > 0);
 286                             bb.clear();
 287                         }
                         // ClosedByInterruptException is a subclass of ClosedChannelException,
                         // so it has to be caught first
 288                     } catch (ClosedByInterruptException e) {
 289                         // closed when blocked in write
                             // Thread.interrupted() also clears the status before any retry
 290                         assertTrue(Thread.interrupted());
 291                         done = true;
 292                     } catch (ClosedChannelException e) {
 293                         // closed but not blocked in write, need to retry test
 294                         System.err.format("%s, need to retry!%n", e);
 295                     }
 296                 }
 297             }
 298         });
 299     }
 300 
 301     /**
 302      * Virtual thread blocks in SocketChannel adaptor read.
          * Passes timeout 0 to testSocketAdaptorRead, which leaves SO_TIMEOUT
          * unset on the socket adaptor.
 303      */
 304     @Test
 305     void testSocketAdaptorRead1() throws Exception {
 306         testSocketAdaptorRead(0);

 307     }
 308 
 309     /**
 310      * Virtual thread blocks in SocketChannel adaptor read with timeout.
          * Passes a 60000 ms timeout to testSocketAdaptorRead, which applies it
          * with setSoTimeout before reading.
 311      */
 312     @Test
 313     void testSocketAdaptorRead2() throws Exception {
 314         testSocketAdaptorRead(60_000);

 315     }
 316 
 317     private void testSocketAdaptorRead(int timeout) throws Exception {
             // Shared body for the socket adaptor read tests: reads from the
             // InputStream of sc2.socket() while a write of "XXX" to sc1 has been
             // handed to runAfterParkedAsync. A timeout > 0 is applied as SO_TIMEOUT
             // (milliseconds) on the adaptor; 0 leaves it unset.
 318         VThreadRunner.run(() -> {

 319             try (var connection = new Connection()) {
 320                 SocketChannel sc1 = connection.channel1();
 321                 SocketChannel sc2 = connection.channel2();
 322 
 323                 // write to sc1 when current thread blocks reading from sc2
 324                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 325                 runAfterParkedAsync(() -> sc1.write(bb));
 326 
 327                 // read from sc2 should block
 328                 byte[] array = new byte[100];
 329                 if (timeout > 0)
 330                     sc2.socket().setSoTimeout(timeout);
 331                 int n = sc2.socket().getInputStream().read(array);
 332                 assertTrue(n > 0);
 333                 assertTrue(array[0] == 'X');
 334             }
 335         });
 336     }
 337 
 338     /**
 339      * ServerSocketChannel accept, no blocking.
          * The client connection is opened before accept is called, so a
          * connection is expected to be pending already.
 340      */
 341     @Test
 342     void testServerSocketChannelAccept1() throws Exception {
 343         VThreadRunner.run(() -> {

 344             try (var ssc = ServerSocketChannel.open()) {
                     // bind to an ephemeral port (0) on the loopback address
 345                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 346                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
 347                 // accept should not block
 348                 var sc2 = ssc.accept();
 349                 sc1.close();
 350                 sc2.close();
 351             }
 352         });
 353     }
 354 
 355     /**
 356      * Virtual thread blocks in ServerSocketChannel accept.
          * The client connect is handed to runAfterParkedAsync before the current
          * thread calls accept; both channels are closed once accept returns.
 357      */
 358     @Test
 359     void testServerSocketChannelAccept2() throws Exception {
 360         VThreadRunner.run(() -> {

 361             try (var ssc = ServerSocketChannel.open()) {
 362                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 363                 var sc1 = SocketChannel.open();
 364 
 365                 // connect when current thread blocks in accept
 366                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
 367 
 368                 // accept should block
 369                 var sc2 = ssc.accept();
 370                 sc1.close();
 371                 sc2.close();
 372             }
 373         });
 374     }
 375 
 376     /**
 377      * ServerSocketChannel close while virtual thread blocked in accept.
          * The accept is expected to fail with AsynchronousCloseException; if a
          * connection is accepted it is closed and the test fails.
 378      */
 379     @Test
 380     void testServerSocketChannelAcceptAsyncClose() throws Exception {
 381         VThreadRunner.run(() -> {

 382             try (var ssc = ServerSocketChannel.open()) {
 383                 InetAddress lh = InetAddress.getLoopbackAddress();
 384                 ssc.bind(new InetSocketAddress(lh, 0));
                     // schedule the close before entering accept below
 385                 runAfterParkedAsync(ssc::close);
 386                 try {
 387                     SocketChannel sc = ssc.accept();
 388                     sc.close();
 389                     fail("connection accepted???");
 390                 } catch (AsynchronousCloseException expected) { }
 391             }
 392         });
 393     }
 394 
 395     /**
 396      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
          * The accept is expected to fail with ClosedByInterruptException and the
          * thread's interrupt status must be set when the exception is caught.
 397      */
 398     @Test
 399     void testServerSocketChannelAcceptInterrupt() throws Exception {
 400         VThreadRunner.run(() -> {

 401             try (var ssc = ServerSocketChannel.open()) {
 402                 InetAddress lh = InetAddress.getLoopbackAddress();
 403                 ssc.bind(new InetSocketAddress(lh, 0));
 404 
 405                 // interrupt current thread when it blocks in accept
 406                 Thread thisThread = Thread.currentThread();
 407                 runAfterParkedAsync(thisThread::interrupt);
 408 
 409                 try {
 410                     SocketChannel sc = ssc.accept();
 411                     sc.close();
 412                     fail("connection accepted???");
 413                 } catch (ClosedByInterruptException expected) {
                         // Thread.interrupted() both checks and clears the interrupt status
 414                     assertTrue(Thread.interrupted());
 415                 }
 416             }
 417         });
 418     }
 419 
 420     /**
 421      * Virtual thread blocks in ServerSocketChannel adaptor accept.
          * Passes timeout 0 to testSocketChannelAdaptorAccept, which leaves
          * SO_TIMEOUT unset on the server socket adaptor.
 422      */
 423     @Test
 424     void testSocketChannelAdaptorAccept1() throws Exception {
 425         testSocketChannelAdaptorAccept(0);

 426     }
 427 
 428     /**
 429      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
          * Passes a 60000 ms timeout to testSocketChannelAdaptorAccept, which
          * applies it with setSoTimeout before accepting.
 430      */
 431     @Test
 432     void testSocketChannelAdaptorAccept2() throws Exception {
 433         testSocketChannelAdaptorAccept(60_000);

 434     }
 435 
 436     private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
             // Shared body for the server socket adaptor accept tests: accepts via
             // ssc.socket() while a client connect has been handed to
             // runAfterParkedAsync. A timeout > 0 is applied as SO_TIMEOUT
             // (milliseconds) on the adaptor; 0 leaves it unset.
 437         VThreadRunner.run(() -> {

 438             try (var ssc = ServerSocketChannel.open()) {
 439                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 440                 var sc = SocketChannel.open();
 441 
 442                 // connect when current thread blocks in accept
 443                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
 444 
 445                 // accept should block
 446                 if (timeout > 0)
 447                     ssc.socket().setSoTimeout(timeout);
 448                 Socket s = ssc.socket().accept();
 449                 sc.close();
 450                 s.close();
 451             }
 452         });
 453     }
 454 
 455     /**
 456      * DatagramChannel receive/send, no blocking.
          * Sends "XXX" from dc1 to the loopback address that dc2 is bound to, then
          * receives on dc2 and checks that the first byte is 'X'.
 457      */
 458     @Test
 459     void testDatagramChannelSendReceive1() throws Exception {
 460         VThreadRunner.run(() -> {

 461             try (DatagramChannel dc1 = DatagramChannel.open();
 462                  DatagramChannel dc2 = DatagramChannel.open()) {
 463 
                     // bind the receiver to an ephemeral port (0) on the loopback address
 464                 InetAddress lh = InetAddress.getLoopbackAddress();
 465                 dc2.bind(new InetSocketAddress(lh, 0));
 466 
 467                 // send should not block
 468                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 469                 int n = dc1.send(bb, dc2.getLocalAddress());
 470                 assertTrue(n > 0);
 471 
 472                 // receive should not block
 473                 bb = ByteBuffer.allocate(10);
 474                 dc2.receive(bb);
 475                 assertTrue(bb.get(0) == 'X');
 476             }
 477         });
 478     }
 479 
 480     /**
 481      * Virtual thread blocks in DatagramChannel receive.
          * The send of "XXX" from dc1 is handed to runAfterParkedAsync before the
          * current thread calls receive on dc2; the first byte received must be
          * 'X'.
 482      */
 483     @Test
 484     void testDatagramChannelSendReceive2() throws Exception {
 485         VThreadRunner.run(() -> {

 486             try (DatagramChannel dc1 = DatagramChannel.open();
 487                  DatagramChannel dc2 = DatagramChannel.open()) {
 488 
 489                 InetAddress lh = InetAddress.getLoopbackAddress();
 490                 dc2.bind(new InetSocketAddress(lh, 0));
 491 
 492                 // send from dc1 when current thread blocked in dc2.receive
 493                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 494                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
 495 
 496                 // read from dc2 should block
 497                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 498                 dc2.receive(bb2);
 499                 assertTrue(bb2.get(0) == 'X');
 500             }
 501         });
 502     }
 503 
 504     /**
 505      * DatagramChannel close while virtual thread blocked in receive.
          * The receive is expected to fail with AsynchronousCloseException; a
          * normal return from receive fails the test.
 506      */
 507     @Test
 508     void testDatagramChannelReceiveAsyncClose() throws Exception {
 509         VThreadRunner.run(() -> {

 510             try (DatagramChannel dc = DatagramChannel.open()) {
 511                 InetAddress lh = InetAddress.getLoopbackAddress();
 512                 dc.bind(new InetSocketAddress(lh, 0));
                     // schedule the close before entering receive below
 513                 runAfterParkedAsync(dc::close);
 514                 try {
 515                     dc.receive(ByteBuffer.allocate(100));
 516                     fail("receive returned");
 517                 } catch (AsynchronousCloseException expected) { }
 518             }
 519         });
 520     }
 521 
 522     /**
 523      * Virtual thread interrupted while blocked in DatagramChannel receive.
          * The receive is expected to fail with ClosedByInterruptException and the
          * thread's interrupt status must be set when the exception is caught.
 524      */
 525     @Test
 526     void testDatagramChannelReceiveInterrupt() throws Exception {
 527         VThreadRunner.run(() -> {

 528             try (DatagramChannel dc = DatagramChannel.open()) {
 529                 InetAddress lh = InetAddress.getLoopbackAddress();
 530                 dc.bind(new InetSocketAddress(lh, 0));
 531 
 532                 // interrupt current thread when it blocks in receive
 533                 Thread thisThread = Thread.currentThread();
 534                 runAfterParkedAsync(thisThread::interrupt);
 535 
 536                 try {
 537                     dc.receive(ByteBuffer.allocate(100));
 538                     fail("receive returned");
 539                 } catch (ClosedByInterruptException expected) {
                         // Thread.interrupted() both checks and clears the interrupt status
 540                     assertTrue(Thread.interrupted());
 541                 }
 542             }
 543         });
 544     }
 545 
 546     /**
 547      * Virtual thread blocks in DatagramSocket adaptor receive.
          * Passes timeout 0 to testDatagramSocketAdaptorReceive, which leaves
          * SO_TIMEOUT unset on the datagram socket adaptor.
 548      */
 549     @Test
 550     void testDatagramSocketAdaptorReceive1() throws Exception {
 551         testDatagramSocketAdaptorReceive(0);

 552     }
 553 
 554     /**
 555      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
          * Passes a 60000 ms timeout to testDatagramSocketAdaptorReceive, which
          * applies it with setSoTimeout before receiving.
 556      */
 557     @Test
 558     void testDatagramSocketAdaptorReceive2() throws Exception {
 559         testDatagramSocketAdaptorReceive(60_000);

 560     }
 561 
 562     private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
             // Shared body for the datagram socket adaptor receive tests: receives
             // a DatagramPacket via dc2.socket() while a send of "XXX" from dc1 has
             // been handed to runAfterParkedAsync. A timeout > 0 is applied as
             // SO_TIMEOUT (milliseconds) on the adaptor; 0 leaves it unset.
 563         VThreadRunner.run(() -> {

 564             try (DatagramChannel dc1 = DatagramChannel.open();
 565                  DatagramChannel dc2 = DatagramChannel.open()) {
 566 
 567                 InetAddress lh = InetAddress.getLoopbackAddress();
 568                 dc2.bind(new InetSocketAddress(lh, 0));
 569 
 570                 // send from dc1 when current thread blocks in dc2 receive
 571                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 572                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
 573 
 574                 // receive should block
 575                 byte[] array = new byte[100];
 576                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 577                 if (timeout > 0)
 578                     dc2.socket().setSoTimeout(timeout);
 579                 dc2.socket().receive(p);
                     // the whole 3-byte datagram must arrive, starting with 'X'
 580                 assertTrue(p.getLength() == 3 && array[0] == 'X');
 581             }
 582         });
 583     }
 584 
 585     /**
 586      * DatagramChannel close while virtual thread blocked in adaptor receive.
          * Passes timeout 0 to testDatagramSocketAdaptorReceiveAsyncClose, which
          * leaves SO_TIMEOUT unset on the datagram socket adaptor.
 587      */
 588     @Test
 589     void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
 590         testDatagramSocketAdaptorReceiveAsyncClose(0);

 591     }
 592 
 593     /**
 594      * DatagramChannel close while virtual thread blocked in adaptor receive
 595      * with timeout.
          * Passes a 60000 ms timeout, the same value the other "with timeout"
          * adaptor tests in this class use, to
          * testDatagramSocketAdaptorReceiveAsyncClose, which applies it with
          * setSoTimeout before receiving.
 596      */
 597     @Test
 598     void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
 599         testDatagramSocketAdaptorReceiveAsyncClose(60_000);

 600     }
 601 
 602     private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
             // Shared body for the adaptor receive/async-close tests: a close of the
             // channel is handed to runAfterParkedAsync and the receive through
             // dc.socket() is expected to throw SocketException. A timeout > 0 is
             // applied as SO_TIMEOUT (milliseconds); 0 leaves it unset.
 603         VThreadRunner.run(() -> {

 604             try (DatagramChannel dc = DatagramChannel.open()) {
 605                 InetAddress lh = InetAddress.getLoopbackAddress();
 606                 dc.bind(new InetSocketAddress(lh, 0));
 607 
 608                 byte[] array = new byte[100];
 609                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 610                 if (timeout > 0)
 611                     dc.socket().setSoTimeout(timeout);
 612 
 613                 // close channel/socket when current thread blocks in receive
 614                 runAfterParkedAsync(dc::close);
 615 
 616                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
 617             }
 618         });
 619     }
 620 
 621     /**
 622      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
          * Passes timeout 0 to testDatagramSocketAdaptorReceiveInterrupt, which
          * leaves SO_TIMEOUT unset on the datagram socket adaptor.
 623      */
 624     @Test
 625     void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
 626         testDatagramSocketAdaptorReceiveInterrupt(0);

 627     }
 628 
 629     /**
 630      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
 631      * with timeout.
          * Passes a 60000 ms timeout, the same value the other "with timeout"
          * adaptor tests in this class use, to
          * testDatagramSocketAdaptorReceiveInterrupt, which applies it with
          * setSoTimeout before receiving.
 632      */
 633     @Test
 634     void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
 635         testDatagramSocketAdaptorReceiveInterrupt(60_000);

 636     }
 637 
 638     private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
             // Shared body for the adaptor receive/interrupt tests: an interrupt of
             // the current thread is handed to runAfterParkedAsync and the receive
             // through dc.socket() is expected to throw ClosedByInterruptException
             // with the interrupt status set. A timeout > 0 is applied as
             // SO_TIMEOUT (milliseconds); 0 leaves it unset.
 639         VThreadRunner.run(() -> {

 640             try (DatagramChannel dc = DatagramChannel.open()) {
 641                 InetAddress lh = InetAddress.getLoopbackAddress();
 642                 dc.bind(new InetSocketAddress(lh, 0));
 643 
 644                 byte[] array = new byte[100];
 645                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 646                 if (timeout > 0)
 647                     dc.socket().setSoTimeout(timeout);
 648 
 649                 // interrupt current thread when it blocks in receive
 650                 Thread thisThread = Thread.currentThread();
 651                 runAfterParkedAsync(thisThread::interrupt);
 652 
 653                 try {
 654                     dc.socket().receive(p);
 655                     fail();
 656                 } catch (ClosedByInterruptException expected) {
                         // Thread.interrupted() both checks and clears the interrupt status
 657                     assertTrue(Thread.interrupted());
 658                 }
 659             }
 660         });
 661     }
 662 
 663     /**
 664      * Pipe read/write, no blocking.
          * Writes "XXX" to the pipe's sink and then reads from its source,
          * checking that the first byte read is 'X'.
 665      */
 666     @Test
 667     void testPipeReadWrite1() throws Exception {
 668         VThreadRunner.run(() -> {

 669             Pipe p = Pipe.open();
                 // both ends of the pipe are closed when the try block exits
 670             try (Pipe.SinkChannel sink = p.sink();
 671                  Pipe.SourceChannel source = p.source()) {
 672 
 673                 // write should not block
 674                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 675                 int n = sink.write(bb);
 676                 assertTrue(n > 0);
 677 
 678                 // read should not block
 679                 bb = ByteBuffer.allocate(10);
 680                 n = source.read(bb);
 681                 assertTrue(n > 0);
 682                 assertTrue(bb.get(0) == 'X');
 683             }
 684         });
 685     }
 686 
 687     /**
 688      * Virtual thread blocks in Pipe.SourceChannel read.
          * The write of "XXX" to the sink is handed to runAfterParkedAsync before
          * the current thread reads from the source; the read must return at
          * least one byte and the first byte must be 'X'.
 689      */
 690     @Test
 691     void testPipeReadWrite2() throws Exception {
 692         VThreadRunner.run(() -> {

 693             Pipe p = Pipe.open();
 694             try (Pipe.SinkChannel sink = p.sink();
 695                  Pipe.SourceChannel source = p.source()) {
 696 
 697                 // write from sink when current thread blocks reading from source
 698                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 699                 runAfterParkedAsync(() -> sink.write(bb1));
 700 
 701                 // read should block
 702                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 703                 int n = source.read(bb2);
 704                 assertTrue(n > 0);
 705                 assertTrue(bb2.get(0) == 'X');
 706             }
 707         });
 708     }
 709 
 710     /**
 711      * Virtual thread blocks in Pipe.SinkChannel write.
          * Writes 1000 buffers of 100KB to the sink while a reader thread, started
          * through runAfterParkedAsync, reads the source to EOF. Every write must
          * transfer at least one byte.
 712      */
 713     @Test
 714     void testPipeReadWrite3() throws Exception {
 715         VThreadRunner.run(() -> {

 716             Pipe p = Pipe.open();
 717             try (Pipe.SinkChannel sink = p.sink();
 718                  Pipe.SourceChannel source = p.source()) {
 719 
 720                 // read from source to EOF when current thread blocking in write
 721                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
 722 
 723                 // write to sink should block
 724                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 725                 for (int i=0; i<1000; i++) {
 726                     int n = sink.write(bb);
 727                     assertTrue(n > 0);
                         // reset position/limit so the same buffer is written again
 728                     bb.clear();
 729                 }
                     // close the writing end; presumably this is what lets readToEOF
                     // (defined outside this excerpt) observe EOF - confirm
 730                 sink.close();
 731 
 732                 // wait for reader to finish
 733                 reader.join();
 734             }
 735         });
 736     }
 737 
 738     /**
 739      * Pipe.SourceChannel close while virtual thread blocked in read.
          * The read is expected to fail with AsynchronousCloseException; a normal
          * return from read fails the test.
 740      */
 741     @Test
 742     void testPipeReadAsyncClose() throws Exception {
 743         VThreadRunner.run(() -> {

 744             Pipe p = Pipe.open();
 745             try (Pipe.SinkChannel sink = p.sink();
 746                  Pipe.SourceChannel source = p.source()) {
                     // schedule the close before entering the read below
 747                 runAfterParkedAsync(source::close);
 748                 try {
 749                     int n = source.read(ByteBuffer.allocate(100));
 750                     fail("read returned " + n);
 751                 } catch (AsynchronousCloseException expected) { }
 752             }
 753         });
 754     }
 755 
 756     /**
 757      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
          * The read is expected to fail with ClosedByInterruptException and the
          * thread's interrupt status must be set when the exception is caught.
 758      */
 759     @Test
 760     void testPipeReadInterrupt() throws Exception {
 761         VThreadRunner.run(() -> {

 762             Pipe p = Pipe.open();
 763             try (Pipe.SinkChannel sink = p.sink();
 764                  Pipe.SourceChannel source = p.source()) {
 765 
 766                 // interrupt current thread when it blocks reading from source
 767                 Thread thisThread = Thread.currentThread();
 768                 runAfterParkedAsync(thisThread::interrupt);
 769 
 770                 try {
 771                     int n = source.read(ByteBuffer.allocate(100));
 772                     fail("read returned " + n);
 773                 } catch (ClosedByInterruptException expected) {
                         // Thread.interrupted() both checks and clears the interrupt status
 774                     assertTrue(Thread.interrupted());
 775                 }
 776             }
 777         });
 778     }
 779 
 780     /**
 781      * Pipe.SinkChannel close while virtual thread blocked in write.
          * Writes 100KB buffers to the sink in a loop until it is closed. The
          * scenario is repeated with a fresh Pipe until the close is reported as
          * AsynchronousCloseException.
 782      */
 783     @Test
 784     void testPipeWriteAsyncClose() throws Exception {
 785         VThreadRunner.run(() -> {

 786             boolean done = false;
 787             while (!done) {
 788                 Pipe p = Pipe.open();
 789                 try (Pipe.SinkChannel sink = p.sink();
 790                      Pipe.SourceChannel source = p.source()) {
 791 
 792                     // close sink when current thread blocks in write
                         // NOTE(review): the meaning of the boolean argument is not visible
                         // in this excerpt - confirm against runAfterParkedAsync.
 793                     runAfterParkedAsync(sink::close, true);
 794 
 795                     // write until channel is closed
 796                     try {
 797                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 798                         for (;;) {
 799                             int n = sink.write(bb);
 800                             assertTrue(n > 0);
 801                             bb.clear();
 802                         }
                         // AsynchronousCloseException is a subclass of ClosedChannelException,
                         // so it has to be caught first
 803                     } catch (AsynchronousCloseException e) {
 804                         // closed when blocked in write
 805                         done = true;
 806                     } catch (ClosedChannelException e) {
 807                         // closed but not blocked in write, need to retry test
 808                         System.err.format("%s, need to retry!%n", e);
 809                     }
 810                 }
 811             }
 812         });
 813     }
 814 
 815     /**
 816      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
 817      */
 818     @Test
 819     void testPipeWriteInterrupt() throws Exception {
 820         VThreadRunner.run(() -> {

 821             boolean done = false;
 822             while (!done) {
 823                 Pipe p = Pipe.open();
 824                 try (Pipe.SinkChannel sink = p.sink();
 825                      Pipe.SourceChannel source = p.source()) {
 826 
 827                     // interrupt current thread when it blocks in write
 828                     Thread thisThread = Thread.currentThread();
 829                     runAfterParkedAsync(thisThread::interrupt, true);
 830 
 831                     // write until channel is closed
 832                     try {
 833                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 834                         for (;;) {
 835                             int n = sink.write(bb);
 836                             assertTrue(n > 0);
 837                             bb.clear();
 838                         }
 839                     } catch (ClosedByInterruptException expected) {
 840                         // closed when blocked in write

   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=default
  26  * @bug 8284161
  27  * @summary Test virtual threads doing blocking I/O on NIO channels
  28  * @library /test/lib
  29  * @run junit/othervm/timeout=480 BlockingChannelOps
  30  */
  31 
  32 /*
  33  * @test id=poller-modes
  34  * @requires (os.family == "linux") | (os.family == "mac")
  35  * @library /test/lib
  36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
  37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
  38  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
  39  */
  40 
  41 /*
  42  * @test id=io_uring
  43  * @requires os.family == "linux"
  44  * @library /test/lib
  45  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
  46  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
  47  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
  48  */
  49 
  50 /*
  51  * @test id=no-vmcontinuations
  52  * @requires vm.continuations
  53  * @library /test/lib
  54  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
  55  */
  56 
  57 import java.io.Closeable;
  58 import java.io.IOException;
  59 import java.net.DatagramPacket;
  60 import java.net.InetAddress;
  61 import java.net.InetSocketAddress;
  62 import java.net.Socket;
  63 import java.net.SocketAddress;
  64 import java.net.SocketException;
  65 import java.nio.ByteBuffer;
  66 import java.nio.channels.AsynchronousCloseException;
  67 import java.nio.channels.ClosedByInterruptException;
  68 import java.nio.channels.ClosedChannelException;
  69 import java.nio.channels.DatagramChannel;
  70 import java.nio.channels.Pipe;
  71 import java.nio.channels.ReadableByteChannel;
  72 import java.nio.channels.ServerSocketChannel;
  73 import java.nio.channels.SocketChannel;
  74 import java.nio.channels.WritableByteChannel;
  75 import java.util.concurrent.ExecutorService;
  76 import java.util.concurrent.Executors;
  77 import java.util.concurrent.ThreadFactory;
  78 import java.util.concurrent.locks.LockSupport;
  79 import java.util.stream.Stream;
  80 
  81 import jdk.test.lib.thread.VThreadRunner;
  82 import jdk.test.lib.thread.VThreadScheduler;
  83 
  84 import org.junit.jupiter.api.BeforeAll;
  85 import org.junit.jupiter.params.ParameterizedTest;
  86 import org.junit.jupiter.params.provider.MethodSource;
  87 import static org.junit.jupiter.api.Assertions.*;
  88 
  89 class BlockingChannelOps {
  90     private static ExecutorService threadPool;
  91     private static Thread.VirtualThreadScheduler customScheduler;
  92 
  93     @BeforeAll
  94     static void setup() {
  95         ThreadFactory factory = Thread.ofPlatform().daemon().factory();
  96         threadPool = Executors.newCachedThreadPool(factory);
  97         customScheduler = (_, task) -> threadPool.execute(task);
  98     }
  99 
 100     /**
 101      * Returns the Thread.Builder to create the virtual thread.
 102      */
 103     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
 104         if (VThreadScheduler.supportsCustomScheduler()) {
 105             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
 106         } else {
 107             return Stream.of(Thread.ofVirtual());
 108         }
 109     }
 110 
 111     /**
 112      * SocketChannel read/write, no blocking.
 113      */
 114     @ParameterizedTest
 115     @MethodSource("threadBuilders")
 116     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
 117         VThreadRunner.run(builder, () -> {
 118             try (var connection = new Connection()) {
 119                 SocketChannel sc1 = connection.channel1();
 120                 SocketChannel sc2 = connection.channel2();
 121 
 122                 // write to sc1
 123                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 124                 int n = sc1.write(bb);
 125                 assertTrue(n > 0);
 126 
 127                 // read from sc2 should not block
 128                 bb = ByteBuffer.allocate(10);
 129                 n = sc2.read(bb);
 130                 assertTrue(n > 0);
 131                 assertTrue(bb.get(0) == 'X');
 132             }
 133         });
 134     }
 135 
 136     /**
 137      * Virtual thread blocks in SocketChannel read.
 138      */
 139     @ParameterizedTest
 140     @MethodSource("threadBuilders")
 141     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
 142         VThreadRunner.run(builder, () -> {
 143             try (var connection = new Connection()) {
 144                 SocketChannel sc1 = connection.channel1();
 145                 SocketChannel sc2 = connection.channel2();
 146 
 147                 // write to sc1 when current thread blocks in sc2.read
 148                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 149                 runAfterParkedAsync(() -> sc1.write(bb1));
 150 
 151                 // read from sc2 should block
 152                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 153                 int n = sc2.read(bb2);
 154                 assertTrue(n > 0);
 155                 assertTrue(bb2.get(0) == 'X');
 156             }
 157         });
 158     }
 159 
 160     /**
 161      * Virtual thread blocks in SocketChannel write.
 162      */
 163     @ParameterizedTest
 164     @MethodSource("threadBuilders")
 165     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
 166         VThreadRunner.run(builder, () -> {
 167             try (var connection = new Connection()) {
 168                 SocketChannel sc1 = connection.channel1();
 169                 SocketChannel sc2 = connection.channel2();
 170 
 171                 // read from sc2 to EOF when current thread blocks in sc1.write
 172                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
 173 
 174                 // write to sc1 should block
 175                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 176                 for (int i=0; i<1000; i++) {
 177                     int n = sc1.write(bb);
 178                     assertTrue(n > 0);
 179                     bb.clear();
 180                 }
 181                 sc1.close();
 182 
 183                 // wait for reader to finish
 184                 reader.join();
 185             }
 186         });
 187     }
 188 
 189     /**
 190      * SocketChannel close while virtual thread blocked in read.
 191      */
 192     @ParameterizedTest
 193     @MethodSource("threadBuilders")
 194     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 195         VThreadRunner.run(builder, () -> {
 196             try (var connection = new Connection()) {
 197                 SocketChannel sc = connection.channel1();
 198                 runAfterParkedAsync(sc::close);
 199                 try {
 200                     int n = sc.read(ByteBuffer.allocate(100));
 201                     fail("read returned " + n);
 202                 } catch (AsynchronousCloseException expected) { }
 203             }
 204         });
 205     }
 206 
 207     /**
 208      * SocketChannel shutdownInput while virtual thread blocked in read.
 209      */
 210     @ParameterizedTest
 211     @MethodSource("threadBuilders")
 212     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
 213         VThreadRunner.run(builder, () -> {
 214             try (var connection = new Connection()) {
 215                 SocketChannel sc = connection.channel1();
 216                 runAfterParkedAsync(sc::shutdownInput);
 217                 int n = sc.read(ByteBuffer.allocate(100));
 218                 assertEquals(-1, n);
 219                 assertTrue(sc.isOpen());
 220             }
 221         });
 222     }
 223 
 224     /**
 225      * Virtual thread interrupted while blocked in SocketChannel read.
 226      */
 227     @ParameterizedTest
 228     @MethodSource("threadBuilders")
 229     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 230         VThreadRunner.run(builder, () -> {
 231             try (var connection = new Connection()) {
 232                 SocketChannel sc = connection.channel1();
 233 
 234                 // interrupt current thread when it blocks in read
 235                 Thread thisThread = Thread.currentThread();
 236                 runAfterParkedAsync(thisThread::interrupt);
 237 
 238                 try {
 239                     int n = sc.read(ByteBuffer.allocate(100));
 240                     fail("read returned " + n);
 241                 } catch (ClosedByInterruptException expected) {
 242                     assertTrue(Thread.interrupted());
 243                 }
 244             }
 245         });
 246     }
 247 
 248     /**
 249      * SocketChannel close while virtual thread blocked in write.
 250      */
 251     @ParameterizedTest
 252     @MethodSource("threadBuilders")
 253     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 254         VThreadRunner.run(builder, () -> {
 255             boolean done = false;
 256             while (!done) {
 257                 try (var connection = new Connection()) {
 258                     SocketChannel sc = connection.channel1();
 259 
 260                     // close sc when current thread blocks in write
 261                     runAfterParkedAsync(sc::close, true);
 262 
 263                     // write until channel is closed
 264                     try {
 265                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 266                         for (;;) {
 267                             int n = sc.write(bb);
 268                             assertTrue(n > 0);
 269                             bb.clear();
 270                         }
 271                     } catch (AsynchronousCloseException expected) {
 272                         // closed when blocked in write
 273                         done = true;
 274                     } catch (ClosedChannelException e) {
 275                         // closed but not blocked in write, need to retry test
 276                         System.err.format("%s, need to retry!%n", e);
 277                     }
 278                 }
 279             }
 280         });
 281     }
 282 

 283     /**
 284      * SocketChannel shutdownOutput while virtual thread blocked in write.
 285      */
 286     @ParameterizedTest
 287     @MethodSource("threadBuilders")
 288     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
 289         VThreadRunner.run(builder, () -> {
 290             try (var connection = new Connection()) {
 291                 SocketChannel sc = connection.channel1();
 292 
 293                 // shutdown output when current thread blocks in write
 294                 runAfterParkedAsync(sc::shutdownOutput);
 295                 try {
 296                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
 297                     for (;;) {
 298                         int n = sc.write(bb);
 299                         assertTrue(n > 0);
 300                         bb.clear();
 301                     }
 302                 } catch (ClosedChannelException e) {
 303                     // expected
 304                 }
 305                 assertTrue(sc.isOpen());
 306             }
 307         });
 308     }
 309 
 310     /**
 311      * Virtual thread interrupted while blocked in SocketChannel write.
 312      */
 313     @ParameterizedTest
 314     @MethodSource("threadBuilders")
 315     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 316         VThreadRunner.run(builder, () -> {
 317             boolean done = false;
 318             while (!done) {
 319                 try (var connection = new Connection()) {
 320                     SocketChannel sc = connection.channel1();
 321 
 322                     // interrupt current thread when it blocks in write
 323                     Thread thisThread = Thread.currentThread();
 324                     runAfterParkedAsync(thisThread::interrupt, true);
 325 
 326                     // write until channel is closed
 327                     try {
 328                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 329                         for (;;) {
 330                             int n = sc.write(bb);
 331                             assertTrue(n > 0);
 332                             bb.clear();
 333                         }
 334                     } catch (ClosedByInterruptException e) {
 335                         // closed when blocked in write
 336                         assertTrue(Thread.interrupted());
 337                         done = true;
 338                     } catch (ClosedChannelException e) {
 339                         // closed but not blocked in write, need to retry test
 340                         System.err.format("%s, need to retry!%n", e);
 341                     }
 342                 }
 343             }
 344         });
 345     }
 346 
 347     /**
 348      * Virtual thread blocks in SocketChannel adaptor read.
 349      */
 350     @ParameterizedTest
 351     @MethodSource("threadBuilders")
 352     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
 353         testSocketAdaptorRead(builder, 0);
 354     }
 355 
 356     /**
 357      * Virtual thread blocks in SocketChannel adaptor read with timeout.
 358      */
 359     @ParameterizedTest
 360     @MethodSource("threadBuilders")
 361     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
 362         testSocketAdaptorRead(builder, 60_000);
 363     }
 364 
 365     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
 366                                        int timeout) throws Exception {
 367         VThreadRunner.run(builder, () -> {
 368             try (var connection = new Connection()) {
 369                 SocketChannel sc1 = connection.channel1();
 370                 SocketChannel sc2 = connection.channel2();
 371 
 372                 // write to sc1 when current thread blocks reading from sc2
 373                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 374                 runAfterParkedAsync(() -> sc1.write(bb));
 375 
 376                 // read from sc2 should block
 377                 byte[] array = new byte[100];
 378                 if (timeout > 0)
 379                     sc2.socket().setSoTimeout(timeout);
 380                 int n = sc2.socket().getInputStream().read(array);
 381                 assertTrue(n > 0);
 382                 assertTrue(array[0] == 'X');
 383             }
 384         });
 385     }
 386 
 387     /**
 388      * ServerSocketChannel accept, no blocking.
 389      */
 390     @ParameterizedTest
 391     @MethodSource("threadBuilders")
 392     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
 393         VThreadRunner.run(builder, () -> {
 394             try (var ssc = ServerSocketChannel.open()) {
 395                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 396                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
 397                 // accept should not block
 398                 var sc2 = ssc.accept();
 399                 sc1.close();
 400                 sc2.close();
 401             }
 402         });
 403     }
 404 
 405     /**
 406      * Virtual thread blocks in ServerSocketChannel accept.
 407      */
 408     @ParameterizedTest
 409     @MethodSource("threadBuilders")
 410     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
 411         VThreadRunner.run(builder, () -> {
 412             try (var ssc = ServerSocketChannel.open()) {
 413                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 414                 var sc1 = SocketChannel.open();
 415 
 416                 // connect sc1 to the listener when current thread blocks in accept
 417                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
 418 
 419                 // accept should block
 420                 var sc2 = ssc.accept();
 421                 sc1.close();
 422                 sc2.close();
 423             }
 424         });
 425     }
 426 
 427     /**
 428      * ServerSocketChannel close while virtual thread blocked in accept.
 429      */
 430     @ParameterizedTest
 431     @MethodSource("threadBuilders")
 432     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 433         VThreadRunner.run(builder, () -> {
 434             try (var ssc = ServerSocketChannel.open()) {
 435                 InetAddress lh = InetAddress.getLoopbackAddress();
 436                 ssc.bind(new InetSocketAddress(lh, 0));
 437                 runAfterParkedAsync(ssc::close);
 438                 try {
 439                     SocketChannel sc = ssc.accept();
 440                     sc.close();
 441                     fail("connection accepted???");
 442                 } catch (AsynchronousCloseException expected) { }
 443             }
 444         });
 445     }
 446 
 447     /**
 448      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
 449      */
 450     @ParameterizedTest
 451     @MethodSource("threadBuilders")
 452     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 453         VThreadRunner.run(builder, () -> {
 454             try (var ssc = ServerSocketChannel.open()) {
 455                 InetAddress lh = InetAddress.getLoopbackAddress();
 456                 ssc.bind(new InetSocketAddress(lh, 0));
 457 
 458                 // interrupt current thread when it blocks in accept
 459                 Thread thisThread = Thread.currentThread();
 460                 runAfterParkedAsync(thisThread::interrupt);
 461 
 462                 try {
 463                     SocketChannel sc = ssc.accept();
 464                     sc.close();
 465                     fail("connection accepted???");
 466                 } catch (ClosedByInterruptException expected) {
 467                     assertTrue(Thread.interrupted());
 468                 }
 469             }
 470         });
 471     }
 472 
 473     /**
 474      * Virtual thread blocks in ServerSocketChannel adaptor accept.
 475      */
 476     @ParameterizedTest
 477     @MethodSource("threadBuilders")
 478     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
 479         testSocketChannelAdaptorAccept(builder, 0);
 480     }
 481 
 482     /**
 483      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
 484      */
 485     @ParameterizedTest
 486     @MethodSource("threadBuilders")
 487     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
 488         testSocketChannelAdaptorAccept(builder, 60_000);
 489     }
 490 
 491     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
 492                                                 int timeout) throws Exception {
 493         VThreadRunner.run(builder, () -> {
 494             try (var ssc = ServerSocketChannel.open()) {
 495                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 496                 var sc = SocketChannel.open();
 497 
 498                 // connect sc to the listener when current thread blocks in accept
 499                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
 500 
 501                 // accept should block
 502                 if (timeout > 0)
 503                     ssc.socket().setSoTimeout(timeout);
 504                 Socket s = ssc.socket().accept();
 505                 sc.close();
 506                 s.close();
 507             }
 508         });
 509     }
 510 
 511     /**
 512      * DatagramChannel receive/send, no blocking.
 513      */
 514     @ParameterizedTest
 515     @MethodSource("threadBuilders")
 516     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
 517         VThreadRunner.run(builder, () -> {
 518             try (DatagramChannel dc1 = DatagramChannel.open();
 519                  DatagramChannel dc2 = DatagramChannel.open()) {
 520 
 521                 InetAddress lh = InetAddress.getLoopbackAddress();
 522                 dc2.bind(new InetSocketAddress(lh, 0));
 523 
 524                 // send should not block
 525                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 526                 int n = dc1.send(bb, dc2.getLocalAddress());
 527                 assertTrue(n > 0);
 528 
 529                 // receive should not block
 530                 bb = ByteBuffer.allocate(10);
 531                 dc2.receive(bb);
 532                 assertTrue(bb.get(0) == 'X');
 533             }
 534         });
 535     }
 536 
 537     /**
 538      * Virtual thread blocks in DatagramChannel receive.
 539      */
 540     @ParameterizedTest
 541     @MethodSource("threadBuilders")
 542     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
 543         VThreadRunner.run(builder, () -> {
 544             try (DatagramChannel dc1 = DatagramChannel.open();
 545                  DatagramChannel dc2 = DatagramChannel.open()) {
 546 
 547                 InetAddress lh = InetAddress.getLoopbackAddress();
 548                 dc2.bind(new InetSocketAddress(lh, 0));
 549 
 550                 // send from dc1 when current thread blocked in dc2.receive
 551                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 552                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
 553 
 554                 // read from dc2 should block
 555                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 556                 dc2.receive(bb2);
 557                 assertTrue(bb2.get(0) == 'X');
 558             }
 559         });
 560     }
 561 
 562     /**
 563      * DatagramChannel close while virtual thread blocked in receive.
 564      */
 565     @ParameterizedTest
 566     @MethodSource("threadBuilders")
 567     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 568         VThreadRunner.run(builder, () -> {
 569             try (DatagramChannel dc = DatagramChannel.open()) {
 570                 InetAddress lh = InetAddress.getLoopbackAddress();
 571                 dc.bind(new InetSocketAddress(lh, 0));
 572                 runAfterParkedAsync(dc::close);
 573                 try {
 574                     dc.receive(ByteBuffer.allocate(100));
 575                     fail("receive returned");
 576                 } catch (AsynchronousCloseException expected) { }
 577             }
 578         });
 579     }
 580 
 581     /**
 582      * Virtual thread interrupted while blocked in DatagramChannel receive.
 583      */
 584     @ParameterizedTest
 585     @MethodSource("threadBuilders")
 586     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 587         VThreadRunner.run(builder, () -> {
 588             try (DatagramChannel dc = DatagramChannel.open()) {
 589                 InetAddress lh = InetAddress.getLoopbackAddress();
 590                 dc.bind(new InetSocketAddress(lh, 0));
 591 
 592                 // interrupt current thread when it blocks in receive
 593                 Thread thisThread = Thread.currentThread();
 594                 runAfterParkedAsync(thisThread::interrupt);
 595 
 596                 try {
 597                     dc.receive(ByteBuffer.allocate(100));
 598                     fail("receive returned");
 599                 } catch (ClosedByInterruptException expected) {
 600                     assertTrue(Thread.interrupted());
 601                 }
 602             }
 603         });
 604     }
 605 
 606     /**
 607      * Virtual thread blocks in DatagramSocket adaptor receive.
 608      */
 609     @ParameterizedTest
 610     @MethodSource("threadBuilders")
 611     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
 612         testDatagramSocketAdaptorReceive(builder, 0);
 613     }
 614 
 615     /**
 616      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
 617      */
 618     @ParameterizedTest
 619     @MethodSource("threadBuilders")
 620     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
 621         testDatagramSocketAdaptorReceive(builder, 60_000);
 622     }
 623 
 624     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
 625                                                   int timeout) throws Exception {
 626         VThreadRunner.run(builder, () -> {
 627             try (DatagramChannel dc1 = DatagramChannel.open();
 628                  DatagramChannel dc2 = DatagramChannel.open()) {
 629 
 630                 InetAddress lh = InetAddress.getLoopbackAddress();
 631                 dc2.bind(new InetSocketAddress(lh, 0));
 632 
 633                 // send from dc1 when current thread blocks in dc2 receive
 634                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 635                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
 636 
 637                 // receive should block
 638                 byte[] array = new byte[100];
 639                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 640                 if (timeout > 0)
 641                     dc2.socket().setSoTimeout(timeout);
 642                 dc2.socket().receive(p);
 643                 assertTrue(p.getLength() == 3 && array[0] == 'X');
 644             }
 645         });
 646     }
 647 
 648     /**
 649      * DatagramChannel close while virtual thread blocked in adaptor receive.
 650      */
 651     @ParameterizedTest
 652     @MethodSource("threadBuilders")
 653     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
 654         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
 655     }
 656 
 657     /**
 658      * DatagramChannel close while virtual thread blocked in adaptor receive
 659      * with a 60 second timeout.
 660      */
 661     @ParameterizedTest
 662     @MethodSource("threadBuilders")
 663     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
 664         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_000);
 665     }
 666 
 667     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
 668                                                             int timeout) throws Exception {
 669         VThreadRunner.run(builder, () -> {
 670             try (DatagramChannel dc = DatagramChannel.open()) {
 671                 InetAddress lh = InetAddress.getLoopbackAddress();
 672                 dc.bind(new InetSocketAddress(lh, 0));
 673 
 674                 byte[] array = new byte[100];
 675                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 676                 if (timeout > 0)
 677                     dc.socket().setSoTimeout(timeout);
 678 
 679                 // close channel/socket when current thread blocks in receive
 680                 runAfterParkedAsync(dc::close);
 681 
 682                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
 683             }
 684         });
 685     }
 686 
 687     /**
 688      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
 689      */
 690     @ParameterizedTest
 691     @MethodSource("threadBuilders")
 692     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
 693         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
 694     }
 695 
 696     /**
 697      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
 698      * with a 60 second timeout.
 699      */
 700     @ParameterizedTest
 701     @MethodSource("threadBuilders")
 702     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
 703         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_000);
 704     }
 705 
 706     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
 707                                                            int timeout) throws Exception {
 708         VThreadRunner.run(builder, () -> {
 709             try (DatagramChannel dc = DatagramChannel.open()) {
 710                 InetAddress lh = InetAddress.getLoopbackAddress();
 711                 dc.bind(new InetSocketAddress(lh, 0));
 712 
 713                 byte[] array = new byte[100];
 714                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 715                 if (timeout > 0)
 716                     dc.socket().setSoTimeout(timeout);
 717 
 718                 // interrupt current thread when it blocks in receive
 719                 Thread thisThread = Thread.currentThread();
 720                 runAfterParkedAsync(thisThread::interrupt);
 721 
 722                 try {
 723                     dc.socket().receive(p);
 724                     fail();
 725                 } catch (ClosedByInterruptException expected) {
 726                     assertTrue(Thread.interrupted());
 727                 }
 728             }
 729         });
 730     }
 731 
 732     /**
 733      * Pipe read/write, no blocking.
 734      */
 735     @ParameterizedTest
 736     @MethodSource("threadBuilders")
 737     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
 738         VThreadRunner.run(builder, () -> {
 739             Pipe p = Pipe.open();
 740             try (Pipe.SinkChannel sink = p.sink();
 741                  Pipe.SourceChannel source = p.source()) {
 742 
 743                 // write should not block
 744                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 745                 int n = sink.write(bb);
 746                 assertTrue(n > 0);
 747 
 748                 // read should not block
 749                 bb = ByteBuffer.allocate(10);
 750                 n = source.read(bb);
 751                 assertTrue(n > 0);
 752                 assertTrue(bb.get(0) == 'X');
 753             }
 754         });
 755     }
 756 
 757     /**
 758      * Virtual thread blocks in Pipe.SourceChannel read.
 759      */
 760     @ParameterizedTest
 761     @MethodSource("threadBuilders")
 762     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
 763         VThreadRunner.run(builder, () -> {
 764             Pipe p = Pipe.open();
 765             try (Pipe.SinkChannel sink = p.sink();
 766                  Pipe.SourceChannel source = p.source()) {
 767 
 768                 // write from sink when current thread blocks reading from source
 769                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 770                 runAfterParkedAsync(() -> sink.write(bb1));
 771 
 772                 // read should block
 773                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 774                 int n = source.read(bb2);
 775                 assertTrue(n > 0);
 776                 assertTrue(bb2.get(0) == 'X');
 777             }
 778         });
 779     }
 780 
 781     /**
 782      * Virtual thread blocks in Pipe.SinkChannel write.
 783      */
 784     @ParameterizedTest
 785     @MethodSource("threadBuilders")
 786     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
 787         VThreadRunner.run(builder, () -> {
 788             Pipe p = Pipe.open();
 789             try (Pipe.SinkChannel sink = p.sink();
 790                  Pipe.SourceChannel source = p.source()) {
 791 
 792                 // read from source to EOF when current thread blocking in write
 793                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
 794 
 795                 // write to sink should block
 796                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 797                 for (int i=0; i<1000; i++) {
 798                     int n = sink.write(bb);
 799                     assertTrue(n > 0);
 800                     bb.clear();
 801                 }
 802                 sink.close();
 803 
 804                 // wait for reader to finish
 805                 reader.join();
 806             }
 807         });
 808     }
 809 
 810     /**
 811      * Pipe.SourceChannel close while virtual thread blocked in read.
 812      */
 813     @ParameterizedTest
 814     @MethodSource("threadBuilders")
 815     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 816         VThreadRunner.run(builder, () -> {
 817             Pipe p = Pipe.open();
 818             try (Pipe.SinkChannel sink = p.sink();
 819                  Pipe.SourceChannel source = p.source()) {
 820                 runAfterParkedAsync(source::close);
 821                 try {
 822                     int n = source.read(ByteBuffer.allocate(100));
 823                     fail("read returned " + n);
 824                 } catch (AsynchronousCloseException expected) { }
 825             }
 826         });
 827     }
 828 
 829     /**
 830      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
 831      */
 832     @ParameterizedTest
 833     @MethodSource("threadBuilders")
 834     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 835         VThreadRunner.run(builder, () -> {
 836             Pipe p = Pipe.open();
 837             try (Pipe.SinkChannel sink = p.sink();
 838                  Pipe.SourceChannel source = p.source()) {
 839 
 840                 // interrupt current thread when it blocks reading from source
 841                 Thread thisThread = Thread.currentThread();
 842                 runAfterParkedAsync(thisThread::interrupt);
 843 
 844                 try {
 845                     int n = source.read(ByteBuffer.allocate(100));
 846                     fail("read returned " + n);
 847                 } catch (ClosedByInterruptException expected) {
 848                     assertTrue(Thread.interrupted());
 849                 }
 850             }
 851         });
 852     }
 853 
 854     /**
 855      * Pipe.SinkChannel close while virtual thread blocked in write.
 856      */
 857     @ParameterizedTest
 858     @MethodSource("threadBuilders")
 859     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 860         VThreadRunner.run(builder, () -> {
 861             boolean done = false;
 862             while (!done) {
 863                 Pipe p = Pipe.open();
 864                 try (Pipe.SinkChannel sink = p.sink();
 865                      Pipe.SourceChannel source = p.source()) {
 866 
 867                     // close sink when current thread blocks in write
 868                     runAfterParkedAsync(sink::close, true);
 869 
 870                     // write until channel is closed
 871                     try {
 872                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 873                         for (;;) {
 874                             int n = sink.write(bb);
 875                             assertTrue(n > 0);
 876                             bb.clear();
 877                         }
 878                     } catch (AsynchronousCloseException e) {
 879                         // closed when blocked in write
 880                         done = true;
 881                     } catch (ClosedChannelException e) {
 882                         // closed but not blocked in write, need to retry test
 883                         System.err.format("%s, need to retry!%n", e);
 884                     }
 885                 }
 886             }
 887         });
 888     }
 889 
 890     /**
 891      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
 892      */
 893     @ParameterizedTest
 894     @MethodSource("threadBuilders")
 895     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 896         VThreadRunner.run(builder, () -> {
 897             boolean done = false;
 898             while (!done) {
 899                 Pipe p = Pipe.open();
 900                 try (Pipe.SinkChannel sink = p.sink();
 901                      Pipe.SourceChannel source = p.source()) {
 902 
 903                     // interrupt current thread when it blocks in write
 904                     Thread thisThread = Thread.currentThread();
 905                     runAfterParkedAsync(thisThread::interrupt, true);
 906 
 907                     // write until channel is closed
 908                     try {
 909                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 910                         for (;;) {
 911                             int n = sink.write(bb);
 912                             assertTrue(n > 0);
 913                             bb.clear();
 914                         }
 915                     } catch (ClosedByInterruptException expected) {
 916                         // closed when blocked in write
< prev index next >