1 /*
  2  * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /**
 25  * @test id=default
 26  * @bug 8284161
 27  * @summary Test virtual threads doing blocking I/O on java.net Sockets
 28  * @library /test/lib
 29  * @run junit BlockingSocketOps
 30  */
 31 
 32 /**
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps
 37  * @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps
 38  */
 39 
 40 /**
 41  * @test id=no-vmcontinuations
 42  * @requires vm.continuations
 43  * @library /test/lib
 44  * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps
 45  */
 46 
 47 import java.io.Closeable;
 48 import java.io.IOException;
 49 import java.io.InputStream;
 50 import java.io.OutputStream;
 51 import java.net.DatagramPacket;
 52 import java.net.DatagramSocket;
 53 import java.net.InetAddress;
 54 import java.net.InetSocketAddress;
 55 import java.net.ServerSocket;
 56 import java.net.Socket;
 57 import java.net.SocketAddress;
 58 import java.net.SocketException;
 59 import java.net.SocketTimeoutException;
 60 
 61 import jdk.test.lib.thread.VThreadRunner;
 62 import org.junit.jupiter.api.Test;
 63 import static org.junit.jupiter.api.Assertions.*;
 64 
 65 class BlockingSocketOps {
 66 
 67     /**
 68      * Socket read/write, no blocking.
 69      */
 70     @Test
 71     void testSocketReadWrite1() throws Exception {
 72         VThreadRunner.run(() -> {
 73             try (var connection = new Connection()) {
 74                 Socket s1 = connection.socket1();
 75                 Socket s2 = connection.socket2();
 76 
 77                 // write should not block
 78                 byte[] ba = "XXX".getBytes("UTF-8");
 79                 s1.getOutputStream().write(ba);
 80 
 81                 // read should not block
 82                 ba = new byte[10];
 83                 int n = s2.getInputStream().read(ba);
 84                 assertTrue(n > 0);
 85                 assertTrue(ba[0] == 'X');
 86             }
 87         });
 88     }
 89 
 90     /**
 91      * Virtual thread blocks in read.
 92      */
 93     @Test
 94     void testSocketRead1() throws Exception {
 95         testSocketRead(0);
 96     }
 97 
 98     /**
 99      * Virtual thread blocks in timed read.
100      */
101     @Test
102     void testSocketRead2() throws Exception {
103         testSocketRead(60_000);
104     }
105 
106     void testSocketRead(int timeout) throws Exception {
107         VThreadRunner.run(() -> {
108             try (var connection = new Connection()) {
109                 Socket s1 = connection.socket1();
110                 Socket s2 = connection.socket2();
111 
112                 // delayed write from sc1
113                 byte[] ba1 = "XXX".getBytes("UTF-8");
114                 runAfterParkedAsync(() -> s1.getOutputStream().write(ba1));
115 
116                 // read from sc2 should block
117                 if (timeout > 0) {
118                     s2.setSoTimeout(timeout);
119                 }
120                 byte[] ba2 = new byte[10];
121                 int n = s2.getInputStream().read(ba2);
122                 assertTrue(n > 0);
123                 assertTrue(ba2[0] == 'X');
124             }
125         });
126     }
127 
128     /**
129      * Virtual thread blocks in write.
130      */
131     @Test
132     void testSocketWrite1() throws Exception {
133         VThreadRunner.run(() -> {
134             try (var connection = new Connection()) {
135                 Socket s1 = connection.socket1();
136                 Socket s2 = connection.socket2();
137 
138                 // delayed read from s2 to EOF
139                 InputStream in = s2.getInputStream();
140                 Thread reader = runAfterParkedAsync(() ->
141                         in.transferTo(OutputStream.nullOutputStream()));
142 
143                 // write should block
144                 byte[] ba = new byte[100*1024];
145                 try (OutputStream out = s1.getOutputStream()) {
146                     for (int i = 0; i < 1000; i++) {
147                         out.write(ba);
148                     }
149                 }
150 
151                 // wait for reader to finish
152                 reader.join();
153             }
154         });
155     }
156 
157     /**
158      * Virtual thread blocks in read, peer closes connection gracefully.
159      */
160     @Test
161     void testSocketReadPeerClose1() throws Exception {
162         VThreadRunner.run(() -> {
163             try (var connection = new Connection()) {
164                 Socket s1 = connection.socket1();
165                 Socket s2 = connection.socket2();
166 
167                 // delayed close of s2
168                 runAfterParkedAsync(s2::close);
169 
170                 // read from s1 should block, then read -1
171                 int n = s1.getInputStream().read();
172                 assertTrue(n == -1);
173             }
174         });
175     }
176 
177     /**
178      * Virtual thread blocks in read, peer closes connection abruptly.
179      */
180     @Test
181     void testSocketReadPeerClose2() throws Exception {
182         VThreadRunner.run(() -> {
183             try (var connection = new Connection()) {
184                 Socket s1 = connection.socket1();
185                 Socket s2 = connection.socket2();
186 
187                 // delayed abrupt close of s2
188                 s2.setSoLinger(true, 0);
189                 runAfterParkedAsync(s2::close);
190 
191                 // read from s1 should block, then throw
192                 try {
193                     int n = s1.getInputStream().read();
194                     fail("read " + n);
195                 } catch (IOException ioe) {
196                     // expected
197                 }
198             }
199         });
200     }
201 
202     /**
203      * Socket close while virtual thread blocked in read.
204      */
205     @Test
206     void testSocketReadAsyncClose1() throws Exception {
207         testSocketReadAsyncClose(0);
208     }
209 
210     /**
211      * Socket close while virtual thread blocked in timed read.
212      */
213     @Test
214     void testSocketReadAsyncClose2() throws Exception {
215         testSocketReadAsyncClose(0);
216     }
217 
218     void testSocketReadAsyncClose(int timeout) throws Exception {
219         VThreadRunner.run(() -> {
220             try (var connection = new Connection()) {
221                 Socket s = connection.socket1();
222 
223                 // delayed close of s
224                 runAfterParkedAsync(s::close);
225 
226                 // read from s should block, then throw
227                 if (timeout > 0) {
228                     s.setSoTimeout(timeout);
229                 }
230                 try {
231                     int n = s.getInputStream().read();
232                     fail("read " + n);
233                 } catch (SocketException expected) { }
234             }
235         });
236     }
237 
238     /**
239      * Virtual thread interrupted while blocked in Socket read.
240      */
241     @Test
242     void testSocketReadInterrupt1() throws Exception {
243         testSocketReadInterrupt(0);
244     }
245 
246     /**
247      * Virtual thread interrupted while blocked in Socket read with timeout
248      */
249     @Test
250     void testSocketReadInterrupt2() throws Exception {
251         testSocketReadInterrupt(60_000);
252     }
253 
254     void testSocketReadInterrupt(int timeout) throws Exception {
255         VThreadRunner.run(() -> {
256             try (var connection = new Connection()) {
257                 Socket s = connection.socket1();
258 
259 
260                 // delayed interrupt of current thread
261                 Thread thisThread = Thread.currentThread();
262                 runAfterParkedAsync(thisThread::interrupt);
263 
264                 // read from s should block, then throw
265                 if (timeout > 0) {
266                     s.setSoTimeout(timeout);
267                 }
268                 try {
269                     int n = s.getInputStream().read();
270                     fail("read " + n);
271                 } catch (SocketException expected) {
272                     assertTrue(Thread.interrupted());
273                     assertTrue(s.isClosed());
274                 }
275             }
276         });
277     }
278 
279     /**
280      * Socket close while virtual thread blocked in write.
281      */
282     @Test
283     void testSocketWriteAsyncClose() throws Exception {
284         VThreadRunner.run(() -> {
285             try (var connection = new Connection()) {
286                 Socket s = connection.socket1();
287 
288                 // delayedclose of s
289                 runAfterParkedAsync(s::close);
290 
291                 // write to s should block, then throw
292                 try {
293                     byte[] ba = new byte[100*1024];
294                     OutputStream out = s.getOutputStream();
295                     for (;;) {
296                         out.write(ba);
297                     }
298                 } catch (SocketException expected) { }
299             }
300         });
301     }
302 
303     /**
304      * Virtual thread interrupted while blocked in Socket write.
305      */
306     @Test
307     void testSocketWriteInterrupt() throws Exception {
308         VThreadRunner.run(() -> {
309             try (var connection = new Connection()) {
310                 Socket s = connection.socket1();
311 
312                 // delayed interrupt of current thread
313                 Thread thisThread = Thread.currentThread();
314                 runAfterParkedAsync(thisThread::interrupt);
315 
316                 // write to s should block, then throw
317                 try {
318                     byte[] ba = new byte[100*1024];
319                     OutputStream out = s.getOutputStream();
320                     for (;;) {
321                         out.write(ba);
322                     }
323                 } catch (SocketException expected) {
324                     assertTrue(Thread.interrupted());
325                     assertTrue(s.isClosed());
326                 }
327             }
328         });
329     }
330 
331     /**
332      * Virtual thread reading urgent data when SO_OOBINLINE is enabled.
333      */
334     @Test
335     void testSocketReadUrgentData() throws Exception {
336         VThreadRunner.run(() -> {
337             try (var connection = new Connection()) {
338                 Socket s1 = connection.socket1();
339                 Socket s2 = connection.socket2();
340 
341                 // urgent data should be received
342                 runAfterParkedAsync(() -> s2.sendUrgentData('X'));
343 
344                 // read should block, then read the OOB byte
345                 s1.setOOBInline(true);
346                 byte[] ba = new byte[10];
347                 int n = s1.getInputStream().read(ba);
348                 assertTrue(n == 1);
349                 assertTrue(ba[0] == 'X');
350 
351                 // urgent data should not be received
352                 s1.setOOBInline(false);
353                 s1.setSoTimeout(500);
354                 s2.sendUrgentData('X');
355                 try {
356                     s1.getInputStream().read(ba);
357                     fail();
358                 } catch (SocketTimeoutException expected) { }
359             }
360         });
361     }
362 
363     /**
364      * ServerSocket accept, no blocking.
365      */
366     @Test
367     void testServerSocketAccept1() throws Exception {
368         VThreadRunner.run(() -> {
369             try (var listener = new ServerSocket()) {
370                 InetAddress loopback = InetAddress.getLoopbackAddress();
371                 listener.bind(new InetSocketAddress(loopback, 0));
372 
373                 // establish connection
374                 var socket1 = new Socket(loopback, listener.getLocalPort());
375 
376                 // accept should not block
377                 var socket2 = listener.accept();
378                 socket1.close();
379                 socket2.close();
380             }
381         });
382     }
383 
384     /**
385      * Virtual thread blocks in accept.
386      */
387     @Test
388     void testServerSocketAccept2() throws Exception {
389         testServerSocketAccept(0);
390     }
391 
392     /**
393      * Virtual thread blocks in timed accept.
394      */
395     @Test
396     void testServerSocketAccept3() throws Exception {
397         testServerSocketAccept(60_000);
398     }
399 
400     void testServerSocketAccept(int timeout) throws Exception {
401         VThreadRunner.run(() -> {
402             try (var listener = new ServerSocket()) {
403                 InetAddress loopback = InetAddress.getLoopbackAddress();
404                 listener.bind(new InetSocketAddress(loopback, 0));
405 
406                 // schedule connect
407                 var socket1 = new Socket();
408                 SocketAddress remote = listener.getLocalSocketAddress();
409                 runAfterParkedAsync(() -> socket1.connect(remote));
410 
411                 // accept should block
412                 if (timeout > 0) {
413                     listener.setSoTimeout(timeout);
414                 }
415                 var socket2 = listener.accept();
416                 socket1.close();
417                 socket2.close();
418             }
419         });
420     }
421 
422     /**
423      * ServerSocket close while virtual thread blocked in accept.
424      */
425     @Test
426     void testServerSocketAcceptAsyncClose1() throws Exception {
427         testServerSocketAcceptAsyncClose(0);
428     }
429 
430     /**
431      * ServerSocket close while virtual thread blocked in timed accept.
432      */
433     @Test
434     void testServerSocketAcceptAsyncClose2() throws Exception {
435         testServerSocketAcceptAsyncClose(60_000);
436     }
437 
438     void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
439         VThreadRunner.run(() -> {
440             try (var listener = new ServerSocket()) {
441                 InetAddress loopback = InetAddress.getLoopbackAddress();
442                 listener.bind(new InetSocketAddress(loopback, 0));
443 
444                 // delayed close of listener
445                 runAfterParkedAsync(listener::close);
446 
447                 // accept should block, then throw
448                 if (timeout > 0) {
449                     listener.setSoTimeout(timeout);
450                 }
451                 try {
452                     listener.accept().close();
453                     fail("connection accepted???");
454                 } catch (SocketException expected) { }
455             }
456         });
457     }
458 
459     /**
460      * Virtual thread interrupted while blocked in ServerSocket accept.
461      */
462     @Test
463     void testServerSocketAcceptInterrupt1() throws Exception {
464         testServerSocketAcceptInterrupt(0);
465     }
466 
467     /**
468      * Virtual thread interrupted while blocked in ServerSocket accept with timeout.
469      */
470     @Test
471     void testServerSocketAcceptInterrupt2() throws Exception {
472         testServerSocketAcceptInterrupt(60_000);
473     }
474 
475     void testServerSocketAcceptInterrupt(int timeout) throws Exception {
476         VThreadRunner.run(() -> {
477             try (var listener = new ServerSocket()) {
478                 InetAddress loopback = InetAddress.getLoopbackAddress();
479                 listener.bind(new InetSocketAddress(loopback, 0));
480 
481                 // delayed interrupt of current thread
482                 Thread thisThread = Thread.currentThread();
483                 runAfterParkedAsync(thisThread::interrupt);
484 
485                 // accept should block, then throw
486                 if (timeout > 0) {
487                     listener.setSoTimeout(timeout);
488                 }
489                 try {
490                     listener.accept().close();
491                     fail("connection accepted???");
492                 } catch (SocketException expected) {
493                     assertTrue(Thread.interrupted());
494                     assertTrue(listener.isClosed());
495                 }
496             }
497         });
498     }
499 
500     /**
501      * DatagramSocket receive/send, no blocking.
502      */
503     @Test
504     void testDatagramSocketSendReceive1() throws Exception {
505         VThreadRunner.run(() -> {
506             try (DatagramSocket s1 = new DatagramSocket(null);
507                  DatagramSocket s2 = new DatagramSocket(null)) {
508 
509                 InetAddress lh = InetAddress.getLoopbackAddress();
510                 s1.bind(new InetSocketAddress(lh, 0));
511                 s2.bind(new InetSocketAddress(lh, 0));
512 
513                 // send should not block
514                 byte[] bytes = "XXX".getBytes("UTF-8");
515                 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
516                 p1.setSocketAddress(s2.getLocalSocketAddress());
517                 s1.send(p1);
518 
519                 // receive should not block
520                 byte[] ba = new byte[100];
521                 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
522                 s2.receive(p2);
523                 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
524                 assertTrue(ba[0] == 'X');
525             }
526         });
527     }
528 
529     /**
530      * Virtual thread blocks in DatagramSocket receive.
531      */
532     @Test
533     void testDatagramSocketSendReceive2() throws Exception {
534         testDatagramSocketSendReceive(0);
535     }
536 
537     /**
538      * Virtual thread blocks in DatagramSocket receive with timeout.
539      */
540     @Test
541     void testDatagramSocketSendReceive3() throws Exception {
542         testDatagramSocketSendReceive(60_000);
543     }
544 
545     private void testDatagramSocketSendReceive(int timeout) throws Exception {
546         VThreadRunner.run(() -> {
547             try (DatagramSocket s1 = new DatagramSocket(null);
548                  DatagramSocket s2 = new DatagramSocket(null)) {
549 
550                 InetAddress lh = InetAddress.getLoopbackAddress();
551                 s1.bind(new InetSocketAddress(lh, 0));
552                 s2.bind(new InetSocketAddress(lh, 0));
553 
554                 // delayed send
555                 byte[] bytes = "XXX".getBytes("UTF-8");
556                 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
557                 p1.setSocketAddress(s2.getLocalSocketAddress());
558                 runAfterParkedAsync(() -> s1.send(p1));
559 
560                 // receive should block
561                 if (timeout > 0) {
562                     s2.setSoTimeout(timeout);
563                 }
564                 byte[] ba = new byte[100];
565                 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
566                 s2.receive(p2);
567                 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
568                 assertTrue(ba[0] == 'X');
569             }
570         });
571     }
572 
573     /**
574      * Virtual thread blocks in DatagramSocket receive that times out.
575      */
576     @Test
577     void testDatagramSocketReceiveTimeout() throws Exception {
578         VThreadRunner.run(() -> {
579             try (DatagramSocket s = new DatagramSocket(null)) {
580                 InetAddress lh = InetAddress.getLoopbackAddress();
581                 s.bind(new InetSocketAddress(lh, 0));
582                 s.setSoTimeout(500);
583                 byte[] ba = new byte[100];
584                 DatagramPacket p = new DatagramPacket(ba, ba.length);
585                 try {
586                     s.receive(p);
587                     fail();
588                 } catch (SocketTimeoutException expected) { }
589             }
590         });
591     }
592 
593     /**
594      * DatagramSocket close while virtual thread blocked in receive.
595      */
596     @Test
597     void testDatagramSocketReceiveAsyncClose1() throws Exception {
598         testDatagramSocketReceiveAsyncClose(0);
599     }
600 
601     /**
602      * DatagramSocket close while virtual thread blocked with timeout.
603      */
604     @Test
605     void testDatagramSocketReceiveAsyncClose2() throws Exception {
606         testDatagramSocketReceiveAsyncClose(60_000);
607     }
608 
609     private void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
610         VThreadRunner.run(() -> {
611             try (DatagramSocket s = new DatagramSocket(null)) {
612                 InetAddress lh = InetAddress.getLoopbackAddress();
613                 s.bind(new InetSocketAddress(lh, 0));
614 
615                 // delayed close of s
616                 runAfterParkedAsync(s::close);
617 
618                 // receive should block, then throw
619                 if (timeout > 0) {
620                     s.setSoTimeout(timeout);
621                 }
622                 try {
623                     byte[] ba = new byte[100];
624                     DatagramPacket p = new DatagramPacket(ba, ba.length);
625                     s.receive(p);
626                     fail();
627                 } catch (SocketException expected) { }
628             }
629         });
630     }
631 
632     /**
633      * Virtual thread interrupted while blocked in DatagramSocket receive.
634      */
635     @Test
636     void testDatagramSocketReceiveInterrupt1() throws Exception {
637         testDatagramSocketReceiveInterrupt(0);
638     }
639 
640     /**
641      * Virtual thread interrupted while blocked in DatagramSocket receive with timeout.
642      */
643     @Test
644     void testDatagramSocketReceiveInterrupt2() throws Exception {
645         testDatagramSocketReceiveInterrupt(60_000);
646     }
647 
648     private void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
649         VThreadRunner.run(() -> {
650             try (DatagramSocket s = new DatagramSocket(null)) {
651                 InetAddress lh = InetAddress.getLoopbackAddress();
652                 s.bind(new InetSocketAddress(lh, 0));
653 
654                 // delayed interrupt of current thread
655                 Thread thisThread = Thread.currentThread();
656                 runAfterParkedAsync(thisThread::interrupt);
657 
658                 // receive should block, then throw
659                 if (timeout > 0) {
660                     s.setSoTimeout(timeout);
661                 }
662                 try {
663                     byte[] ba = new byte[100];
664                     DatagramPacket p = new DatagramPacket(ba, ba.length);
665                     s.receive(p);
666                     fail();
667                 } catch (SocketException expected) {
668                     assertTrue(Thread.interrupted());
669                     assertTrue(s.isClosed());
670                 }
671             }
672         });
673     }
674 
675     /**
676      * Creates a loopback connection
677      */
678     static class Connection implements Closeable {
679         private final Socket s1;
680         private final Socket s2;
681         Connection() throws IOException {
682             var lh = InetAddress.getLoopbackAddress();
683             try (var listener = new ServerSocket()) {
684                 listener.bind(new InetSocketAddress(lh, 0));
685                 Socket s1 = new Socket();
686                 Socket s2;
687                 try {
688                     s1.connect(listener.getLocalSocketAddress());
689                     s2 = listener.accept();
690                 } catch (IOException ioe) {
691                     s1.close();
692                     throw ioe;
693                 }
694                 this.s1 = s1;
695                 this.s2 = s2;
696             }
697 
698         }
699         Socket socket1() {
700             return s1;
701         }
702         Socket socket2() {
703             return s2;
704         }
705         @Override
706         public void close() throws IOException {
707             s1.close();
708             s2.close();
709         }
710     }
711 
    /**
     * A task that returns no result and may throw a checked exception.
     * Used as the task type accepted by runAfterParkedAsync.
     */
    @FunctionalInterface
    interface ThrowingRunnable {
        /**
         * Runs the task.
         *
         * @throws Exception if the task fails
         */
        void run() throws Exception;
    }
716 
717     /**
718      * Runs the given task asynchronously after the current virtual thread has parked.
719      * @return the thread started to run the task
720      */
721     static Thread runAfterParkedAsync(ThrowingRunnable task) {
722         Thread target = Thread.currentThread();
723         if (!target.isVirtual())
724             throw new WrongThreadException();
725         return Thread.ofPlatform().daemon().start(() -> {
726             try {
727                 Thread.State state = target.getState();
728                 while (state != Thread.State.WAITING
729                         && state != Thread.State.TIMED_WAITING) {
730                     Thread.sleep(20);
731                     state = target.getState();
732                 }
733                 Thread.sleep(20);  // give a bit more time to release carrier
734                 task.run();
735             } catch (Exception e) {
736                 e.printStackTrace();
737             }
738         });
739     }
740 }