1 /*
  2  * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test id=default
 26  * @bug 8284161 8372958
 27  * @summary Test virtual threads doing blocking I/O on java.net Sockets
 28  * @library /test/lib
 29  * @run junit BlockingSocketOps
 30  */
 31 
 32 /*
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps
 37  * @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps
 38  * @run junit/othervm -Djdk.pollerMode=3 BlockingSocketOps
 39  */
 40 
 41 /*
 42  * @test id=no-vmcontinuations
 43  * @requires vm.continuations
 44  * @library /test/lib
 45  * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps
 46  */
 47 
 48 import java.io.Closeable;
 49 import java.io.IOException;
 50 import java.io.InputStream;
 51 import java.io.OutputStream;
 52 import java.net.DatagramPacket;
 53 import java.net.DatagramSocket;
 54 import java.net.InetAddress;
 55 import java.net.InetSocketAddress;
 56 import java.net.ServerSocket;
 57 import java.net.Socket;
 58 import java.net.SocketAddress;
 59 import java.net.SocketException;
 60 import java.net.SocketTimeoutException;
 61 
 62 import jdk.test.lib.thread.VThreadRunner;
 63 import org.junit.jupiter.api.Test;
 64 import org.junit.jupiter.params.ParameterizedTest;
 65 import org.junit.jupiter.params.provider.ValueSource;
 66 import static org.junit.jupiter.api.Assertions.*;
 67 
 68 class BlockingSocketOps {
 69 
 70     /**
 71      * Socket read/write, no blocking.
 72      */
 73     @Test
 74     void testSocketReadWrite1() throws Exception {
 75         VThreadRunner.run(() -> {
 76             try (var connection = new Connection()) {
 77                 Socket s1 = connection.socket1();
 78                 Socket s2 = connection.socket2();
 79 
 80                 // write should not block
 81                 byte[] ba = "XXX".getBytes("UTF-8");
 82                 s1.getOutputStream().write(ba);
 83 
 84                 // read should not block
 85                 ba = new byte[10];
 86                 int n = s2.getInputStream().read(ba);
 87                 assertTrue(n > 0);
 88                 assertTrue(ba[0] == 'X');
 89             }
 90         });
 91     }
 92 
 93     /**
 94      * Virtual thread blocks in read.
 95      */
 96     @ParameterizedTest
 97     @ValueSource(ints = { 0, 60_000 })
 98     void testSocketRead(int timeout) throws Exception {
 99         VThreadRunner.run(() -> {
100             try (var connection = new Connection()) {
101                 Socket s1 = connection.socket1();
102                 Socket s2 = connection.socket2();
103 
104                 // delayed write from sc1
105                 byte[] ba1 = "XXX".getBytes("UTF-8");
106                 runAfterParkedAsync(() -> s1.getOutputStream().write(ba1));
107 
108                 // read from sc2 should block
109                 if (timeout > 0) {
110                     s2.setSoTimeout(timeout);
111                 }
112                 byte[] ba2 = new byte[10];
113                 int n = s2.getInputStream().read(ba2);
114                 assertTrue(n > 0);
115                 assertTrue(ba2[0] == 'X');
116             }
117         });
118     }
119 
120     /**
121      * Virtual thread blocks in write.
122      */
123     @Test
124     void testSocketWrite1() throws Exception {
125         VThreadRunner.run(() -> {
126             try (var connection = new Connection()) {
127                 Socket s1 = connection.socket1();
128                 Socket s2 = connection.socket2();
129 
130                 // delayed read from s2 to EOF
131                 InputStream in = s2.getInputStream();
132                 Thread reader = runAfterParkedAsync(() ->
133                         in.transferTo(OutputStream.nullOutputStream()));
134 
135                 // write should block
136                 byte[] ba = new byte[100*1024];
137                 try (OutputStream out = s1.getOutputStream()) {
138                     for (int i = 0; i < 1000; i++) {
139                         out.write(ba);
140                     }
141                 }
142 
143                 // wait for reader to finish
144                 reader.join();
145             }
146         });
147     }
148 
149     /**
150      * Virtual thread blocks in read, peer closes connection gracefully.
151      */
152     @Test
153     void testSocketReadPeerClose1() throws Exception {
154         VThreadRunner.run(() -> {
155             try (var connection = new Connection()) {
156                 Socket s1 = connection.socket1();
157                 Socket s2 = connection.socket2();
158 
159                 // delayed close of s2
160                 runAfterParkedAsync(s2::close);
161 
162                 // read from s1 should block, then read -1
163                 int n = s1.getInputStream().read();
164                 assertTrue(n == -1);
165             }
166         });
167     }
168 
169     /**
170      * Virtual thread blocks in read, peer closes connection abruptly.
171      */
172     @Test
173     void testSocketReadPeerClose2() throws Exception {
174         VThreadRunner.run(() -> {
175             try (var connection = new Connection()) {
176                 Socket s1 = connection.socket1();
177                 Socket s2 = connection.socket2();
178 
179                 // delayed abrupt close of s2
180                 s2.setSoLinger(true, 0);
181                 runAfterParkedAsync(s2::close);
182 
183                 // read from s1 should block, then throw
184                 try {
185                     int n = s1.getInputStream().read();
186                     fail("read " + n);
187                 } catch (IOException ioe) {
188                     // expected
189                 }
190             }
191         });
192     }
193 
194     /**
195      * Socket close while virtual thread blocked in read.
196      */
197     @ParameterizedTest
198     @ValueSource(ints = { 0, 60_000 })
199     void testSocketReadAsyncClose(int timeout) throws Exception {
200         VThreadRunner.run(() -> {
201             try (var connection = new Connection()) {
202                 Socket s = connection.socket1();
203 
204                 // delayed close of s
205                 runAfterParkedAsync(s::close);
206 
207                 // read from s should block, then throw
208                 if (timeout > 0) {
209                     s.setSoTimeout(timeout);
210                 }
211                 try {
212                     int n = s.getInputStream().read();
213                     fail("read " + n);
214                 } catch (SocketException expected) {
215                     log(expected);
216                 }
217             }
218         });
219     }
220 
221     /**
222      * Socket shutdownInput while virtual thread blocked in read.
223      */
224     @ParameterizedTest
225     @ValueSource(ints = { 0, 60_000 })
226     void testSocketReadAsyncShutdownInput(int timeout) throws Exception {
227         VThreadRunner.run(() -> {
228             try (var connection = new Connection()) {
229                 Socket s = connection.socket1();
230 
231                 // delayed shutdown of input stream
232                 InputStream in = s.getInputStream();
233                 runAfterParkedAsync(s::shutdownInput);
234 
235                 // read should return -1
236                 if (timeout > 0) {
237                     s.setSoTimeout(timeout);
238                 }
239                 assertEquals(-1, in.read());
240                 assertEquals(0, in.available());
241                 assertFalse(s.isClosed());
242             }
243         });
244     }
245 
246     /**
247      * Virtual thread interrupted while blocked in Socket read.
248      */
249     @ParameterizedTest
250     @ValueSource(ints = { 0, 60_000 })
251     void testSocketReadInterrupt(int timeout) throws Exception {
252         VThreadRunner.run(() -> {
253             try (var connection = new Connection()) {
254                 Socket s = connection.socket1();
255 
256 
257                 // delayed interrupt of current thread
258                 Thread thisThread = Thread.currentThread();
259                 runAfterParkedAsync(thisThread::interrupt);
260 
261                 // read from s should block, then throw
262                 if (timeout > 0) {
263                     s.setSoTimeout(timeout);
264                 }
265                 try {
266                     int n = s.getInputStream().read();
267                     fail("read " + n);
268                 } catch (SocketException expected) {
269                     log(expected);
270                     assertTrue(Thread.interrupted());
271                     assertTrue(s.isClosed());
272                 }
273             }
274         });
275     }
276 
277     /**
278      * Socket close while virtual thread blocked in write.
279      */
280     @Test
281     void testSocketWriteAsyncClose() throws Exception {
282         VThreadRunner.run(() -> {
283             try (var connection = new Connection()) {
284                 Socket s = connection.socket1();
285 
286                 // delayed close of s
287                 runAfterParkedAsync(s::close);
288 
289                 // write to s should block, then throw
290                 try {
291                     byte[] ba = new byte[100*1024];
292                     OutputStream out = s.getOutputStream();
293                     for (;;) {
294                         out.write(ba);
295                     }
296                 } catch (SocketException expected) {
297                     log(expected);
298                 }
299             }
300         });
301     }
302 
303     /**
304      * Socket shutdownOutput while virtual thread blocked in write.
305      */
306     @Test
307     void testSocketWriteAsyncShutdownOutput() throws Exception {
308         VThreadRunner.run(() -> {
309             try (var connection = new Connection()) {
310                 Socket s = connection.socket1();
311 
312                 // delayed shutdown of output stream
313                 OutputStream out = s.getOutputStream();
314                 runAfterParkedAsync(s::shutdownOutput);
315 
316                 // write to s should block, then throw
317                 try {
318                     byte[] ba = new byte[100*1024];
319                     for (;;) {
320                         out.write(ba);
321                     }
322                 } catch (SocketException expected) {
323                     log(expected);
324                 }
325                 assertFalse(s.isClosed());
326             }
327         });
328     }
329 
330     /**
331      * Virtual thread interrupted while blocked in Socket write.
332      */
333     @Test
334     void testSocketWriteInterrupt() throws Exception {
335         VThreadRunner.run(() -> {
336             try (var connection = new Connection()) {
337                 Socket s = connection.socket1();
338 
339                 // delayed interrupt of current thread
340                 Thread thisThread = Thread.currentThread();
341                 runAfterParkedAsync(thisThread::interrupt);
342 
343                 // write to s should block, then throw
344                 try {
345                     byte[] ba = new byte[100*1024];
346                     OutputStream out = s.getOutputStream();
347                     for (;;) {
348                         out.write(ba);
349                     }
350                 } catch (SocketException expected) {
351                     log(expected);
352                     assertTrue(Thread.interrupted());
353                     assertTrue(s.isClosed());
354                 }
355             }
356         });
357     }
358 
359     /**
360      * Virtual thread reading urgent data when SO_OOBINLINE is enabled.
361      */
362     @Test
363     void testSocketReadUrgentData() throws Exception {
364         VThreadRunner.run(() -> {
365             try (var connection = new Connection()) {
366                 Socket s1 = connection.socket1();
367                 Socket s2 = connection.socket2();
368 
369                 // urgent data should be received
370                 runAfterParkedAsync(() -> s2.sendUrgentData('X'));
371 
372                 // read should block, then read the OOB byte
373                 s1.setOOBInline(true);
374                 byte[] ba = new byte[10];
375                 int n = s1.getInputStream().read(ba);
376                 assertTrue(n == 1);
377                 assertTrue(ba[0] == 'X');
378 
379                 // urgent data should not be received
380                 s1.setOOBInline(false);
381                 s1.setSoTimeout(500);
382                 s2.sendUrgentData('X');
383                 try {
384                     s1.getInputStream().read(ba);
385                     fail();
386                 } catch (SocketTimeoutException expected) {
387                     log(expected);
388                 }
389             }
390         });
391     }
392 
393     /**
394      * ServerSocket accept, no blocking.
395      */
396     @Test
397     void testServerSocketAccept1() throws Exception {
398         VThreadRunner.run(() -> {
399             try (var listener = new ServerSocket()) {
400                 InetAddress loopback = InetAddress.getLoopbackAddress();
401                 listener.bind(new InetSocketAddress(loopback, 0));
402 
403                 // establish connection
404                 var socket1 = new Socket(loopback, listener.getLocalPort());
405 
406                 // accept should not block
407                 var socket2 = listener.accept();
408                 socket1.close();
409                 socket2.close();
410             }
411         });
412     }
413 
414     /**
415      * Virtual thread blocks in accept.
416      */
417     @ParameterizedTest
418     @ValueSource(ints = { 0, 60_000 })
419     void testServerSocketAccept(int timeout) throws Exception {
420         VThreadRunner.run(() -> {
421             try (var listener = new ServerSocket()) {
422                 InetAddress loopback = InetAddress.getLoopbackAddress();
423                 listener.bind(new InetSocketAddress(loopback, 0));
424 
425                 // schedule connect
426                 var socket1 = new Socket();
427                 SocketAddress remote = listener.getLocalSocketAddress();
428                 runAfterParkedAsync(() -> socket1.connect(remote));
429 
430                 // accept should block
431                 if (timeout > 0) {
432                     listener.setSoTimeout(timeout);
433                 }
434                 var socket2 = listener.accept();
435                 socket1.close();
436                 socket2.close();
437             }
438         });
439     }
440 
441     /**
442      * ServerSocket close while virtual thread blocked in accept.
443      */
444     @ParameterizedTest
445     @ValueSource(ints = { 0, 60_000 })
446     void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
447         VThreadRunner.run(() -> {
448             try (var listener = new ServerSocket()) {
449                 InetAddress loopback = InetAddress.getLoopbackAddress();
450                 listener.bind(new InetSocketAddress(loopback, 0));
451 
452                 // delayed close of listener
453                 runAfterParkedAsync(listener::close);
454 
455                 // accept should block, then throw
456                 if (timeout > 0) {
457                     listener.setSoTimeout(timeout);
458                 }
459                 try {
460                     listener.accept().close();
461                     fail("connection accepted???");
462                 } catch (SocketException expected) {
463                     log(expected);
464                 }
465             }
466         });
467     }
468 
469     /**
470      * Virtual thread interrupted while blocked in ServerSocket accept.
471      */
472     @ParameterizedTest
473     @ValueSource(ints = { 0, 60_000 })
474     void testServerSocketAcceptInterrupt(int timeout) throws Exception {
475         VThreadRunner.run(() -> {
476             try (var listener = new ServerSocket()) {
477                 InetAddress loopback = InetAddress.getLoopbackAddress();
478                 listener.bind(new InetSocketAddress(loopback, 0));
479 
480                 // delayed interrupt of current thread
481                 Thread thisThread = Thread.currentThread();
482                 runAfterParkedAsync(thisThread::interrupt);
483 
484                 // accept should block, then throw
485                 if (timeout > 0) {
486                     listener.setSoTimeout(timeout);
487                 }
488                 try {
489                     listener.accept().close();
490                     fail("connection accepted???");
491                 } catch (SocketException expected) {
492                     log(expected);
493                     assertTrue(Thread.interrupted());
494                     assertTrue(listener.isClosed());
495                 }
496             }
497         });
498     }
499 
500     /**
501      * DatagramSocket receive/send, no blocking.
502      */
503     @Test
504     void testDatagramSocketSendReceive1() throws Exception {
505         VThreadRunner.run(() -> {
506             try (DatagramSocket s1 = new DatagramSocket(null);
507                  DatagramSocket s2 = new DatagramSocket(null)) {
508 
509                 InetAddress lh = InetAddress.getLoopbackAddress();
510                 s1.bind(new InetSocketAddress(lh, 0));
511                 s2.bind(new InetSocketAddress(lh, 0));
512 
513                 // send should not block
514                 byte[] bytes = "XXX".getBytes("UTF-8");
515                 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
516                 p1.setSocketAddress(s2.getLocalSocketAddress());
517                 s1.send(p1);
518 
519                 // receive should not block
520                 byte[] ba = new byte[100];
521                 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
522                 s2.receive(p2);
523                 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
524                 assertTrue(ba[0] == 'X');
525             }
526         });
527     }
528 
529     /**
530      * Virtual thread blocks in DatagramSocket receive.
531      */
532     @ParameterizedTest
533     @ValueSource(ints = { 0, 60_000 })
534     void testDatagramSocketSendReceive(int timeout) throws Exception {
535         VThreadRunner.run(() -> {
536             try (DatagramSocket s1 = new DatagramSocket(null);
537                  DatagramSocket s2 = new DatagramSocket(null)) {
538 
539                 InetAddress lh = InetAddress.getLoopbackAddress();
540                 s1.bind(new InetSocketAddress(lh, 0));
541                 s2.bind(new InetSocketAddress(lh, 0));
542 
543                 // delayed send
544                 byte[] bytes = "XXX".getBytes("UTF-8");
545                 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
546                 p1.setSocketAddress(s2.getLocalSocketAddress());
547                 runAfterParkedAsync(() -> s1.send(p1));
548 
549                 // receive should block
550                 if (timeout > 0) {
551                     s2.setSoTimeout(timeout);
552                 }
553                 byte[] ba = new byte[100];
554                 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
555                 s2.receive(p2);
556                 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
557                 assertTrue(ba[0] == 'X');
558             }
559         });
560     }
561 
562     /**
563      * Virtual thread blocks in DatagramSocket receive that times out.
564      */
565     @Test
566     void testDatagramSocketReceiveTimeout() throws Exception {
567         VThreadRunner.run(() -> {
568             try (DatagramSocket s = new DatagramSocket(null)) {
569                 InetAddress lh = InetAddress.getLoopbackAddress();
570                 s.bind(new InetSocketAddress(lh, 0));
571                 s.setSoTimeout(500);
572                 byte[] ba = new byte[100];
573                 DatagramPacket p = new DatagramPacket(ba, ba.length);
574                 try {
575                     s.receive(p);
576                     fail();
577                 } catch (SocketTimeoutException expected) {
578                     log(expected);
579                 }
580             }
581         });
582     }
583 
584     /**
585      * DatagramSocket close while virtual thread blocked in receive.
586      */
587     @ParameterizedTest
588     @ValueSource(ints = { 0, 60_000 })
589     void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
590         VThreadRunner.run(() -> {
591             try (DatagramSocket s = new DatagramSocket(null)) {
592                 InetAddress lh = InetAddress.getLoopbackAddress();
593                 s.bind(new InetSocketAddress(lh, 0));
594 
595                 // delayed close of s
596                 runAfterParkedAsync(s::close);
597 
598                 // receive should block, then throw
599                 if (timeout > 0) {
600                     s.setSoTimeout(timeout);
601                 }
602                 try {
603                     byte[] ba = new byte[100];
604                     DatagramPacket p = new DatagramPacket(ba, ba.length);
605                     s.receive(p);
606                     fail();
607                 } catch (SocketException expected) {
608                     log(expected);
609                 }
610             }
611         });
612     }
613 
614     /**
615      * Virtual thread interrupted while blocked in DatagramSocket receive.
616      */
617     @ParameterizedTest
618     @ValueSource(ints = { 0, 60_000 })
619     void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
620         VThreadRunner.run(() -> {
621             try (DatagramSocket s = new DatagramSocket(null)) {
622                 InetAddress lh = InetAddress.getLoopbackAddress();
623                 s.bind(new InetSocketAddress(lh, 0));
624 
625                 // delayed interrupt of current thread
626                 Thread thisThread = Thread.currentThread();
627                 runAfterParkedAsync(thisThread::interrupt);
628 
629                 // receive should block, then throw
630                 if (timeout > 0) {
631                     s.setSoTimeout(timeout);
632                 }
633                 try {
634                     byte[] ba = new byte[100];
635                     DatagramPacket p = new DatagramPacket(ba, ba.length);
636                     s.receive(p);
637                     fail();
638                 } catch (SocketException expected) {
639                     log(expected);
640                     assertTrue(Thread.interrupted());
641                     assertTrue(s.isClosed());
642                 }
643             }
644         });
645     }
646 
647     /**
648      * Creates a loopback connection
649      */
650     static class Connection implements Closeable {
651         private final Socket s1;
652         private final Socket s2;
653         Connection() throws IOException {
654             var lh = InetAddress.getLoopbackAddress();
655             try (var listener = new ServerSocket()) {
656                 listener.bind(new InetSocketAddress(lh, 0));
657                 Socket s1 = new Socket();
658                 Socket s2;
659                 try {
660                     s1.connect(listener.getLocalSocketAddress());
661                     s2 = listener.accept();
662                 } catch (IOException ioe) {
663                     s1.close();
664                     throw ioe;
665                 }
666                 this.s1 = s1;
667                 this.s2 = s2;
668             }
669 
670         }
671         Socket socket1() {
672             return s1;
673         }
674         Socket socket2() {
675             return s2;
676         }
677         @Override
678         public void close() throws IOException {
679             s1.close();
680             s2.close();
681         }
682     }
683 
684     @FunctionalInterface
685     interface ThrowingRunnable {
686         void run() throws Exception;
687     }
688 
689     /**
690      * Runs the given task asynchronously after the current virtual thread has parked.
691      * @return the thread started to run the task
692      */
693     static Thread runAfterParkedAsync(ThrowingRunnable task) {
694         Thread target = Thread.currentThread();
695         if (!target.isVirtual())
696             throw new WrongThreadException();
697         return Thread.ofPlatform().daemon().start(() -> {
698             try {
699                 Thread.State state = target.getState();
700                 while (state != Thread.State.WAITING
701                         && state != Thread.State.TIMED_WAITING) {
702                     Thread.sleep(20);
703                     state = target.getState();
704                 }
705                 Thread.sleep(20);  // give a bit more time to release carrier
706                 task.run();
707             } catch (Exception e) {
708                 e.printStackTrace();
709             }
710         });
711     }
712 
713     /**
714      * Log to System.err to inline with the JUnit messages.
715      */
716     static void log(Throwable e) {
717         System.err.println(e);
718     }
719 }