1 /*
  2  * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test id=default
 26  * @bug 8284161 8372958
 27  * @summary Test virtual threads doing blocking I/O on java.net Sockets
 28  * @library /test/lib
 29  * @run junit BlockingSocketOps
 30  */
 31 
 32 /*
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps
 37  * @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps
 38  */
 39 
 40 /*
 41  * @test id=no-vmcontinuations
 42  * @requires vm.continuations
 43  * @library /test/lib
 44  * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps
 45  */
 46 
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

import jdk.test.lib.thread.VThreadRunner;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.*;
 66 
 67 class BlockingSocketOps {
 68 
 69     /**
 70      * Socket read/write, no blocking.
 71      */
 72     @Test
 73     void testSocketReadWrite1() throws Exception {
 74         VThreadRunner.run(() -> {
 75             try (var connection = new Connection()) {
 76                 Socket s1 = connection.socket1();
 77                 Socket s2 = connection.socket2();
 78 
 79                 // write should not block
 80                 byte[] ba = "XXX".getBytes("UTF-8");
 81                 s1.getOutputStream().write(ba);
 82 
 83                 // read should not block
 84                 ba = new byte[10];
 85                 int n = s2.getInputStream().read(ba);
 86                 assertTrue(n > 0);
 87                 assertTrue(ba[0] == 'X');
 88             }
 89         });
 90     }
 91 
 92     /**
 93      * Virtual thread blocks in read.
 94      */
 95     @ParameterizedTest
 96     @ValueSource(ints = { 0, 60_000 })
 97     void testSocketRead(int timeout) throws Exception {
 98         VThreadRunner.run(() -> {
 99             try (var connection = new Connection()) {
100                 Socket s1 = connection.socket1();
101                 Socket s2 = connection.socket2();
102 
103                 // delayed write from sc1
104                 byte[] ba1 = "XXX".getBytes("UTF-8");
105                 runAfterParkedAsync(() -> s1.getOutputStream().write(ba1));
106 
107                 // read from sc2 should block
108                 if (timeout > 0) {
109                     s2.setSoTimeout(timeout);
110                 }
111                 byte[] ba2 = new byte[10];
112                 int n = s2.getInputStream().read(ba2);
113                 assertTrue(n > 0);
114                 assertTrue(ba2[0] == 'X');
115             }
116         });
117     }
118 
119     /**
120      * Virtual thread blocks in write.
121      */
122     @Test
123     void testSocketWrite1() throws Exception {
124         VThreadRunner.run(() -> {
125             try (var connection = new Connection()) {
126                 Socket s1 = connection.socket1();
127                 Socket s2 = connection.socket2();
128 
129                 // delayed read from s2 to EOF
130                 InputStream in = s2.getInputStream();
131                 Thread reader = runAfterParkedAsync(() ->
132                         in.transferTo(OutputStream.nullOutputStream()));
133 
134                 // write should block
135                 byte[] ba = new byte[100*1024];
136                 try (OutputStream out = s1.getOutputStream()) {
137                     for (int i = 0; i < 1000; i++) {
138                         out.write(ba);
139                     }
140                 }
141 
142                 // wait for reader to finish
143                 reader.join();
144             }
145         });
146     }
147 
148     /**
149      * Virtual thread blocks in read, peer closes connection gracefully.
150      */
151     @Test
152     void testSocketReadPeerClose1() throws Exception {
153         VThreadRunner.run(() -> {
154             try (var connection = new Connection()) {
155                 Socket s1 = connection.socket1();
156                 Socket s2 = connection.socket2();
157 
158                 // delayed close of s2
159                 runAfterParkedAsync(s2::close);
160 
161                 // read from s1 should block, then read -1
162                 int n = s1.getInputStream().read();
163                 assertTrue(n == -1);
164             }
165         });
166     }
167 
168     /**
169      * Virtual thread blocks in read, peer closes connection abruptly.
170      */
171     @Test
172     void testSocketReadPeerClose2() throws Exception {
173         VThreadRunner.run(() -> {
174             try (var connection = new Connection()) {
175                 Socket s1 = connection.socket1();
176                 Socket s2 = connection.socket2();
177 
178                 // delayed abrupt close of s2
179                 s2.setSoLinger(true, 0);
180                 runAfterParkedAsync(s2::close);
181 
182                 // read from s1 should block, then throw
183                 try {
184                     int n = s1.getInputStream().read();
185                     fail("read " + n);
186                 } catch (IOException ioe) {
187                     // expected
188                 }
189             }
190         });
191     }
192 
193     /**
194      * Socket close while virtual thread blocked in read.
195      */
196     @ParameterizedTest
197     @ValueSource(ints = { 0, 60_000 })
198     void testSocketReadAsyncClose(int timeout) throws Exception {
199         VThreadRunner.run(() -> {
200             try (var connection = new Connection()) {
201                 Socket s = connection.socket1();
202 
203                 // delayed close of s
204                 runAfterParkedAsync(s::close);
205 
206                 // read from s should block, then throw
207                 if (timeout > 0) {
208                     s.setSoTimeout(timeout);
209                 }
210                 try {
211                     int n = s.getInputStream().read();
212                     fail("read " + n);
213                 } catch (SocketException expected) {
214                     log(expected);
215                 }
216             }
217         });
218     }
219 
220     /**
221      * Socket shutdownInput while virtual thread blocked in read.
222      */
223     @ParameterizedTest
224     @ValueSource(ints = { 0, 60_000 })
225     void testSocketReadAsyncShutdownInput(int timeout) throws Exception {
226         VThreadRunner.run(() -> {
227             try (var connection = new Connection()) {
228                 Socket s = connection.socket1();
229 
230                 // delayed shutdown of input stream
231                 InputStream in = s.getInputStream();
232                 runAfterParkedAsync(s::shutdownInput);
233 
234                 // read should return -1
235                 if (timeout > 0) {
236                     s.setSoTimeout(timeout);
237                 }
238                 assertEquals(-1, in.read());
239                 assertEquals(0, in.available());
240                 assertFalse(s.isClosed());
241             }
242         });
243     }
244 
245     /**
246      * Virtual thread interrupted while blocked in Socket read.
247      */
248     @ParameterizedTest
249     @ValueSource(ints = { 0, 60_000 })
250     void testSocketReadInterrupt(int timeout) throws Exception {
251         VThreadRunner.run(() -> {
252             try (var connection = new Connection()) {
253                 Socket s = connection.socket1();
254 
255 
256                 // delayed interrupt of current thread
257                 Thread thisThread = Thread.currentThread();
258                 runAfterParkedAsync(thisThread::interrupt);
259 
260                 // read from s should block, then throw
261                 if (timeout > 0) {
262                     s.setSoTimeout(timeout);
263                 }
264                 try {
265                     int n = s.getInputStream().read();
266                     fail("read " + n);
267                 } catch (SocketException expected) {
268                     log(expected);
269                     assertTrue(Thread.interrupted());
270                     assertTrue(s.isClosed());
271                 }
272             }
273         });
274     }
275 
276     /**
277      * Socket close while virtual thread blocked in write.
278      */
279     @Test
280     void testSocketWriteAsyncClose() throws Exception {
281         VThreadRunner.run(() -> {
282             try (var connection = new Connection()) {
283                 Socket s = connection.socket1();
284 
285                 // delayed close of s
286                 runAfterParkedAsync(s::close);
287 
288                 // write to s should block, then throw
289                 try {
290                     byte[] ba = new byte[100*1024];
291                     OutputStream out = s.getOutputStream();
292                     for (;;) {
293                         out.write(ba);
294                     }
295                 } catch (SocketException expected) {
296                     log(expected);
297                 }
298             }
299         });
300     }
301 
302     /**
303      * Socket shutdownOutput while virtual thread blocked in write.
304      */
305     @Test
306     void testSocketWriteAsyncShutdownOutput() throws Exception {
307         VThreadRunner.run(() -> {
308             try (var connection = new Connection()) {
309                 Socket s = connection.socket1();
310 
311                 // delayed shutdown of output stream
312                 OutputStream out = s.getOutputStream();
313                 runAfterParkedAsync(s::shutdownOutput);
314 
315                 // write to s should block, then throw
316                 try {
317                     byte[] ba = new byte[100*1024];
318                     for (;;) {
319                         out.write(ba);
320                     }
321                 } catch (SocketException expected) {
322                     log(expected);
323                 }
324                 assertFalse(s.isClosed());
325             }
326         });
327     }
328 
329     /**
330      * Virtual thread interrupted while blocked in Socket write.
331      */
332     @Test
333     void testSocketWriteInterrupt() throws Exception {
334         VThreadRunner.run(() -> {
335             try (var connection = new Connection()) {
336                 Socket s = connection.socket1();
337 
338                 // delayed interrupt of current thread
339                 Thread thisThread = Thread.currentThread();
340                 runAfterParkedAsync(thisThread::interrupt);
341 
342                 // write to s should block, then throw
343                 try {
344                     byte[] ba = new byte[100*1024];
345                     OutputStream out = s.getOutputStream();
346                     for (;;) {
347                         out.write(ba);
348                     }
349                 } catch (SocketException expected) {
350                     log(expected);
351                     assertTrue(Thread.interrupted());
352                     assertTrue(s.isClosed());
353                 }
354             }
355         });
356     }
357 
358     /**
359      * Virtual thread reading urgent data when SO_OOBINLINE is enabled.
360      */
361     @Test
362     void testSocketReadUrgentData() throws Exception {
363         VThreadRunner.run(() -> {
364             try (var connection = new Connection()) {
365                 Socket s1 = connection.socket1();
366                 Socket s2 = connection.socket2();
367 
368                 // urgent data should be received
369                 runAfterParkedAsync(() -> s2.sendUrgentData('X'));
370 
371                 // read should block, then read the OOB byte
372                 s1.setOOBInline(true);
373                 byte[] ba = new byte[10];
374                 int n = s1.getInputStream().read(ba);
375                 assertTrue(n == 1);
376                 assertTrue(ba[0] == 'X');
377 
378                 // urgent data should not be received
379                 s1.setOOBInline(false);
380                 s1.setSoTimeout(500);
381                 s2.sendUrgentData('X');
382                 try {
383                     s1.getInputStream().read(ba);
384                     fail();
385                 } catch (SocketTimeoutException expected) {
386                     log(expected);
387                 }
388             }
389         });
390     }
391 
392     /**
393      * ServerSocket accept, no blocking.
394      */
395     @Test
396     void testServerSocketAccept1() throws Exception {
397         VThreadRunner.run(() -> {
398             try (var listener = new ServerSocket()) {
399                 InetAddress loopback = InetAddress.getLoopbackAddress();
400                 listener.bind(new InetSocketAddress(loopback, 0));
401 
402                 // establish connection
403                 var socket1 = new Socket(loopback, listener.getLocalPort());
404 
405                 // accept should not block
406                 var socket2 = listener.accept();
407                 socket1.close();
408                 socket2.close();
409             }
410         });
411     }
412 
413     /**
414      * Virtual thread blocks in accept.
415      */
416     @ParameterizedTest
417     @ValueSource(ints = { 0, 60_000 })
418     void testServerSocketAccept(int timeout) throws Exception {
419         VThreadRunner.run(() -> {
420             try (var listener = new ServerSocket()) {
421                 InetAddress loopback = InetAddress.getLoopbackAddress();
422                 listener.bind(new InetSocketAddress(loopback, 0));
423 
424                 // schedule connect
425                 var socket1 = new Socket();
426                 SocketAddress remote = listener.getLocalSocketAddress();
427                 runAfterParkedAsync(() -> socket1.connect(remote));
428 
429                 // accept should block
430                 if (timeout > 0) {
431                     listener.setSoTimeout(timeout);
432                 }
433                 var socket2 = listener.accept();
434                 socket1.close();
435                 socket2.close();
436             }
437         });
438     }
439 
440     /**
441      * ServerSocket close while virtual thread blocked in accept.
442      */
443     @ParameterizedTest
444     @ValueSource(ints = { 0, 60_000 })
445     void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
446         VThreadRunner.run(() -> {
447             try (var listener = new ServerSocket()) {
448                 InetAddress loopback = InetAddress.getLoopbackAddress();
449                 listener.bind(new InetSocketAddress(loopback, 0));
450 
451                 // delayed close of listener
452                 runAfterParkedAsync(listener::close);
453 
454                 // accept should block, then throw
455                 if (timeout > 0) {
456                     listener.setSoTimeout(timeout);
457                 }
458                 try {
459                     listener.accept().close();
460                     fail("connection accepted???");
461                 } catch (SocketException expected) {
462                     log(expected);
463                 }
464             }
465         });
466     }
467 
468     /**
469      * Virtual thread interrupted while blocked in ServerSocket accept.
470      */
471     @ParameterizedTest
472     @ValueSource(ints = { 0, 60_000 })
473     void testServerSocketAcceptInterrupt(int timeout) throws Exception {
474         VThreadRunner.run(() -> {
475             try (var listener = new ServerSocket()) {
476                 InetAddress loopback = InetAddress.getLoopbackAddress();
477                 listener.bind(new InetSocketAddress(loopback, 0));
478 
479                 // delayed interrupt of current thread
480                 Thread thisThread = Thread.currentThread();
481                 runAfterParkedAsync(thisThread::interrupt);
482 
483                 // accept should block, then throw
484                 if (timeout > 0) {
485                     listener.setSoTimeout(timeout);
486                 }
487                 try {
488                     listener.accept().close();
489                     fail("connection accepted???");
490                 } catch (SocketException expected) {
491                     log(expected);
492                     assertTrue(Thread.interrupted());
493                     assertTrue(listener.isClosed());
494                 }
495             }
496         });
497     }
498 
499     /**
500      * DatagramSocket receive/send, no blocking.
501      */
502     @Test
503     void testDatagramSocketSendReceive1() throws Exception {
504         VThreadRunner.run(() -> {
505             try (DatagramSocket s1 = new DatagramSocket(null);
506                  DatagramSocket s2 = new DatagramSocket(null)) {
507 
508                 InetAddress lh = InetAddress.getLoopbackAddress();
509                 s1.bind(new InetSocketAddress(lh, 0));
510                 s2.bind(new InetSocketAddress(lh, 0));
511 
512                 // send should not block
513                 byte[] bytes = "XXX".getBytes("UTF-8");
514                 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
515                 p1.setSocketAddress(s2.getLocalSocketAddress());
516                 s1.send(p1);
517 
518                 // receive should not block
519                 byte[] ba = new byte[100];
520                 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
521                 s2.receive(p2);
522                 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
523                 assertTrue(ba[0] == 'X');
524             }
525         });
526     }
527 
528     /**
529      * Virtual thread blocks in DatagramSocket receive.
530      */
531     @ParameterizedTest
532     @ValueSource(ints = { 0, 60_000 })
533     void testDatagramSocketSendReceive(int timeout) throws Exception {
534         VThreadRunner.run(() -> {
535             try (DatagramSocket s1 = new DatagramSocket(null);
536                  DatagramSocket s2 = new DatagramSocket(null)) {
537 
538                 InetAddress lh = InetAddress.getLoopbackAddress();
539                 s1.bind(new InetSocketAddress(lh, 0));
540                 s2.bind(new InetSocketAddress(lh, 0));
541 
542                 // delayed send
543                 byte[] bytes = "XXX".getBytes("UTF-8");
544                 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
545                 p1.setSocketAddress(s2.getLocalSocketAddress());
546                 runAfterParkedAsync(() -> s1.send(p1));
547 
548                 // receive should block
549                 if (timeout > 0) {
550                     s2.setSoTimeout(timeout);
551                 }
552                 byte[] ba = new byte[100];
553                 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
554                 s2.receive(p2);
555                 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
556                 assertTrue(ba[0] == 'X');
557             }
558         });
559     }
560 
561     /**
562      * Virtual thread blocks in DatagramSocket receive that times out.
563      */
564     @Test
565     void testDatagramSocketReceiveTimeout() throws Exception {
566         VThreadRunner.run(() -> {
567             try (DatagramSocket s = new DatagramSocket(null)) {
568                 InetAddress lh = InetAddress.getLoopbackAddress();
569                 s.bind(new InetSocketAddress(lh, 0));
570                 s.setSoTimeout(500);
571                 byte[] ba = new byte[100];
572                 DatagramPacket p = new DatagramPacket(ba, ba.length);
573                 try {
574                     s.receive(p);
575                     fail();
576                 } catch (SocketTimeoutException expected) {
577                     log(expected);
578                 }
579             }
580         });
581     }
582 
583     /**
584      * DatagramSocket close while virtual thread blocked in receive.
585      */
586     @ParameterizedTest
587     @ValueSource(ints = { 0, 60_000 })
588     void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
589         VThreadRunner.run(() -> {
590             try (DatagramSocket s = new DatagramSocket(null)) {
591                 InetAddress lh = InetAddress.getLoopbackAddress();
592                 s.bind(new InetSocketAddress(lh, 0));
593 
594                 // delayed close of s
595                 runAfterParkedAsync(s::close);
596 
597                 // receive should block, then throw
598                 if (timeout > 0) {
599                     s.setSoTimeout(timeout);
600                 }
601                 try {
602                     byte[] ba = new byte[100];
603                     DatagramPacket p = new DatagramPacket(ba, ba.length);
604                     s.receive(p);
605                     fail();
606                 } catch (SocketException expected) {
607                     log(expected);
608                 }
609             }
610         });
611     }
612 
613     /**
614      * Virtual thread interrupted while blocked in DatagramSocket receive.
615      */
616     @ParameterizedTest
617     @ValueSource(ints = { 0, 60_000 })
618     void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
619         VThreadRunner.run(() -> {
620             try (DatagramSocket s = new DatagramSocket(null)) {
621                 InetAddress lh = InetAddress.getLoopbackAddress();
622                 s.bind(new InetSocketAddress(lh, 0));
623 
624                 // delayed interrupt of current thread
625                 Thread thisThread = Thread.currentThread();
626                 runAfterParkedAsync(thisThread::interrupt);
627 
628                 // receive should block, then throw
629                 if (timeout > 0) {
630                     s.setSoTimeout(timeout);
631                 }
632                 try {
633                     byte[] ba = new byte[100];
634                     DatagramPacket p = new DatagramPacket(ba, ba.length);
635                     s.receive(p);
636                     fail();
637                 } catch (SocketException expected) {
638                     log(expected);
639                     assertTrue(Thread.interrupted());
640                     assertTrue(s.isClosed());
641                 }
642             }
643         });
644     }
645 
646     /**
647      * Creates a loopback connection
648      */
649     static class Connection implements Closeable {
650         private final Socket s1;
651         private final Socket s2;
652         Connection() throws IOException {
653             var lh = InetAddress.getLoopbackAddress();
654             try (var listener = new ServerSocket()) {
655                 listener.bind(new InetSocketAddress(lh, 0));
656                 Socket s1 = new Socket();
657                 Socket s2;
658                 try {
659                     s1.connect(listener.getLocalSocketAddress());
660                     s2 = listener.accept();
661                 } catch (IOException ioe) {
662                     s1.close();
663                     throw ioe;
664                 }
665                 this.s1 = s1;
666                 this.s2 = s2;
667             }
668 
669         }
670         Socket socket1() {
671             return s1;
672         }
673         Socket socket2() {
674             return s2;
675         }
676         @Override
677         public void close() throws IOException {
678             s1.close();
679             s2.close();
680         }
681     }
682 
    /**
     * A task that may throw a checked exception; the parameter type of
     * {@code runAfterParkedAsync}.
     */
    @FunctionalInterface
    interface ThrowingRunnable {
        /**
         * Runs the task.
         *
         * @throws Exception if the task fails
         */
        void run() throws Exception;
    }
687 
688     /**
689      * Runs the given task asynchronously after the current virtual thread has parked.
690      * @return the thread started to run the task
691      */
692     static Thread runAfterParkedAsync(ThrowingRunnable task) {
693         Thread target = Thread.currentThread();
694         if (!target.isVirtual())
695             throw new WrongThreadException();
696         return Thread.ofPlatform().daemon().start(() -> {
697             try {
698                 Thread.State state = target.getState();
699                 while (state != Thread.State.WAITING
700                         && state != Thread.State.TIMED_WAITING) {
701                     Thread.sleep(20);
702                     state = target.getState();
703                 }
704                 Thread.sleep(20);  // give a bit more time to release carrier
705                 task.run();
706             } catch (Exception e) {
707                 e.printStackTrace();
708             }
709         });
710     }
711 
712     /**
713      * Log to System.err to inline with the JUnit messages.
714      */
715     static void log(Throwable e) {
716         System.err.println(e);
717     }
718 }