1 /*
   2  * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /**
  25  * @test
  26  * @run testng NetSockets
  27  * @summary Basic tests for Fibers using java.net.Socket/ServerSocket
  28  */
  29 
  30 import java.io.*;
  31 import java.net.*;
  32 
  33 import org.testng.annotations.Test;
  34 import org.testng.annotations.DataProvider;
  35 import static org.testng.Assert.*;
  36 
  37 @Test
  38 public class NetSockets {
  39 
  40     private static final long DELAY = 2000;
  41 
  42     private interface TestCase {
  43         void run() throws IOException;
  44     }
  45 
  46     private void test(TestCase test) throws Exception {
  47         try (var scope = FiberScope.open()) {
  48             scope.schedule(() -> {
  49                 test.run();
  50                 return null;
  51             }).join();
  52         }
  53     }
  54 
  55     /**
  56      * Cancel a fiber in connect.
  57      */
  58     public void testSocketConnectCancel() throws Exception {
  59         test(() -> {
  60             try (var listener = new ServerSocket()) {
  61                 listener.bind(new InetSocketAddress( InetAddress.getLocalHost(), 0));
  62                 Fiber.current().map(Fiber::cancel);
  63                 Socket s = new Socket();
  64                 try {
  65                     s.connect(listener.getLocalSocketAddress());
  66                     assertTrue(false);
  67                 } catch (IOException expected) {
  68                 } finally {
  69                     s.close();
  70                 }
  71             }
  72         });
  73     }
  74 
  75     /**
  76      * Socket read/write, no blocking.
  77      */
  78     public void testSocketReadWrite1() throws Exception {
  79         test(() -> {
  80             try (var connection = new Connection()) {
  81                 Socket s1 = connection.socket1();
  82                 Socket s2 = connection.socket2();
  83 
  84                 // write should not block
  85                 byte[] ba = "XXX".getBytes("UTF-8");
  86                 s1.getOutputStream().write(ba);
  87 
  88                 // read should not block
  89                 ba = new byte[10];
  90                 int n = s2.getInputStream().read(ba);
  91                 assertTrue(n > 0);
  92                 assertTrue(ba[0] == 'X');
  93             }
  94         });
  95     }
  96 
  97     /**
  98      * Fiber blocks in read.
  99      */
 100     public void testSocketReadWrite2() throws Exception {
 101         test(() -> {
 102             try (var connection = new Connection()) {
 103                 Socket s1 = connection.socket1();
 104                 Socket s2 = connection.socket2();
 105 
 106                 // schedule write
 107                 byte[] ba = "XXX".getBytes("UTF-8");
 108                 ScheduledWriter.schedule(s1, ba, DELAY);
 109 
 110                 // read should block
 111                 ba = new byte[10];
 112                 int n = s2.getInputStream().read(ba);
 113                 assertTrue(n > 0);
 114                 assertTrue(ba[0] == 'X');
 115             }
 116         });
 117     }
 118 
 119     /**
 120      * Fiber blocks in write.
 121      */
 122     public void testSocketReadWrite3() throws Exception {
 123         test(() -> {
 124             try (var connection = new Connection()) {
 125                 Socket s1 = connection.socket1();
 126                 Socket s2 = connection.socket2();
 127 
 128                 // schedule thread to read to EOF
 129                 ScheduledReader.schedule(s2, true, DELAY);
 130 
 131                 // write should block
 132                 byte[] ba = new byte[100*1024];
 133                 OutputStream out = s1.getOutputStream();
 134                 for (int i=0; i<1000; i++) {
 135                     out.write(ba);
 136                 }
 137             }
 138         });
 139     }
 140 
 141     /**
 142      * Fibers blocks in read, peer closes connection.
 143      */
 144     public void testSocketReadPeerClose1() throws Exception {
 145         test(() -> {
 146             try (var connection = new Connection()) {
 147                 Socket s1 = connection.socket1();
 148                 Socket s2 = connection.socket2();
 149 
 150                 ScheduledCloser.schedule(s2, DELAY);
 151 
 152                 int n = s1.getInputStream().read();
 153                 assertTrue(n == -1);
 154             }
 155         });
 156     }
 157 
 158     /**
 159      * Fibers blocks in read, peer closes connection abruptly.
 160      */
 161     public void testSocketReadPeerClose2() throws Exception {
 162         test(() -> {
 163             try (var connection = new Connection()) {
 164                 Socket s1 = connection.socket1();
 165                 Socket s2 = connection.socket2();
 166 
 167                 s2.setSoLinger(true, 0);
 168                 ScheduledCloser.schedule(s2, DELAY);
 169 
 170                 try {
 171                     s1.getInputStream().read();
 172                     assertTrue(false);
 173                 } catch (IOException ioe) {
 174                     // expected
 175                 }
 176             }
 177         });
 178     }
 179 
 180     /**
 181      * Socket close while Fiber blocked in read.
 182      */
 183     public void testSocketReadAsyncClose() throws Exception {
 184         test(() -> {
 185             try (var connection = new Connection()) {
 186                 Socket s = connection.socket1();
 187                 ScheduledCloser.schedule(s, DELAY);
 188                 try {
 189                     int n = s.getInputStream().read();
 190                     throw new RuntimeException("read returned " + n);
 191                 } catch (SocketException expected) { }
 192             }
 193         });
 194     }
 195 
 196     /**
 197      * Socket close while Fiber blocked in write.
 198      */
 199     public void testSocketWriteAsyncClose() throws Exception {
 200         test(() -> {
 201             try (var connection = new Connection()) {
 202                 Socket s = connection.socket1();
 203                 ScheduledCloser.schedule(s, DELAY);
 204                 try {
 205                     byte[] ba = new byte[100*10024];
 206                     OutputStream out = s.getOutputStream();
 207                     for (;;) {
 208                         out.write(ba);
 209                     }
 210                 } catch (SocketException expected) { }
 211             }
 212         });
 213     }
 214 
 215     /**
 216      * Cancel a fiber blocked in read.
 217      */
 218     public void testSocketReadCancel() throws Exception {
 219         test(() -> {
 220             try (var connection = new Connection()) {
 221                 var fiber = Fiber.current().orElseThrow();
 222                 ScheduledCanceller.schedule(fiber, DELAY);
 223                 Socket s = connection.socket1();
 224                 try {
 225                     int n = s.getInputStream().read();
 226                     throw new RuntimeException("read returned " + n);
 227                 } catch (SocketException expected) { }
 228             }
 229         });
 230     }
 231 
 232     /**
 233      * Cancel a fiber blocked in write.
 234      */
 235     public void testSocketWriteCancel() throws Exception {
 236         test(() -> {
 237             try (var connection = new Connection()) {
 238                 var fiber = Fiber.current().orElseThrow();
 239                 ScheduledCanceller.schedule(fiber, DELAY);
 240                 Socket s = connection.socket1();
 241                 try {
 242                     byte[] ba = new byte[100*10024];
 243                     OutputStream out = s.getOutputStream();
 244                     for (;;) {
 245                         out.write(ba);
 246                     }
 247                 } catch (SocketException expected) { }
 248             }
 249         });
 250     }
 251 
 252     /**
 253      * ServerSocket accept, no blocking.
 254      */
 255     public void testServerSocketAccept1() throws Exception {
 256         test(() -> {
 257             try (var listener = new ServerSocket(0)) {
 258                 var socket1 = new Socket(listener.getInetAddress(), listener.getLocalPort());
 259                 // accept should not block
 260                 var socket2 = listener.accept();
 261                 socket1.close();
 262                 socket2.close();
 263             }
 264         });
 265     }
 266 
 267     /**
 268      * Fiber blocks in accept.
 269      */
 270     public void testServerSocketAccept2() throws Exception {
 271         test(() -> {
 272             try (var listener = new ServerSocket(0)) {
 273                 var socket1 = new Socket();
 274                 ScheduledConnector.schedule(socket1, listener.getLocalSocketAddress(), DELAY);
 275                 // accept will block
 276                 var socket2 = listener.accept();
 277                 socket1.close();
 278                 socket2.close();
 279             }
 280         });
 281     }
 282 
 283     /**
 284      * ServerSocket close while Fiber blocked in accept.
 285      */
 286     public void testServerSocketAcceptAsyncClose() throws Exception {
 287         test(() -> {
 288             try (var listener = new ServerSocket(0)) {
 289                 ScheduledCloser.schedule(listener, DELAY);
 290                 try {
 291                     listener.accept().close();
 292                     throw new RuntimeException("connection accepted???");
 293                 } catch (SocketException expected) { }
 294             }
 295         });
 296     }
 297 
 298     /**
 299      * Fiber cancelled while blocked in accept.
 300      */
 301     public void testServerSocketAcceptCancel() throws Exception {
 302         test(() -> {
 303             try (var listener = new ServerSocket(0)) {
 304                 var fiber = Fiber.current().orElseThrow();
 305                 ScheduledCanceller.schedule(fiber, DELAY);
 306                 try {
 307                     listener.accept().close();
 308                     throw new RuntimeException("connection accepted???");
 309                 } catch (SocketException expected) { }
 310             }
 311         });
 312     }
 313 
 314     // -- supporting classes --
 315 
 316     /**
 317      * Creates a loopback connection
 318      */
 319     static class Connection implements Closeable {
 320         private final ServerSocket ss;
 321         private final Socket s1;
 322         private final Socket s2;
 323         Connection() throws IOException {
 324             ServerSocket ss = new ServerSocket();
 325             var lh = InetAddress.getLocalHost();
 326             ss.bind(new InetSocketAddress(lh, 0));
 327             Socket s = new Socket();
 328             s.connect(ss.getLocalSocketAddress());
 329 
 330             this.ss = ss;
 331             this.s1 = s;
 332             this.s2 = ss.accept();
 333         }
 334         Socket socket1() {
 335             return s1;
 336         }
 337         Socket socket2() {
 338             return s2;
 339         }
 340         @Override
 341         public void close() throws IOException {
 342             if (ss != null) ss.close();
 343             if (s1 != null) s1.close();
 344             if (s2 != null) s2.close();
 345         }
 346     }
 347 
 348     /**
 349      * Closes a socket after a delay
 350      */
 351     static class ScheduledCloser implements Runnable {
 352         private final Closeable c;
 353         private final long delay;
 354         ScheduledCloser(Closeable c, long delay) {
 355             this.c = c;
 356             this.delay = delay;
 357         }
 358         @Override
 359         public void run() {
 360             try {
 361                 Thread.sleep(delay);
 362                 c.close();
 363             } catch (Exception e) { }
 364         }
 365         static void schedule(Closeable c, long delay) {
 366             new Thread(new ScheduledCloser(c, delay)).start();
 367         }
 368     }
 369 
 370     /**
 371      * Reads from a socket, and to EOF, after a delay
 372      */
 373     static class ScheduledReader implements Runnable {
 374         private final Socket s;
 375         private final boolean readAll;
 376         private final long delay;
 377 
 378         ScheduledReader(Socket s, boolean readAll, long delay) {
 379             this.s = s;
 380             this.readAll = readAll;
 381             this.delay = delay;
 382         }
 383 
 384         @Override
 385         public void run() {
 386             try {
 387                 Thread.sleep(delay);
 388                 byte[] ba = new byte[8192];
 389                 InputStream in = s.getInputStream();
 390                 for (;;) {
 391                     int n = in.read(ba);
 392                     if (n == -1 || !readAll)
 393                         break;
 394                 }
 395             } catch (Exception e) { }
 396         }
 397 
 398         static void schedule(Socket s, boolean readAll, long delay) {
 399             new Thread(new ScheduledReader(s, readAll, delay)).start();
 400         }
 401     }
 402 
 403     /**
 404      * Writes to a socket after a delay
 405      */
 406     static class ScheduledWriter implements Runnable {
 407         private final Socket s;
 408         private final byte[] ba;
 409         private final long delay;
 410 
 411         ScheduledWriter(Socket s, byte[] ba, long delay) {
 412             this.s = s;
 413             this.ba = ba.clone();
 414             this.delay = delay;
 415         }
 416 
 417         @Override
 418         public void run() {
 419             try {
 420                 Thread.sleep(delay);
 421                 s.getOutputStream().write(ba);
 422             } catch (Exception e) { }
 423         }
 424 
 425         static void schedule(Socket s, byte[] ba, long delay) {
 426             new Thread(new ScheduledWriter(s, ba, delay)).start();
 427         }
 428     }
 429 
 430     /**
 431      * Establish a connection to a socket address after a delay
 432      */
 433     static class ScheduledConnector implements Runnable {
 434         private final Socket socket;
 435         private final SocketAddress address;
 436         private final long delay;
 437 
 438         ScheduledConnector(Socket socket, SocketAddress address, long delay) {
 439             this.socket = socket;
 440             this.address = address;
 441             this.delay = delay;
 442         }
 443 
 444         @Override
 445         public void run() {
 446             try {
 447                 Thread.sleep(delay);
 448                 socket.connect(address);
 449             } catch (Exception e) { }
 450         }
 451 
 452         static void schedule(Socket socket, SocketAddress address, long delay) {
 453             new Thread(new ScheduledConnector(socket, address, delay)).start();
 454         }
 455     }
 456 
 457     /**
 458      * Cancel a fiber after a delay
 459      */
 460     static class ScheduledCanceller implements Runnable {
 461         private final Fiber fiber;
 462         private final long delay;
 463 
 464         ScheduledCanceller(Fiber fiber, long delay) {
 465             this.fiber = fiber;
 466             this.delay = delay;
 467         }
 468 
 469         @Override
 470         public void run() {
 471             try {
 472                 Thread.sleep(delay);
 473                 fiber.cancel();
 474             } catch (Exception e) { }
 475         }
 476 
 477         static void schedule(Fiber fiber, long delay) {
 478             new Thread(new ScheduledCanceller(fiber, delay)).start();
 479         }
 480     }
 481 }