< prev index next >

test/jdk/java/nio/channels/vthread/BlockingChannelOps.java

Print this page

   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=default
  26  * @bug 8284161
  27  * @summary Test virtual threads doing blocking I/O on NIO channels
  28  * @library /test/lib
  29  * @run junit/timeout=480 BlockingChannelOps
  30  */
  31 
  32 /*
  33  * @test id=poller-modes
  34  * @requires (os.family == "linux") | (os.family == "mac")
  35  * @library /test/lib
  36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
  37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps













  38  */
  39 
  40 /*
  41  * @test id=no-vmcontinuations
  42  * @requires vm.continuations
  43  * @library /test/lib
  44  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
  45  */
  46 
  47 import java.io.Closeable;
  48 import java.io.IOException;
  49 import java.net.DatagramPacket;
  50 import java.net.InetAddress;
  51 import java.net.InetSocketAddress;
  52 import java.net.Socket;
  53 import java.net.SocketAddress;
  54 import java.net.SocketException;
  55 import java.nio.ByteBuffer;
  56 import java.nio.channels.AsynchronousCloseException;
  57 import java.nio.channels.ClosedByInterruptException;
  58 import java.nio.channels.ClosedChannelException;
  59 import java.nio.channels.DatagramChannel;
  60 import java.nio.channels.Pipe;
  61 import java.nio.channels.ReadableByteChannel;
  62 import java.nio.channels.ServerSocketChannel;
  63 import java.nio.channels.SocketChannel;
  64 import java.nio.channels.WritableByteChannel;



  65 import java.util.concurrent.locks.LockSupport;

  66 
  67 import jdk.test.lib.thread.VThreadRunner;
  68 import org.junit.jupiter.api.Test;




  69 import static org.junit.jupiter.api.Assertions.*;
  70 
  71 class BlockingChannelOps {




















  72 
  73     /**
  74      * SocketChannel read/write, no blocking.
          *
          * Runs in the task handed to VThreadRunner.run: writes "XXX" to one
          * channel of a Connection and reads it back from the other. The write
          * has already returned when the read is issued, so the read is expected
          * to find data without having to wait.
  75      */
  76     @Test
  77     void testSocketChannelReadWrite1() throws Exception {
  78         VThreadRunner.run(() -> {

  79             try (var connection = new Connection()) {
  80                 SocketChannel sc1 = connection.channel1();
  81                 SocketChannel sc2 = connection.channel2();
  82 
  83                 // write to sc1
  84                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
  85                 int n = sc1.write(bb);
  86                 assertTrue(n > 0);
  87 
  88                 // read from sc2 should not block
  89                 bb = ByteBuffer.allocate(10);
  90                 n = sc2.read(bb);
  91                 assertTrue(n > 0);
                      // absolute get: inspects the first byte without flipping the buffer
  92                 assertTrue(bb.get(0) == 'X');
  93             }
  94         });
  95     }
  96 
  97     /**
  98      * Virtual thread blocks in SocketChannel read.
          *
          * The write to the peer channel is scheduled through runAfterParkedAsync
          * (helper not shown in this part of the file; per the comments below it
          * runs the task once the current thread is blocked), so sc2.read is
          * issued before any data has been sent.
  99      */
 100     @Test
 101     void testSocketChannelRead() throws Exception {
 102         VThreadRunner.run(() -> {

 103             try (var connection = new Connection()) {
 104                 SocketChannel sc1 = connection.channel1();
 105                 SocketChannel sc2 = connection.channel2();
 106 
 107                 // write to sc1 when current thread blocks in sc2.read
 108                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 109                 runAfterParkedAsync(() -> sc1.write(bb1));
 110 
 111                 // read from sc2 should block
 112                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 113                 int n = sc2.read(bb2);
 114                 assertTrue(n > 0);
                      // absolute get: inspects the first byte without flipping the buffer
 115                 assertTrue(bb2.get(0) == 'X');
 116             }
 117         });
 118     }
 119 
 120     /**
 121      * Virtual thread blocks in SocketChannel write.
          *
          * Writes 1000 buffers of 100KB to sc1 while nothing drains sc2 until the
          * reader scheduled via runAfterParkedAsync starts; the intent is that
          * the writer fills the socket buffers and blocks. Closing sc1 lets the
          * reader's readToEOF (helper not shown here) reach end-of-stream.
 122      */
 123     @Test
 124     void testSocketChannelWrite() throws Exception {
 125         VThreadRunner.run(() -> {

 126             try (var connection = new Connection()) {
 127                 SocketChannel sc1 = connection.channel1();
 128                 SocketChannel sc2 = connection.channel2();
 129 
 130                 // read from sc2 to EOF when current thread blocks in sc1.write
 131                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
 132 
 133                 // write to sc1 should block
 134                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 135                 for (int i=0; i<1000; i++) {
 136                     int n = sc1.write(bb);
 137                     assertTrue(n > 0);
                          // reset position/limit so the same buffer is written again
 138                     bb.clear();
 139                 }
                      // closing the writing end is what produces EOF for the reader
 140                 sc1.close();
 141 
 142                 // wait for reader to finish
 143                 reader.join();
 144             }
 145         });
 146     }
 147 
 148     /**
 149      * SocketChannel close while virtual thread blocked in read.
          *
          * The close is scheduled through runAfterParkedAsync (helper not shown
          * here) and the read must then fail with AsynchronousCloseException;
          * a normal return from read fails the test.
 150      */
 151     @Test
 152     void testSocketChannelReadAsyncClose() throws Exception {
 153         VThreadRunner.run(() -> {

 154             try (var connection = new Connection()) {
 155                 SocketChannel sc = connection.channel1();
 156                 runAfterParkedAsync(sc::close);
 157                 try {
                          // nothing is ever written by the peer, so read can only end via the close
 158                     int n = sc.read(ByteBuffer.allocate(100));
 159                     fail("read returned " + n);
 160                 } catch (AsynchronousCloseException expected) { }
 161             }
 162         });
 163     }
 164 
 165     /**
 166      * SocketChannel shutdownInput while virtual thread blocked in read.
          *
          * Unlike an asynchronous close, shutting down the input side is expected
          * to make the pending read return -1 (end-of-stream) and to leave the
          * channel open.
 167      */
 168     @Test
 169     void testSocketChannelReadAsyncShutdownInput() throws Exception {
 170         VThreadRunner.run(() -> {

 171             try (var connection = new Connection()) {
 172                 SocketChannel sc = connection.channel1();
 173                 runAfterParkedAsync(sc::shutdownInput);
 174                 int n = sc.read(ByteBuffer.allocate(100));
                      // end-of-stream, not an exception
 175                 assertEquals(-1, n);
                      // only the input side was shut down
 176                 assertTrue(sc.isOpen());
 177             }
 178         });
 179     }
 180 
 181     /**
 182      * Virtual thread interrupted while blocked in SocketChannel read.
          *
          * The read must fail with ClosedByInterruptException and the thread's
          * interrupt status must still be set when the exception is caught.
 183      */
 184     @Test
 185     void testSocketChannelReadInterrupt() throws Exception {
 186         VThreadRunner.run(() -> {

 187             try (var connection = new Connection()) {
 188                 SocketChannel sc = connection.channel1();
 189 
 190                 // interrupt current thread when it blocks in read
 191                 Thread thisThread = Thread.currentThread();
 192                 runAfterParkedAsync(thisThread::interrupt);
 193 
 194                 try {
 195                     int n = sc.read(ByteBuffer.allocate(100));
 196                     fail("read returned " + n);
 197                 } catch (ClosedByInterruptException expected) {
                          // Thread.interrupted() both checks and clears the interrupt status
 198                     assertTrue(Thread.interrupted());
 199                 }
 200             }
 201         });
 202     }
 203 
 204     /**
 205      * SocketChannel close while virtual thread blocked in write.
          *
          * The test repeats with a fresh Connection until the close is observed
          * as AsynchronousCloseException, i.e. while the writer was inside write.
          * A plain ClosedChannelException means the close landed between two
          * writes, which proves nothing, so that attempt is retried.
 206      */
 207     @Test
 208     void testSocketChannelWriteAsyncClose() throws Exception {
 209         VThreadRunner.run(() -> {

 210             boolean done = false;
 211             while (!done) {
 212                 try (var connection = new Connection()) {
 213                     SocketChannel sc = connection.channel1();
 214 
 215                     // close sc when current thread blocks in write
                          // NOTE(review): meaning of the boolean argument is defined by the
                          // helper, which is outside this view - confirm against its declaration
 216                     runAfterParkedAsync(sc::close, true);
 217 
 218                     // write until channel is closed
 219                     try {
 220                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 221                         for (;;) {
 222                             int n = sc.write(bb);
 223                             assertTrue(n > 0);
 224                             bb.clear();
 225                         }
                          // AsynchronousCloseException is a subclass of ClosedChannelException,
                          // so it has to be caught first
 226                     } catch (AsynchronousCloseException expected) {
 227                         // closed when blocked in write
 228                         done = true;
 229                     } catch (ClosedChannelException e) {
 230                         // closed but not blocked in write, need to retry test
 231                         System.err.format("%s, need to retry!%n", e);
 232                     }
 233                 }
 234             }
 235         });
 236     }
 237 
 238 
 239     /**
 240      * SocketChannel shutdownOutput while virtual thread blocked in write.
          *
          * Writes until the asynchronous shutdownOutput makes write fail with
          * ClosedChannelException, then checks that the channel itself is still
          * open (only its output side was shut down).
 241      */
 242     @Test
 243     void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
 244         VThreadRunner.run(() -> {

 245             try (var connection = new Connection()) {
 246                 SocketChannel sc = connection.channel1();
 247 
 248                 // shutdown output when current thread blocks in write
 249                 runAfterParkedAsync(sc::shutdownOutput);
 250                 try {
 251                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
                          // loop only exits by exception
 252                     for (;;) {
 253                         int n = sc.write(bb);
 254                         assertTrue(n > 0);
 255                         bb.clear();
 256                     }
 257                 } catch (ClosedChannelException e) {
 258                     // expected
 259                 }
 260                 assertTrue(sc.isOpen());
 261             }
 262         });
 263     }
 264 
 265     /**
 266      * Virtual thread interrupted while blocked in SocketChannel write.
          *
          * The test repeats with a fresh Connection until the interrupt is
          * observed as ClosedByInterruptException. Any other
          * ClosedChannelException is logged and the attempt is retried.
 267      */
 268     @Test
 269     void testSocketChannelWriteInterrupt() throws Exception {
 270         VThreadRunner.run(() -> {

 271             boolean done = false;
 272             while (!done) {
 273                 try (var connection = new Connection()) {
 274                     SocketChannel sc = connection.channel1();
 275 
 276                     // interrupt current thread when it blocks in write
 277                     Thread thisThread = Thread.currentThread();
                          // NOTE(review): meaning of the boolean argument is defined by the
                          // helper, which is outside this view - confirm against its declaration
 278                     runAfterParkedAsync(thisThread::interrupt, true);
 279 
 280                     // write until channel is closed
 281                     try {
 282                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 283                         for (;;) {
 284                             int n = sc.write(bb);
 285                             assertTrue(n > 0);
 286                             bb.clear();
 287                         }
                          // ClosedByInterruptException is a subclass of ClosedChannelException,
                          // so it has to be caught first
 288                     } catch (ClosedByInterruptException e) {
 289                         // closed when blocked in write
                              // Thread.interrupted() also clears the status before the next step
 290                         assertTrue(Thread.interrupted());
 291                         done = true;
 292                     } catch (ClosedChannelException e) {
 293                         // closed but not blocked in write, need to retry test
 294                         System.err.format("%s, need to retry!%n", e);
 295                     }
 296                 }
 297             }
 298         });
 299     }
 300 
 301     /**
 302      * Virtual thread blocks in SocketChannel adaptor read.
          *
          * Passes timeout 0, for which testSocketAdaptorRead does not call
          * setSoTimeout (untimed read).
 303      */
 304     @Test
 305     void testSocketAdaptorRead1() throws Exception {
 306         testSocketAdaptorRead(0);

 307     }
 308 
 309     /**
 310      * Virtual thread blocks in SocketChannel adaptor read with timeout.
          *
          * Passes 60_000, which testSocketAdaptorRead hands to setSoTimeout
          * (milliseconds, i.e. 60 seconds).
 311      */
 312     @Test
 313     void testSocketAdaptorRead2() throws Exception {
 314         testSocketAdaptorRead(60_000);

 315     }
 316 
          /**
          * Reads through the java.net.Socket adaptor of a SocketChannel
          * (sc2.socket().getInputStream()) while the peer's write is deferred
          * via runAfterParkedAsync (helper not shown here).
          *
          * @param timeout SO_TIMEOUT to set on the adaptor before reading;
          *                values {@code <= 0} leave the timeout unset
          */
 317     private void testSocketAdaptorRead(int timeout) throws Exception {
 318         VThreadRunner.run(() -> {

 319             try (var connection = new Connection()) {
 320                 SocketChannel sc1 = connection.channel1();
 321                 SocketChannel sc2 = connection.channel2();
 322 
 323                 // write to sc1 when current thread blocks reading from sc2
 324                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 325                 runAfterParkedAsync(() -> sc1.write(bb));
 326 
 327                 // read from sc2 should block
 328                 byte[] array = new byte[100];
 329                 if (timeout > 0)
 330                     sc2.socket().setSoTimeout(timeout);
 331                 int n = sc2.socket().getInputStream().read(array);
 332                 assertTrue(n > 0);
 333                 assertTrue(array[0] == 'X');
 334             }
 335         });
 336     }
 337 
 338     /**
 339      * ServerSocketChannel accept, no blocking.
          *
          * The client channel is opened (and thereby connected) before accept is
          * called, so accept is expected to find a pending connection.
 340      */
 341     @Test
 342     void testServerSocketChannelAccept1() throws Exception {
 343         VThreadRunner.run(() -> {

 344             try (var ssc = ServerSocketChannel.open()) {
                      // port 0: bind to a system-chosen port on the loopback address
 345                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                      // SocketChannel.open(SocketAddress) opens and connects in one step
 346                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
 347                 // accept should not block
 348                 var sc2 = ssc.accept();
 349                 sc1.close();
 350                 sc2.close();
 351             }
 352         });
 353     }
 354 
 355     /**
 356      * Virtual thread blocks in ServerSocketChannel accept.
          *
          * The client channel is opened unconnected; its connect is deferred via
          * runAfterParkedAsync (helper not shown here), so accept is issued while
          * no connection is pending.
 357      */
 358     @Test
 359     void testServerSocketChannelAccept2() throws Exception {
 360         VThreadRunner.run(() -> {

 361             try (var ssc = ServerSocketChannel.open()) {
 362                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 363                 var sc1 = SocketChannel.open();
 364 
 365                 // connect sc1 when current thread blocks in accept
 366                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
 367 
 368                 // accept should block
 369                 var sc2 = ssc.accept();
 370                 sc1.close();
 371                 sc2.close();
 372             }
 373         });
 374     }
 375 
 376     /**
 377      * ServerSocketChannel close while virtual thread blocked in accept.
          *
          * No client ever connects, so accept can only end through the deferred
          * close and must throw AsynchronousCloseException.
 378      */
 379     @Test
 380     void testServerSocketChannelAcceptAsyncClose() throws Exception {
 381         VThreadRunner.run(() -> {

 382             try (var ssc = ServerSocketChannel.open()) {
 383                 InetAddress lh = InetAddress.getLoopbackAddress();
 384                 ssc.bind(new InetSocketAddress(lh, 0));
 385                 runAfterParkedAsync(ssc::close);
 386                 try {
 387                     SocketChannel sc = ssc.accept();
                          // only reached if a connection was unexpectedly accepted
 388                     sc.close();
 389                     fail("connection accepted???");
 390                 } catch (AsynchronousCloseException expected) { }
 391             }
 392         });
 393     }
 394 
 395     /**
 396      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
          *
          * accept must fail with ClosedByInterruptException and the interrupt
          * status must still be set when the exception is caught.
 397      */
 398     @Test
 399     void testServerSocketChannelAcceptInterrupt() throws Exception {
 400         VThreadRunner.run(() -> {

 401             try (var ssc = ServerSocketChannel.open()) {
 402                 InetAddress lh = InetAddress.getLoopbackAddress();
 403                 ssc.bind(new InetSocketAddress(lh, 0));
 404 
 405                 // interrupt current thread when it blocks in accept
 406                 Thread thisThread = Thread.currentThread();
 407                 runAfterParkedAsync(thisThread::interrupt);
 408 
 409                 try {
 410                     SocketChannel sc = ssc.accept();
                          // only reached if a connection was unexpectedly accepted
 411                     sc.close();
 412                     fail("connection accepted???");
 413                 } catch (ClosedByInterruptException expected) {
                          // Thread.interrupted() both checks and clears the interrupt status
 414                     assertTrue(Thread.interrupted());
 415                 }
 416             }
 417         });
 418     }
 419 
 420     /**
 421      * Virtual thread blocks in ServerSocketChannel adaptor accept.
          *
          * Passes timeout 0, for which testSocketChannelAdaptorAccept does not
          * call setSoTimeout (untimed accept).
 422      */
 423     @Test
 424     void testSocketChannelAdaptorAccept1() throws Exception {
 425         testSocketChannelAdaptorAccept(0);

 426     }
 427 
 428     /**
 429      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
          *
          * Passes 60_000, which testSocketChannelAdaptorAccept hands to
          * setSoTimeout (milliseconds, i.e. 60 seconds).
 430      */
 431     @Test
 432     void testSocketChannelAdaptorAccept2() throws Exception {
 433         testSocketChannelAdaptorAccept(60_000);

 434     }
 435 
          /**
          * Accepts through the java.net.ServerSocket adaptor of a
          * ServerSocketChannel (ssc.socket().accept()) while the client's connect
          * is deferred via runAfterParkedAsync (helper not shown here).
          *
          * @param timeout SO_TIMEOUT to set on the adaptor before accepting;
          *                values {@code <= 0} leave the timeout unset
          */
 436     private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
 437         VThreadRunner.run(() -> {

 438             try (var ssc = ServerSocketChannel.open()) {
 439                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 440                 var sc = SocketChannel.open();
 441 
 442                 // connect sc when current thread blocks in accept
 443                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
 444 
 445                 // accept should block
 446                 if (timeout > 0)
 447                     ssc.socket().setSoTimeout(timeout);
 448                 Socket s = ssc.socket().accept();
 449                 sc.close();
 450                 s.close();
 451             }
 452         });
 453     }
 454 
 455     /**
 456      * DatagramChannel receive/send, no blocking.
          *
          * Sends "XXX" from dc1 to dc2 (bound to a system-chosen loopback port)
          * before receive is called, so receive is expected to find the datagram
          * without having to wait.
 457      */
 458     @Test
 459     void testDatagramChannelSendReceive1() throws Exception {
 460         VThreadRunner.run(() -> {

 461             try (DatagramChannel dc1 = DatagramChannel.open();
 462                  DatagramChannel dc2 = DatagramChannel.open()) {
 463 
 464                 InetAddress lh = InetAddress.getLoopbackAddress();
 465                 dc2.bind(new InetSocketAddress(lh, 0));
 466 
 467                 // send should not block
 468                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 469                 int n = dc1.send(bb, dc2.getLocalAddress());
 470                 assertTrue(n > 0);
 471 
 472                 // receive should not block
 473                 bb = ByteBuffer.allocate(10);
 474                 dc2.receive(bb);
                      // absolute get: inspects the first byte without flipping the buffer
 475                 assertTrue(bb.get(0) == 'X');
 476             }
 477         });
 478     }
 479 
 480     /**
 481      * Virtual thread blocks in DatagramChannel receive.
          *
          * The send from dc1 is deferred via runAfterParkedAsync (helper not
          * shown here), so dc2.receive is issued before any datagram is sent.
 482      */
 483     @Test
 484     void testDatagramChannelSendReceive2() throws Exception {
 485         VThreadRunner.run(() -> {

 486             try (DatagramChannel dc1 = DatagramChannel.open();
 487                  DatagramChannel dc2 = DatagramChannel.open()) {
 488 
 489                 InetAddress lh = InetAddress.getLoopbackAddress();
 490                 dc2.bind(new InetSocketAddress(lh, 0));
 491 
 492                 // send from dc1 when current thread blocked in dc2.receive
 493                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 494                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
 495 
 496                 // read from dc2 should block
 497                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 498                 dc2.receive(bb2);
                      // absolute get: inspects the first byte without flipping the buffer
 499                 assertTrue(bb2.get(0) == 'X');
 500             }
 501         });
 502     }
 503 
 504     /**
 505      * DatagramChannel close while virtual thread blocked in receive.
          *
          * Nothing is ever sent to dc, so receive can only end through the
          * deferred close and must throw AsynchronousCloseException.
 506      */
 507     @Test
 508     void testDatagramChannelReceiveAsyncClose() throws Exception {
 509         VThreadRunner.run(() -> {

 510             try (DatagramChannel dc = DatagramChannel.open()) {
 511                 InetAddress lh = InetAddress.getLoopbackAddress();
 512                 dc.bind(new InetSocketAddress(lh, 0));
 513                 runAfterParkedAsync(dc::close);
 514                 try {
 515                     dc.receive(ByteBuffer.allocate(100));
                          // only reached if receive returned normally
 516                     fail("receive returned");
 517                 } catch (AsynchronousCloseException expected) { }
 518             }
 519         });
 520     }
 521 
 522     /**
 523      * Virtual thread interrupted while blocked in DatagramChannel receive.
          *
          * receive must fail with ClosedByInterruptException and the interrupt
          * status must still be set when the exception is caught.
 524      */
 525     @Test
 526     void testDatagramChannelReceiveInterrupt() throws Exception {
 527         VThreadRunner.run(() -> {

 528             try (DatagramChannel dc = DatagramChannel.open()) {
 529                 InetAddress lh = InetAddress.getLoopbackAddress();
 530                 dc.bind(new InetSocketAddress(lh, 0));
 531 
 532                 // interrupt current thread when it blocks in receive
 533                 Thread thisThread = Thread.currentThread();
 534                 runAfterParkedAsync(thisThread::interrupt);
 535 
 536                 try {
 537                     dc.receive(ByteBuffer.allocate(100));
 538                     fail("receive returned");
 539                 } catch (ClosedByInterruptException expected) {
                          // Thread.interrupted() both checks and clears the interrupt status
 540                     assertTrue(Thread.interrupted());
 541                 }
 542             }
 543         });
 544     }
 545 
 546     /**
 547      * Virtual thread blocks in DatagramSocket adaptor receive.
          *
          * Passes timeout 0, for which testDatagramSocketAdaptorReceive does not
          * call setSoTimeout (untimed receive).
 548      */
 549     @Test
 550     void testDatagramSocketAdaptorReceive1() throws Exception {
 551         testDatagramSocketAdaptorReceive(0);

 552     }
 553 
 554     /**
 555      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
          *
          * Passes 60_000, which testDatagramSocketAdaptorReceive hands to
          * setSoTimeout (milliseconds, i.e. 60 seconds).
 556      */
 557     @Test
 558     void testDatagramSocketAdaptorReceive2() throws Exception {
 559         testDatagramSocketAdaptorReceive(60_000);

 560     }
 561 
          /**
          * Receives through the java.net.DatagramSocket adaptor of a
          * DatagramChannel (dc2.socket().receive(p)) while the send from dc1 is
          * deferred via runAfterParkedAsync (helper not shown here).
          *
          * @param timeout SO_TIMEOUT to set on the adaptor before receiving;
          *                values {@code <= 0} leave the timeout unset
          */
 562     private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
 563         VThreadRunner.run(() -> {

 564             try (DatagramChannel dc1 = DatagramChannel.open();
 565                  DatagramChannel dc2 = DatagramChannel.open()) {
 566 
 567                 InetAddress lh = InetAddress.getLoopbackAddress();
 568                 dc2.bind(new InetSocketAddress(lh, 0));
 569 
 570                 // send from dc1 when current thread blocks in dc2 receive
 571                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 572                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
 573 
 574                 // receive should block
 575                 byte[] array = new byte[100];
 576                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 577                 if (timeout > 0)
 578                     dc2.socket().setSoTimeout(timeout);
 579                 dc2.socket().receive(p);
                      // the datagram sent above is the 3-byte payload "XXX"
 580                 assertTrue(p.getLength() == 3 && array[0] == 'X');
 581             }
 582         });
 583     }
 584 
 585     /**
 586      * DatagramChannel close while virtual thread blocked in adaptor receive.
          *
          * Passes timeout 0, for which testDatagramSocketAdaptorReceiveAsyncClose
          * does not call setSoTimeout (untimed receive).
 587      */
 588     @Test
 589     void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
 590         testDatagramSocketAdaptorReceiveAsyncClose(0);

 591     }
 592 
 593     /**
 594      * DatagramChannel close while virtual thread blocked in adaptor receive
 595      * with timeout.
          *
          * Passes a 60 second SO_TIMEOUT (in milliseconds), the same value used
          * by the other "with timeout" variants in this class. The receive is
          * expected to be ended by the asynchronous close, not by the timeout.
 596      */
 597     @Test
 598     void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
              // was 60_1000 (= 601000 ms), a typo for 60_000
 599         testDatagramSocketAdaptorReceiveAsyncClose(60_000);

 600     }
 601 
          /**
          * Closes a DatagramChannel while the current thread is receiving through
          * its java.net.DatagramSocket adaptor; the receive must fail with
          * SocketException.
          *
          * @param timeout SO_TIMEOUT to set on the adaptor before receiving;
          *                values {@code <= 0} leave the timeout unset
          */
 602     private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
 603         VThreadRunner.run(() -> {

 604             try (DatagramChannel dc = DatagramChannel.open()) {
 605                 InetAddress lh = InetAddress.getLoopbackAddress();
 606                 dc.bind(new InetSocketAddress(lh, 0));
 607 
 608                 byte[] array = new byte[100];
 609                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 610                 if (timeout > 0)
 611                     dc.socket().setSoTimeout(timeout);
 612 
 613                 // close channel/socket when current thread blocks in receive
 614                 runAfterParkedAsync(dc::close);
 615 
                      // nothing is sent to dc, so receive can only end via the close
 616                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
 617             }
 618         });
 619     }
 620 
 621     /**
 622      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
          *
          * Passes timeout 0, for which testDatagramSocketAdaptorReceiveInterrupt
          * does not call setSoTimeout (untimed receive).
 623      */
 624     @Test
 625     void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
 626         testDatagramSocketAdaptorReceiveInterrupt(0);

 627     }
 628 
 629     /**
 630      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
 631      * with timeout.
          *
          * Passes a 60 second SO_TIMEOUT (in milliseconds), the same value used
          * by the other "with timeout" variants in this class. The receive is
          * expected to be ended by the interrupt, not by the timeout.
 632      */
 633     @Test
 634     void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
              // was 60_1000 (= 601000 ms), a typo for 60_000
 635         testDatagramSocketAdaptorReceiveInterrupt(60_000);

 636     }
 637 
          /**
          * Interrupts the current thread while it is receiving through the
          * java.net.DatagramSocket adaptor of a DatagramChannel; the receive must
          * fail with ClosedByInterruptException and the interrupt status must
          * still be set.
          *
          * @param timeout SO_TIMEOUT to set on the adaptor before receiving;
          *                values {@code <= 0} leave the timeout unset
          */
 638     private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
 639         VThreadRunner.run(() -> {

 640             try (DatagramChannel dc = DatagramChannel.open()) {
 641                 InetAddress lh = InetAddress.getLoopbackAddress();
 642                 dc.bind(new InetSocketAddress(lh, 0));
 643 
 644                 byte[] array = new byte[100];
 645                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 646                 if (timeout > 0)
 647                     dc.socket().setSoTimeout(timeout);
 648 
 649                 // interrupt current thread when it blocks in receive
 650                 Thread thisThread = Thread.currentThread();
 651                 runAfterParkedAsync(thisThread::interrupt);
 652 
 653                 try {
 654                     dc.socket().receive(p);
                          // only reached if receive returned normally
 655                     fail();
 656                 } catch (ClosedByInterruptException expected) {
                          // Thread.interrupted() both checks and clears the interrupt status
 657                     assertTrue(Thread.interrupted());
 658                 }
 659             }
 660         });
 661     }
 662 
 663     /**
 664      * Pipe read/write, no blocking.
          *
          * Writes "XXX" to the sink before reading from the source, so the read
          * is expected to find data without having to wait.
 665      */
 666     @Test
 667     void testPipeReadWrite1() throws Exception {
 668         VThreadRunner.run(() -> {

 669             Pipe p = Pipe.open();
                  // both ends of the pipe are closed by the try-with-resources
 670             try (Pipe.SinkChannel sink = p.sink();
 671                  Pipe.SourceChannel source = p.source()) {
 672 
 673                 // write should not block
 674                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 675                 int n = sink.write(bb);
 676                 assertTrue(n > 0);
 677 
 678                 // read should not block
 679                 bb = ByteBuffer.allocate(10);
 680                 n = source.read(bb);
 681                 assertTrue(n > 0);
                      // absolute get: inspects the first byte without flipping the buffer
 682                 assertTrue(bb.get(0) == 'X');
 683             }
 684         });
 685     }
 686 
 687     /**
 688      * Virtual thread blocks in Pipe.SourceChannel read.
          *
          * The write to the sink is deferred via runAfterParkedAsync (helper not
          * shown here), so source.read is issued before any data is written.
 689      */
 690     @Test
 691     void testPipeReadWrite2() throws Exception {
 692         VThreadRunner.run(() -> {

 693             Pipe p = Pipe.open();
 694             try (Pipe.SinkChannel sink = p.sink();
 695                  Pipe.SourceChannel source = p.source()) {
 696 
 697                 // write from sink when current thread blocks reading from source
 698                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 699                 runAfterParkedAsync(() -> sink.write(bb1));
 700 
 701                 // read should block
 702                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 703                 int n = source.read(bb2);
 704                 assertTrue(n > 0);
                      // absolute get: inspects the first byte without flipping the buffer
 705                 assertTrue(bb2.get(0) == 'X');
 706             }
 707         });
 708     }
 709 
 710     /**
 711      * Virtual thread blocks in Pipe.SinkChannel write.
          *
          * Writes 1000 buffers of 100KB to the sink while nothing drains the
          * source until the reader scheduled via runAfterParkedAsync starts; the
          * intent is that the writer fills the pipe and blocks. Closing the sink
          * lets the reader's readToEOF (helper not shown here) reach
          * end-of-stream.
 712      */
 713     @Test
 714     void testPipeReadWrite3() throws Exception {
 715         VThreadRunner.run(() -> {

 716             Pipe p = Pipe.open();
 717             try (Pipe.SinkChannel sink = p.sink();
 718                  Pipe.SourceChannel source = p.source()) {
 719 
 720                 // read from source to EOF when current thread blocking in write
 721                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
 722 
 723                 // write to sink should block
 724                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 725                 for (int i=0; i<1000; i++) {
 726                     int n = sink.write(bb);
 727                     assertTrue(n > 0);
                          // reset position/limit so the same buffer is written again
 728                     bb.clear();
 729                 }
                      // closing the writing end is what produces EOF for the reader
 730                 sink.close();
 731 
 732                 // wait for reader to finish
 733                 reader.join();
 734             }
 735         });
 736     }
 737 
 738     /**
 739      * Pipe.SourceChannel close while virtual thread blocked in read.
          *
          * Nothing is written to the sink, so the read can only end through the
          * deferred close and must throw AsynchronousCloseException.
 740      */
 741     @Test
 742     void testPipeReadAsyncClose() throws Exception {
 743         VThreadRunner.run(() -> {

 744             Pipe p = Pipe.open();
                  // sink is opened only so that it is closed together with the source
 745             try (Pipe.SinkChannel sink = p.sink();
 746                  Pipe.SourceChannel source = p.source()) {
 747                 runAfterParkedAsync(source::close);
 748                 try {
 749                     int n = source.read(ByteBuffer.allocate(100));
 750                     fail("read returned " + n);
 751                 } catch (AsynchronousCloseException expected) { }
 752             }
 753         });
 754     }
 755 
 756     /**
 757      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
          *
          * The read must fail with ClosedByInterruptException and the interrupt
          * status must still be set when the exception is caught.
 758      */
 759     @Test
 760     void testPipeReadInterrupt() throws Exception {
 761         VThreadRunner.run(() -> {

 762             Pipe p = Pipe.open();
 763             try (Pipe.SinkChannel sink = p.sink();
 764                  Pipe.SourceChannel source = p.source()) {
 765 
 766                 // interrupt current thread when it blocks reading from source
 767                 Thread thisThread = Thread.currentThread();
 768                 runAfterParkedAsync(thisThread::interrupt);
 769 
 770                 try {
 771                     int n = source.read(ByteBuffer.allocate(100));
 772                     fail("read returned " + n);
 773                 } catch (ClosedByInterruptException expected) {
                          // Thread.interrupted() both checks and clears the interrupt status
 774                     assertTrue(Thread.interrupted());
 775                 }
 776             }
 777         });
 778     }
 779 
 780     /**
 781      * Pipe.SinkChannel close while virtual thread blocked in write.
          *
          * The test repeats with a fresh Pipe until the close is observed as
          * AsynchronousCloseException, i.e. while the writer was inside write.
          * A plain ClosedChannelException means the close landed between two
          * writes, which proves nothing, so that attempt is retried.
 782      */
 783     @Test
 784     void testPipeWriteAsyncClose() throws Exception {
 785         VThreadRunner.run(() -> {

 786             boolean done = false;
 787             while (!done) {
 788                 Pipe p = Pipe.open();
 789                 try (Pipe.SinkChannel sink = p.sink();
 790                      Pipe.SourceChannel source = p.source()) {
 791 
 792                     // close sink when current thread blocks in write
                          // NOTE(review): meaning of the boolean argument is defined by the
                          // helper, which is outside this view - confirm against its declaration
 793                     runAfterParkedAsync(sink::close, true);
 794 
 795                     // write until channel is closed
 796                     try {
 797                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 798                         for (;;) {
 799                             int n = sink.write(bb);
 800                             assertTrue(n > 0);
 801                             bb.clear();
 802                         }
                          // AsynchronousCloseException is a subclass of ClosedChannelException,
                          // so it has to be caught first
 803                     } catch (AsynchronousCloseException e) {
 804                         // closed when blocked in write
 805                         done = true;
 806                     } catch (ClosedChannelException e) {
 807                         // closed but not blocked in write, need to retry test
 808                         System.err.format("%s, need to retry!%n", e);
 809                     }
 810                 }
 811             }
 812         });
 813     }
 814 
 815     /**
 816      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
 817      */
 818     @Test
 819     void testPipeWriteInterrupt() throws Exception {
 820         VThreadRunner.run(() -> {

 821             boolean done = false;
 822             while (!done) {
 823                 Pipe p = Pipe.open();
 824                 try (Pipe.SinkChannel sink = p.sink();
 825                      Pipe.SourceChannel source = p.source()) {
 826 
 827                     // interrupt current thread when it blocks in write
 828                     Thread thisThread = Thread.currentThread();
 829                     runAfterParkedAsync(thisThread::interrupt, true);
 830 
 831                     // write until channel is closed
 832                     try {
 833                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 834                         for (;;) {
 835                             int n = sink.write(bb);
 836                             assertTrue(n > 0);
 837                             bb.clear();
 838                         }
 839                     } catch (ClosedByInterruptException expected) {
 840                         // closed when blocked in write

   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=default
  26  * @bug 8284161
  27  * @summary Test virtual threads doing blocking I/O on NIO channels
  28  * @library /test/lib
  29  * @run junit/othervm/timeout=480 BlockingChannelOps
  30  */
  31 
  32 /*
  33  * @test id=poller-modes
  34  * @requires (os.family == "linux") | (os.family == "mac")
  35  * @library /test/lib
  36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
  37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
  38  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
  39  */
  40 
  41 /*
  42  * @test id=io_uring
  43  * @requires os.family == "linux"
  44  * @library /test/lib
  45  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
  46  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
  47  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
  48  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
  49  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
  50  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
  51  */
  52 
  53 /*
  54  * @test id=no-vmcontinuations
  55  * @requires vm.continuations
  56  * @library /test/lib
  57  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
  58  */
  59 
  60 import java.io.Closeable;
  61 import java.io.IOException;
  62 import java.net.DatagramPacket;
  63 import java.net.InetAddress;
  64 import java.net.InetSocketAddress;
  65 import java.net.Socket;
  66 import java.net.SocketAddress;
  67 import java.net.SocketException;
  68 import java.nio.ByteBuffer;
  69 import java.nio.channels.AsynchronousCloseException;
  70 import java.nio.channels.ClosedByInterruptException;
  71 import java.nio.channels.ClosedChannelException;
  72 import java.nio.channels.DatagramChannel;
  73 import java.nio.channels.Pipe;
  74 import java.nio.channels.ReadableByteChannel;
  75 import java.nio.channels.ServerSocketChannel;
  76 import java.nio.channels.SocketChannel;
  77 import java.nio.channels.WritableByteChannel;
  78 import java.util.concurrent.ExecutorService;
  79 import java.util.concurrent.Executors;
  80 import java.util.concurrent.ThreadFactory;
  81 import java.util.concurrent.locks.LockSupport;
  82 import java.util.stream.Stream;
  83 
  84 import jdk.test.lib.thread.VThreadRunner;
  85 import jdk.test.lib.thread.VThreadScheduler;
  86 
  87 import org.junit.jupiter.api.BeforeAll;
  88 import org.junit.jupiter.params.ParameterizedTest;
  89 import org.junit.jupiter.params.provider.MethodSource;
  90 import static org.junit.jupiter.api.Assertions.*;
  91 
  92 class BlockingChannelOps {
  93     private static ExecutorService threadPool;   // executes the tasks handed to customScheduler
  94     private static Thread.VirtualThreadScheduler customScheduler;   // created in setup()
  95 
          /**
           * Creates the cached pool of daemon platform threads and the custom
           * scheduler that runs each task it is given on that pool.
           */
  96     @BeforeAll
  97     static void setup() {
  98         ThreadFactory factory = Thread.ofPlatform().daemon().factory();
  99         threadPool = Executors.newCachedThreadPool(factory);
              // the first lambda parameter is unused; only the task is executed
 100         customScheduler = (_, task) -> threadPool.execute(task);
 101     }
 102 
 103     /**
 104      * Returns the Thread.Builder to create the virtual thread.
          *
          * This is the method source named by the {@code @MethodSource("threadBuilders")}
          * annotation on the parameterized tests in this class.
          *
          * @return a stream with a plain {@code Thread.ofVirtual()} builder and, when
          *         {@code VThreadScheduler.supportsCustomScheduler()} returns true, a
          *         second builder configured with {@code customScheduler}
 105      */
 106     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
 107         if (VThreadScheduler.supportsCustomScheduler()) {
 108             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
 109         } else {
 110             return Stream.of(Thread.ofVirtual());
 111         }
 112     }
 113 
 114     /**
 115      * SocketChannel read/write, no blocking.
          *
          * Writes three bytes to one channel of a Connection and reads them from
          * the other channel.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 116      */
 117     @ParameterizedTest
 118     @MethodSource("threadBuilders")
 119     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
 120         VThreadRunner.run(builder, () -> {
 121             try (var connection = new Connection()) {
 122                 SocketChannel sc1 = connection.channel1();
 123                 SocketChannel sc2 = connection.channel2();
 124 
 125                 // write to sc1
 126                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 127                 int n = sc1.write(bb);
 128                 assertTrue(n > 0);
 129 
 130                 // read from sc2 should not block
 131                 bb = ByteBuffer.allocate(10);
 132                 n = sc2.read(bb);
 133                 assertTrue(n > 0);
                      // absolute get: checks the first byte received, independent of position
 134                 assertTrue(bb.get(0) == 'X');
 135             }
 136         });
 137     }
 138 
 139     /**
 140      * Virtual thread blocks in SocketChannel read.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 141      */
 142     @ParameterizedTest
 143     @MethodSource("threadBuilders")
 144     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
 145         VThreadRunner.run(builder, () -> {
 146             try (var connection = new Connection()) {
 147                 SocketChannel sc1 = connection.channel1();
 148                 SocketChannel sc2 = connection.channel2();
 149 
 150                 // write to sc1 when current thread blocks in sc2.read
                      // NOTE(review): runAfterParkedAsync is defined outside this view; it must
                      // be called before the blocking read below for the test to make progress
 151                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 152                 runAfterParkedAsync(() -> sc1.write(bb1));
 153 
 154                 // read from sc2 should block
 155                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 156                 int n = sc2.read(bb2);
 157                 assertTrue(n > 0);
 158                 assertTrue(bb2.get(0) == 'X');
 159             }
 160         });
 161     }
 162 
 163     /**
 164      * Virtual thread blocks in SocketChannel write.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 165      */
 166     @ParameterizedTest
 167     @MethodSource("threadBuilders")
 168     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
 169         VThreadRunner.run(builder, () -> {
 170             try (var connection = new Connection()) {
 171                 SocketChannel sc1 = connection.channel1();
 172                 SocketChannel sc2 = connection.channel2();
 173 
 174                 // read from sc2 to EOF when current thread blocks in sc1.write
 175                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
 176 
 177                 // write to sc1 should block
                      // 1000 writes of a 100KB buffer; presumably more than can be buffered
                      // without a reader, so that write has to block
 178                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 179                 for (int i=0; i<1000; i++) {
 180                     int n = sc1.write(bb);
 181                     assertTrue(n > 0);
 182                     bb.clear();
 183                 }
                      // closed explicitly here, before joining the thread that reads to EOF
 184                 sc1.close();
 185 
 186                 // wait for reader to finish
 187                 reader.join();
 188             }
 189         });
 190     }
 191 
 192     /**
 193      * SocketChannel close while virtual thread blocked in read.
          *
          * The read is expected to fail with AsynchronousCloseException; the test
          * fails if read returns.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 194      */
 195     @ParameterizedTest
 196     @MethodSource("threadBuilders")
 197     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 198         VThreadRunner.run(builder, () -> {
 199             try (var connection = new Connection()) {
 200                 SocketChannel sc = connection.channel1();
                      // nothing is written to the other channel in this test
 201                 runAfterParkedAsync(sc::close);
 202                 try {
 203                     int n = sc.read(ByteBuffer.allocate(100));
 204                     fail("read returned " + n);
 205                 } catch (AsynchronousCloseException expected) { }
 206             }
 207         });
 208     }
 209 
 210     /**
 211      * SocketChannel shutdownInput while virtual thread blocked in read.
          *
          * The read is expected to return -1 and the channel is expected to
          * remain open.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 212      */
 213     @ParameterizedTest
 214     @MethodSource("threadBuilders")
 215     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
 216         VThreadRunner.run(builder, () -> {
 217             try (var connection = new Connection()) {
 218                 SocketChannel sc = connection.channel1();
 219                 runAfterParkedAsync(sc::shutdownInput);
 220                 int n = sc.read(ByteBuffer.allocate(100));
                      // end-of-stream, not an exception
 221                 assertEquals(-1, n);
                      // shutting down input must not close the channel
 222                 assertTrue(sc.isOpen());
 223             }
 224         });
 225     }
 226 
 227     /**
 228      * Virtual thread interrupted while blocked in SocketChannel read.
          *
          * The read is expected to fail with ClosedByInterruptException with the
          * thread's interrupt status set.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 229      */
 230     @ParameterizedTest
 231     @MethodSource("threadBuilders")
 232     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 233         VThreadRunner.run(builder, () -> {
 234             try (var connection = new Connection()) {
 235                 SocketChannel sc = connection.channel1();
 236 
 237                 // interrupt current thread when it blocks in read
 238                 Thread thisThread = Thread.currentThread();
 239                 runAfterParkedAsync(thisThread::interrupt);
 240 
 241                 try {
 242                     int n = sc.read(ByteBuffer.allocate(100));
 243                     fail("read returned " + n);
 244                 } catch (ClosedByInterruptException expected) {
                          // Thread.interrupted() checks and clears the interrupt status
 245                     assertTrue(Thread.interrupted());
 246                 }
 247             }
 248         });
 249     }
 250 
 251     /**
 252      * SocketChannel close while virtual thread blocked in write.
          *
          * The channel is written to in a loop until the close is observed. If the
          * close surfaces as a plain ClosedChannelException the attempt is repeated
          * with a new Connection.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 253      */
 254     @ParameterizedTest
 255     @MethodSource("threadBuilders")
 256     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 257         VThreadRunner.run(builder, () -> {
 258             boolean done = false;
 259             while (!done) {
 260                 try (var connection = new Connection()) {
 261                     SocketChannel sc = connection.channel1();
 262 
 263                     // close sc when current thread blocks in write
                          // NOTE(review): the meaning of the boolean argument is defined by
                          // runAfterParkedAsync, which is outside this view
 264                     runAfterParkedAsync(sc::close, true);
 265 
 266                     // write until channel is closed
 267                     try {
 268                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 269                         for (;;) {
 270                             int n = sc.write(bb);
 271                             assertTrue(n > 0);
                                  // reset position/limit so the whole buffer is written again
 272                             bb.clear();
 273                         }
                          // AsynchronousCloseException is a subclass of ClosedChannelException,
                          // so it has to be caught first
 274                     } catch (AsynchronousCloseException expected) {
 275                         // closed when blocked in write
 276                         done = true;
 277                     } catch (ClosedChannelException e) {
 278                         // closed but not blocked in write, need to retry test
 279                         System.err.format("%s, need to retry!%n", e);
 280                     }
 281                 }
 282             }
 283         });
 284     }
 285 

 286     /**
 287      * SocketChannel shutdownOutput while virtual thread blocked in write.
          *
          * The write loop is expected to end with ClosedChannelException while the
          * channel itself remains open.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 288      */
 289     @ParameterizedTest
 290     @MethodSource("threadBuilders")
 291     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
 292         VThreadRunner.run(builder, () -> {
 293             try (var connection = new Connection()) {
 294                 SocketChannel sc = connection.channel1();
 295 
 296                 // shutdown output when current thread blocks in write
 297                 runAfterParkedAsync(sc::shutdownOutput);
 298                 try {
 299                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
 300                     for (;;) {
 301                         int n = sc.write(bb);
 302                         assertTrue(n > 0);
 303                         bb.clear();
 304                     }
 305                 } catch (ClosedChannelException e) {
 306                     // expected
 307                 }
                      // shutting down output must not close the channel
 308                 assertTrue(sc.isOpen());
 309             }
 310         });
 311     }
 312 
 313     /**
 314      * Virtual thread interrupted while blocked in SocketChannel write.
          *
          * The channel is written to in a loop until the interrupt is observed as a
          * ClosedByInterruptException. If a plain ClosedChannelException is thrown
          * instead, the attempt is repeated with a new Connection.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 315      */
 316     @ParameterizedTest
 317     @MethodSource("threadBuilders")
 318     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 319         VThreadRunner.run(builder, () -> {
 320             boolean done = false;
 321             while (!done) {
 322                 try (var connection = new Connection()) {
 323                     SocketChannel sc = connection.channel1();
 324 
 325                     // interrupt current thread when it blocks in write
 326                     Thread thisThread = Thread.currentThread();
 327                     runAfterParkedAsync(thisThread::interrupt, true);
 328 
 329                     // write until channel is closed
 330                     try {
 331                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 332                         for (;;) {
 333                             int n = sc.write(bb);
 334                             assertTrue(n > 0);
 335                             bb.clear();
 336                         }
 337                     } catch (ClosedByInterruptException e) {
 338                         // closed when blocked in write
                              // Thread.interrupted() checks and clears the interrupt status
 339                         assertTrue(Thread.interrupted());
 340                         done = true;
 341                     } catch (ClosedChannelException e) {
 342                         // closed but not blocked in write, need to retry test
 343                         System.err.format("%s, need to retry!%n", e);
 344                     }
 345                 }
 346             }
 347         });
 348     }
 349 
 350     /**
 351      * Virtual thread blocks in SocketChannel adaptor read.
          *
          * @param builder virtual thread builder supplied by threadBuilders()
 352      */
 353     @ParameterizedTest
 354     @MethodSource("threadBuilders")
 355     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
              // timeout 0: testSocketAdaptorRead only sets SO_TIMEOUT when timeout > 0
 356         testSocketAdaptorRead(builder, 0);
 357     }
 358 
 359     /**
 360      * Virtual thread blocks in SocketChannel adaptor read with timeout.
          *
          * @param builder virtual thread builder supplied by threadBuilders()
 361      */
 362     @ParameterizedTest
 363     @MethodSource("threadBuilders")
 364     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
              // 60_000 is passed to Socket.setSoTimeout by the helper (milliseconds)
 365         testSocketAdaptorRead(builder, 60_000);
 366     }
 367 
          /**
           * Reads from the socket adaptor's input stream of one channel while
           * bytes are written to the other channel asynchronously.
           *
           * @param builder virtual thread builder, passed to VThreadRunner.run
           * @param timeout value for Socket.setSoTimeout; only set when greater than 0
           */
 368     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
 369                                        int timeout) throws Exception {
 370         VThreadRunner.run(builder, () -> {
 371             try (var connection = new Connection()) {
 372                 SocketChannel sc1 = connection.channel1();
 373                 SocketChannel sc2 = connection.channel2();
 374 
 375                 // write to sc1 when current thread blocks reading from sc2
 376                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 377                 runAfterParkedAsync(() -> sc1.write(bb));
 378 
 379                 // read from sc2 should block
 380                 byte[] array = new byte[100];
 381                 if (timeout > 0)
 382                     sc2.socket().setSoTimeout(timeout);
 383                 int n = sc2.socket().getInputStream().read(array);
 384                 assertTrue(n > 0);
 385                 assertTrue(array[0] == 'X');
 386             }
 387         });
 388     }
 389 
 390     /**
 391      * ServerSocketChannel accept, no blocking.
          *
          * A connection to the listener is established before accept is called.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 392      */
 393     @ParameterizedTest
 394     @MethodSource("threadBuilders")
 395     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
 396         VThreadRunner.run(builder, () -> {
 397             try (var ssc = ServerSocketChannel.open()) {
                      // port 0: bind to an ephemeral port on the loopback address
 398                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 399                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
 400                 // accept should not block
 401                 var sc2 = ssc.accept();
 402                 sc1.close();
 403                 sc2.close();
 404             }
 405         });
 406     }
 407 
 408     /**
 409      * Virtual thread blocks in ServerSocketChannel accept.
          *
          * The connection to the listener is made asynchronously, after accept has
          * been called.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 410      */
 411     @ParameterizedTest
 412     @MethodSource("threadBuilders")
 413     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
 414         VThreadRunner.run(builder, () -> {
 415             try (var ssc = ServerSocketChannel.open()) {
 416                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                      // opened unconnected; connected by the async task below
 417                 var sc1 = SocketChannel.open();
 418 
 419                 // connect to the listener when current thread blocks in accept
 420                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
 421 
 422                 // accept should block
 423                 var sc2 = ssc.accept();
 424                 sc1.close();
 425                 sc2.close();
 426             }
 427         });
 428     }
 429 
 430     /**
 431      * ServerSocketChannel close while virtual thread blocked in accept.
          *
          * The accept is expected to fail with AsynchronousCloseException; the
          * test fails if a connection is accepted.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 432      */
 433     @ParameterizedTest
 434     @MethodSource("threadBuilders")
 435     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 436         VThreadRunner.run(builder, () -> {
 437             try (var ssc = ServerSocketChannel.open()) {
 438                 InetAddress lh = InetAddress.getLoopbackAddress();
 439                 ssc.bind(new InetSocketAddress(lh, 0));
                      // nothing in this test connects to the listener
 440                 runAfterParkedAsync(ssc::close);
 441                 try {
 442                     SocketChannel sc = ssc.accept();
 443                     sc.close();
 444                     fail("connection accepted???");
 445                 } catch (AsynchronousCloseException expected) { }
 446             }
 447         });
 448     }
 449 
 450     /**
 451      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
          *
          * The accept is expected to fail with ClosedByInterruptException with the
          * thread's interrupt status set.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 452      */
 453     @ParameterizedTest
 454     @MethodSource("threadBuilders")
 455     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 456         VThreadRunner.run(builder, () -> {
 457             try (var ssc = ServerSocketChannel.open()) {
 458                 InetAddress lh = InetAddress.getLoopbackAddress();
 459                 ssc.bind(new InetSocketAddress(lh, 0));
 460 
 461                 // interrupt current thread when it blocks in accept
 462                 Thread thisThread = Thread.currentThread();
 463                 runAfterParkedAsync(thisThread::interrupt);
 464 
 465                 try {
 466                     SocketChannel sc = ssc.accept();
 467                     sc.close();
 468                     fail("connection accepted???");
 469                 } catch (ClosedByInterruptException expected) {
                          // Thread.interrupted() checks and clears the interrupt status
 470                     assertTrue(Thread.interrupted());
 471                 }
 472             }
 473         });
 474     }
 475 
 476     /**
 477      * Virtual thread blocks in ServerSocketChannel adaptor accept.
          *
          * @param builder virtual thread builder supplied by threadBuilders()
 478      */
 479     @ParameterizedTest
 480     @MethodSource("threadBuilders")
 481     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
              // timeout 0: the helper only sets SO_TIMEOUT when timeout > 0
 482         testSocketChannelAdaptorAccept(builder, 0);
 483     }
 484 
 485     /**
 486      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
          *
          * @param builder virtual thread builder supplied by threadBuilders()
 487      */
 488     @ParameterizedTest
 489     @MethodSource("threadBuilders")
 490     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
              // 60_000 is passed to ServerSocket.setSoTimeout by the helper (milliseconds)
 491         testSocketChannelAdaptorAccept(builder, 60_000);
 492     }
 493 
          /**
           * Accepts a connection with the ServerSocket adaptor while the
           * connection is established asynchronously.
           *
           * @param builder virtual thread builder, passed to VThreadRunner.run
           * @param timeout value for ServerSocket.setSoTimeout; only set when greater than 0
           */
 494     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
 495                                                 int timeout) throws Exception {
 496         VThreadRunner.run(builder, () -> {
 497             try (var ssc = ServerSocketChannel.open()) {
 498                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
 499                 var sc = SocketChannel.open();
 500 
 501                 // connect to the listener when current thread blocks in accept
 502                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
 503 
 504                 // accept should block
 505                 if (timeout > 0)
 506                     ssc.socket().setSoTimeout(timeout);
 507                 Socket s = ssc.socket().accept();
 508                 sc.close();
 509                 s.close();
 510             }
 511         });
 512     }
 513 
 514     /**
 515      * DatagramChannel receive/send, no blocking.
          *
          * Sends a three byte datagram from one channel to a second channel bound
          * to the loopback address, then receives it.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 516      */
 517     @ParameterizedTest
 518     @MethodSource("threadBuilders")
 519     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
 520         VThreadRunner.run(builder, () -> {
 521             try (DatagramChannel dc1 = DatagramChannel.open();
 522                  DatagramChannel dc2 = DatagramChannel.open()) {
 523 
                      // only the receiving channel is bound explicitly
 524                 InetAddress lh = InetAddress.getLoopbackAddress();
 525                 dc2.bind(new InetSocketAddress(lh, 0));
 526 
 527                 // send should not block
 528                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 529                 int n = dc1.send(bb, dc2.getLocalAddress());
 530                 assertTrue(n > 0);
 531 
 532                 // receive should not block
 533                 bb = ByteBuffer.allocate(10);
 534                 dc2.receive(bb);
 535                 assertTrue(bb.get(0) == 'X');
 536             }
 537         });
 538     }
 539 
 540     /**
 541      * Virtual thread blocks in DatagramChannel receive.
          *
          * The datagram is sent asynchronously, after receive has been called.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 542      */
 543     @ParameterizedTest
 544     @MethodSource("threadBuilders")
 545     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
 546         VThreadRunner.run(builder, () -> {
 547             try (DatagramChannel dc1 = DatagramChannel.open();
 548                  DatagramChannel dc2 = DatagramChannel.open()) {
 549 
 550                 InetAddress lh = InetAddress.getLoopbackAddress();
 551                 dc2.bind(new InetSocketAddress(lh, 0));
 552 
 553                 // send from dc1 when current thread blocked in dc2.receive
 554                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 555                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
 556 
 557                 // read from dc2 should block
 558                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 559                 dc2.receive(bb2);
                      // absolute get: checks the first byte received, independent of position
 560                 assertTrue(bb2.get(0) == 'X');
 561             }
 562         });
 563     }
 564 
 565     /**
 566      * DatagramChannel close while virtual thread blocked in receive.
          *
          * The receive is expected to fail with AsynchronousCloseException; the
          * test fails if receive returns.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 567      */
 568     @ParameterizedTest
 569     @MethodSource("threadBuilders")
 570     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 571         VThreadRunner.run(builder, () -> {
 572             try (DatagramChannel dc = DatagramChannel.open()) {
 573                 InetAddress lh = InetAddress.getLoopbackAddress();
 574                 dc.bind(new InetSocketAddress(lh, 0));
                      // nothing in this test sends to the channel
 575                 runAfterParkedAsync(dc::close);
 576                 try {
 577                     dc.receive(ByteBuffer.allocate(100));
 578                     fail("receive returned");
 579                 } catch (AsynchronousCloseException expected) { }
 580             }
 581         });
 582     }
 583 
 584     /**
 585      * Virtual thread interrupted while blocked in DatagramChannel receive.
          *
          * The receive is expected to fail with ClosedByInterruptException with the
          * thread's interrupt status set.
          *
          * @param builder virtual thread builder supplied by threadBuilders(),
          *                passed to VThreadRunner.run
 586      */
 587     @ParameterizedTest
 588     @MethodSource("threadBuilders")
 589     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 590         VThreadRunner.run(builder, () -> {
 591             try (DatagramChannel dc = DatagramChannel.open()) {
 592                 InetAddress lh = InetAddress.getLoopbackAddress();
 593                 dc.bind(new InetSocketAddress(lh, 0));
 594 
 595                 // interrupt current thread when it blocks in receive
 596                 Thread thisThread = Thread.currentThread();
 597                 runAfterParkedAsync(thisThread::interrupt);
 598 
 599                 try {
 600                     dc.receive(ByteBuffer.allocate(100));
 601                     fail("receive returned");
 602                 } catch (ClosedByInterruptException expected) {
                          // Thread.interrupted() checks and clears the interrupt status
 603                     assertTrue(Thread.interrupted());
 604                 }
 605             }
 606         });
 607     }
 608 
 609     /**
 610      * Virtual thread blocks in DatagramSocket adaptor receive.
          *
          * @param builder virtual thread builder supplied by threadBuilders()
 611      */
 612     @ParameterizedTest
 613     @MethodSource("threadBuilders")
 614     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
              // timeout 0: the helper only sets SO_TIMEOUT when timeout > 0
 615         testDatagramSocketAdaptorReceive(builder, 0);
 616     }
 617 
 618     /**
 619      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
          *
          * @param builder virtual thread builder supplied by threadBuilders()
 620      */
 621     @ParameterizedTest
 622     @MethodSource("threadBuilders")
 623     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
              // 60_000 is passed to DatagramSocket.setSoTimeout by the helper (milliseconds)
 624         testDatagramSocketAdaptorReceive(builder, 60_000);
 625     }
 626 
          /**
           * Receives a datagram with the DatagramSocket adaptor while the datagram
           * is sent asynchronously from a second channel.
           *
           * @param builder virtual thread builder, passed to VThreadRunner.run
           * @param timeout value for DatagramSocket.setSoTimeout; only set when greater than 0
           */
 627     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
 628                                                   int timeout) throws Exception {
 629         VThreadRunner.run(builder, () -> {
 630             try (DatagramChannel dc1 = DatagramChannel.open();
 631                  DatagramChannel dc2 = DatagramChannel.open()) {
 632 
 633                 InetAddress lh = InetAddress.getLoopbackAddress();
 634                 dc2.bind(new InetSocketAddress(lh, 0));
 635 
 636                 // send from dc1 when current thread blocks in dc2 receive
 637                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 638                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
 639 
 640                 // receive should block
 641                 byte[] array = new byte[100];
 642                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 643                 if (timeout > 0)
 644                     dc2.socket().setSoTimeout(timeout);
 645                 dc2.socket().receive(p);
                      // the whole three byte payload ("XXX") must arrive in one datagram
 646                 assertTrue(p.getLength() == 3 && array[0] == 'X');
 647             }
 648         });
 649     }
 650 
 651     /**
 652      * DatagramChannel close while virtual thread blocked in adaptor receive.
          *
          * @param builder virtual thread builder supplied by threadBuilders()
 653      */
 654     @ParameterizedTest
 655     @MethodSource("threadBuilders")
 656     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
              // timeout 0: the helper only sets SO_TIMEOUT when timeout > 0
 657         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
 658     }
 659 
 660     /**
 661      * DatagramChannel close while virtual thread blocked in adaptor receive
 662      * with timeout.
          *
          * @param builder virtual thread builder supplied by threadBuilders()
 663      */
 664     @ParameterizedTest
 665     @MethodSource("threadBuilders")
 666     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
              // 60 seconds (milliseconds, passed to DatagramSocket.setSoTimeout by the
              // helper), the same value as the other timeout variants in this class
 667         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_000);
 668     }
 669 
          /**
           * Closes a DatagramChannel asynchronously while the current thread
           * receives with the DatagramSocket adaptor, and expects the receive to
           * throw SocketException.
           *
           * @param builder virtual thread builder, passed to VThreadRunner.run
           * @param timeout value for DatagramSocket.setSoTimeout; only set when greater than 0
           */
 670     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
 671                                                             int timeout) throws Exception {
 672         VThreadRunner.run(builder, () -> {
 673             try (DatagramChannel dc = DatagramChannel.open()) {
 674                 InetAddress lh = InetAddress.getLoopbackAddress();
 675                 dc.bind(new InetSocketAddress(lh, 0));
 676 
 677                 byte[] array = new byte[100];
 678                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 679                 if (timeout > 0)
 680                     dc.socket().setSoTimeout(timeout);
 681 
 682                 // close channel/socket when current thread blocks in receive
 683                 runAfterParkedAsync(dc::close);
 684 
                      // the adaptor is expected to report the close as a SocketException
 685                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
 686             }
 687         });
 688     }
 689 
 690     /**
 691      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
          *
          * @param builder virtual thread builder supplied by threadBuilders()
 692      */
 693     @ParameterizedTest
 694     @MethodSource("threadBuilders")
 695     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
              // timeout 0: the helper only sets SO_TIMEOUT when timeout > 0
 696         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
 697     }
 698 
 699     /**
 700      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
 701      * with timeout.
          *
          * @param builder virtual thread builder supplied by threadBuilders()
 702      */
 703     @ParameterizedTest
 704     @MethodSource("threadBuilders")
 705     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
              // 60 seconds (milliseconds, passed to DatagramSocket.setSoTimeout by the
              // helper), the same value as the other timeout variants in this class
 706         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_000);
 707     }
 708 
          /**
           * Interrupts the current thread asynchronously while it receives with the
           * DatagramSocket adaptor, and expects ClosedByInterruptException with the
           * interrupt status set.
           *
           * @param builder virtual thread builder, passed to VThreadRunner.run
           * @param timeout value for DatagramSocket.setSoTimeout; only set when greater than 0
           */
 709     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
 710                                                            int timeout) throws Exception {
 711         VThreadRunner.run(builder, () -> {
 712             try (DatagramChannel dc = DatagramChannel.open()) {
 713                 InetAddress lh = InetAddress.getLoopbackAddress();
 714                 dc.bind(new InetSocketAddress(lh, 0));
 715 
 716                 byte[] array = new byte[100];
 717                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
 718                 if (timeout > 0)
 719                     dc.socket().setSoTimeout(timeout);
 720 
 721                 // interrupt current thread when it blocks in receive
 722                 Thread thisThread = Thread.currentThread();
 723                 runAfterParkedAsync(thisThread::interrupt);
 724 
 725                 try {
 726                     dc.socket().receive(p);
 727                     fail();
 728                 } catch (ClosedByInterruptException expected) {
                          // Thread.interrupted() checks and clears the interrupt status
 729                     assertTrue(Thread.interrupted());
 730                 }
 731             }
 732         });
 733     }
 734 
 735     /**
 736      * Pipe read/write, no blocking.
 737      */
 738     @ParameterizedTest
 739     @MethodSource("threadBuilders")
 740     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
 741         VThreadRunner.run(builder, () -> {
 742             Pipe p = Pipe.open();
 743             try (Pipe.SinkChannel sink = p.sink();
 744                  Pipe.SourceChannel source = p.source()) {
 745 
 746                 // write should not block
 747                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 748                 int n = sink.write(bb);
 749                 assertTrue(n > 0);
 750 
 751                 // read should not block
 752                 bb = ByteBuffer.allocate(10);
 753                 n = source.read(bb);
 754                 assertTrue(n > 0);
 755                 assertTrue(bb.get(0) == 'X');
 756             }
 757         });
 758     }
 759 
 760     /**
 761      * Virtual thread blocks in Pipe.SourceChannel read.
 762      */
 763     @ParameterizedTest
 764     @MethodSource("threadBuilders")
 765     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
 766         VThreadRunner.run(builder, () -> {
 767             Pipe p = Pipe.open();
 768             try (Pipe.SinkChannel sink = p.sink();
 769                  Pipe.SourceChannel source = p.source()) {
 770 
 771                 // write from sink when current thread blocks reading from source
 772                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 773                 runAfterParkedAsync(() -> sink.write(bb1));
 774 
 775                 // read should block
 776                 ByteBuffer bb2 = ByteBuffer.allocate(10);
 777                 int n = source.read(bb2);
 778                 assertTrue(n > 0);
 779                 assertTrue(bb2.get(0) == 'X');
 780             }
 781         });
 782     }
 783 
 784     /**
 785      * Virtual thread blocks in Pipe.SinkChannel write.
 786      */
 787     @ParameterizedTest
 788     @MethodSource("threadBuilders")
 789     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
 790         VThreadRunner.run(builder, () -> {
 791             Pipe p = Pipe.open();
 792             try (Pipe.SinkChannel sink = p.sink();
 793                  Pipe.SourceChannel source = p.source()) {
 794 
 795                 // read from source to EOF when current thread blocking in write
 796                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
 797 
 798                 // write to sink should block
 799                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
 800                 for (int i=0; i<1000; i++) {
 801                     int n = sink.write(bb);
 802                     assertTrue(n > 0);
 803                     bb.clear();
 804                 }
 805                 sink.close();
 806 
 807                 // wait for reader to finish
 808                 reader.join();
 809             }
 810         });
 811     }
 812 
 813     /**
 814      * Pipe.SourceChannel close while virtual thread blocked in read.
 815      */
 816     @ParameterizedTest
 817     @MethodSource("threadBuilders")
 818     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 819         VThreadRunner.run(builder, () -> {
 820             Pipe p = Pipe.open();
 821             try (Pipe.SinkChannel sink = p.sink();
 822                  Pipe.SourceChannel source = p.source()) {
 823                 runAfterParkedAsync(source::close);
 824                 try {
 825                     int n = source.read(ByteBuffer.allocate(100));
 826                     fail("read returned " + n);
 827                 } catch (AsynchronousCloseException expected) { }
 828             }
 829         });
 830     }
 831 
 832     /**
 833      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
 834      */
 835     @ParameterizedTest
 836     @MethodSource("threadBuilders")
 837     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 838         VThreadRunner.run(builder, () -> {
 839             Pipe p = Pipe.open();
 840             try (Pipe.SinkChannel sink = p.sink();
 841                  Pipe.SourceChannel source = p.source()) {
 842 
 843                 // interrupt current thread when it blocks reading from source
 844                 Thread thisThread = Thread.currentThread();
 845                 runAfterParkedAsync(thisThread::interrupt);
 846 
 847                 try {
 848                     int n = source.read(ByteBuffer.allocate(100));
 849                     fail("read returned " + n);
 850                 } catch (ClosedByInterruptException expected) {
 851                     assertTrue(Thread.interrupted());
 852                 }
 853             }
 854         });
 855     }
 856 
 857     /**
 858      * Pipe.SinkChannel close while virtual thread blocked in write.
 859      */
 860     @ParameterizedTest
 861     @MethodSource("threadBuilders")
 862     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
 863         VThreadRunner.run(builder, () -> {
 864             boolean done = false;
 865             while (!done) {
 866                 Pipe p = Pipe.open();
 867                 try (Pipe.SinkChannel sink = p.sink();
 868                      Pipe.SourceChannel source = p.source()) {
 869 
 870                     // close sink when current thread blocks in write
 871                     runAfterParkedAsync(sink::close, true);
 872 
 873                     // write until channel is closed
 874                     try {
 875                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 876                         for (;;) {
 877                             int n = sink.write(bb);
 878                             assertTrue(n > 0);
 879                             bb.clear();
 880                         }
 881                     } catch (AsynchronousCloseException e) {
 882                         // closed when blocked in write
 883                         done = true;
 884                     } catch (ClosedChannelException e) {
 885                         // closed but not blocked in write, need to retry test
 886                         System.err.format("%s, need to retry!%n", e);
 887                     }
 888                 }
 889             }
 890         });
 891     }
 892 
 893     /**
 894      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
 895      */
 896     @ParameterizedTest
 897     @MethodSource("threadBuilders")
 898     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
 899         VThreadRunner.run(builder, () -> {
 900             boolean done = false;
 901             while (!done) {
 902                 Pipe p = Pipe.open();
 903                 try (Pipe.SinkChannel sink = p.sink();
 904                      Pipe.SourceChannel source = p.source()) {
 905 
 906                     // interrupt current thread when it blocks in write
 907                     Thread thisThread = Thread.currentThread();
 908                     runAfterParkedAsync(thisThread::interrupt, true);
 909 
 910                     // write until channel is closed
 911                     try {
 912                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
 913                         for (;;) {
 914                             int n = sink.write(bb);
 915                             assertTrue(n > 0);
 916                             bb.clear();
 917                         }
 918                     } catch (ClosedByInterruptException expected) {
 919                         // closed when blocked in write
< prev index next >