1 /*
  2  * Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /**
 25  * @test
 26  * @summary Basic tests for virtual threads using java.net sockets.
 27  * @compile --enable-preview -source ${jdk.version} NetSockets.java
 28  * @run testng/othervm/timeout=300 --enable-preview NetSockets
 29  * @run testng/othervm/timeout=300 --enable-preview -Djdk.useDirectRegister NetSockets
 30  */
 31 
 32 /**
 33  * @test
 34  * @requires (os.family == "windows")
 35  * @compile --enable-preview -source ${jdk.version} NetSockets.java
 36  * @run testng/othervm/timeout=300 --enable-preview
 37  *     -Djdk.PollerProvider=sun.nio.ch.WSAPollPollerProvider NetSockets
 38  */
 39 
 40 import java.io.Closeable;
 41 import java.io.IOException;
 42 import java.io.InputStream;
 43 import java.io.OutputStream;
 44 import java.net.DatagramPacket;
 45 import java.net.DatagramSocket;
 46 import java.net.InetAddress;
 47 import java.net.InetSocketAddress;
 48 import java.net.ServerSocket;
 49 import java.net.Socket;
 50 import java.net.SocketAddress;
 51 import java.net.SocketException;
 52 import java.net.SocketTimeoutException;
 53 
 54 import org.testng.annotations.Test;
 55 import static org.testng.Assert.*;
 56 
 57 public class NetSockets {
 58 
    // Delay, in milliseconds, handed to the Scheduled* helpers: each helper
    // sleeps this long before acting. Timed tests assert their socket timeout
    // is larger than this value.
    private static final long DELAY = 2000;
 60     
 61     /**
 62      * Socket read/write, no blocking.
 63      */
 64     @Test
 65     public void testSocketReadWrite1() throws Exception {
 66         TestHelper.runInVirtualThread(() -> {
 67             try (var connection = new Connection()) {
 68                 Socket s1 = connection.socket1();
 69                 Socket s2 = connection.socket2();
 70 
 71                 // write should not block
 72                 byte[] ba = "XXX".getBytes("UTF-8");
 73                 s1.getOutputStream().write(ba);
 74 
 75                 // read should not block
 76                 ba = new byte[10];
 77                 int n = s2.getInputStream().read(ba);
 78                 assertTrue(n > 0);
 79                 assertTrue(ba[0] == 'X');
 80             }
 81         });
 82     }
 83 
 84     /**
 85      * Virtual thread blocks in read.
 86      */
 87     @Test
 88     public void testSocketRead1() throws Exception {
 89         testSocketRead(0);
 90     }
 91 
 92     /**
 93      * Virtual thread blocks in timed read.
 94      */
 95     @Test
 96     public void testSocketRead2() throws Exception {
 97         testSocketRead(60_000);
 98     }
 99 
100     void testSocketRead(int timeout) throws Exception {
101         TestHelper.runInVirtualThread(() -> {
102             try (var connection = new Connection()) {
103                 Socket s1 = connection.socket1();
104                 Socket s2 = connection.socket2();
105 
106                 // schedule write
107                 byte[] ba = "XXX".getBytes("UTF-8");
108                 ScheduledWriter.schedule(s1, ba, DELAY);
109 
110                 // read should block
111                 if (timeout > 0) {
112                     assert timeout > DELAY;
113                     s2.setSoTimeout(timeout);
114                 }
115                 ba = new byte[10];
116                 int n = s2.getInputStream().read(ba);
117                 assertTrue(n > 0);
118                 assertTrue(ba[0] == 'X');
119             }
120         });
121     }
122 
123     /**
124      * Virtual thread blocks in write.
125      */
126     @Test
127     public void testSocketWrite1() throws Exception {
128         TestHelper.runInVirtualThread(() -> {
129             try (var connection = new Connection()) {
130                 Socket s1 = connection.socket1();
131                 Socket s2 = connection.socket2();
132 
133                 // schedule thread to read to EOF
134                 ScheduledReader.schedule(s2, true, DELAY);
135 
136                 // write should block
137                 byte[] ba = new byte[100*1024];
138                 OutputStream out = s1.getOutputStream();
139                 for (int i=0; i<1000; i++) {
140                     out.write(ba);
141                 }
142             }
143         });
144     }
145 
146     /**
147      * Virtual thread blocks in read, peer closes connection.
148      */
149     @Test
150     public void testSocketReadPeerClose1() throws Exception {
151         TestHelper.runInVirtualThread(() -> {
152             try (var connection = new Connection()) {
153                 Socket s1 = connection.socket1();
154                 Socket s2 = connection.socket2();
155 
156                 ScheduledCloser.schedule(s2, DELAY);
157 
158                 int n = s1.getInputStream().read();
159                 assertTrue(n == -1);
160             }
161         });
162     }
163 
164     /**
165      * Virtual thread blocks in read, peer closes connection abruptly.
166      */
167     @Test
168     public void testSocketReadPeerClose2() throws Exception {
169         TestHelper.runInVirtualThread(() -> {
170             try (var connection = new Connection()) {
171                 Socket s1 = connection.socket1();
172                 Socket s2 = connection.socket2();
173 
174                 s2.setSoLinger(true, 0);
175                 ScheduledCloser.schedule(s2, DELAY);
176 
177                 try {
178                     s1.getInputStream().read();
179                     assertTrue(false);
180                 } catch (IOException ioe) {
181                     // expected
182                 }
183             }
184         });
185     }
186 
187     /**
188      * Socket close while virtual thread blocked in read.
189      */
190     @Test
191     public void testSocketReadAsyncClose1() throws Exception {
192         testSocketReadAsyncClose(0);
193     }
194 
195     /**
196      * Socket close while virtual thread blocked in timed read.
197      */
198     @Test
199     public void testSocketReadAsyncClose2() throws Exception {
200         testSocketReadAsyncClose(0);
201     }
202 
203     void testSocketReadAsyncClose(int timeout) throws Exception {
204         TestHelper.runInVirtualThread(() -> {
205             try (var connection = new Connection()) {
206                 Socket s = connection.socket1();
207                 ScheduledCloser.schedule(s, DELAY);
208                 try {
209                     if (timeout > 0) {
210                         assert timeout > DELAY;
211                         s.setSoTimeout(timeout);
212                     }
213                     int n = s.getInputStream().read();
214                     throw new RuntimeException("read returned " + n);
215                 } catch (SocketException expected) { }
216             }
217         });
218     }
219 
220     /**
221      * Virtual thread interrupted while blocked in Socket read.
222      */
223     @Test
224     public void testSocketReadInterrupt1() throws Exception {
225         testSocketReadInterrupt(0);
226     }
227 
228     /**
229      * Virtual thread interrupted while blocked in Socket read with timeout
230      */
231     @Test
232     public void testSocketReadInterrupt2() throws Exception {
233         testSocketReadInterrupt(60_000);
234     }
235 
236     void testSocketReadInterrupt(int timeout) throws Exception {
237         TestHelper.runInVirtualThread(() -> {
238             try (var connection = new Connection()) {
239                 Socket s = connection.socket1();
240                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
241                 try {
242                     if (timeout > 0) {
243                         assert timeout > DELAY;
244                         s.setSoTimeout(timeout);
245                     }
246                     int n = s.getInputStream().read();
247                     throw new RuntimeException("read returned " + n);
248                 } catch (SocketException expected) {
249                     assertTrue(Thread.interrupted());
250                     assertTrue(s.isClosed());
251                 }
252             }
253         });
254     }
255 
256     /**
257      * Socket close while virtual thread blocked in write.
258      */
259     @Test
260     public void testSocketWriteAsyncClose() throws Exception {
261         TestHelper.runInVirtualThread(() -> {
262             try (var connection = new Connection()) {
263                 Socket s = connection.socket1();
264                 ScheduledCloser.schedule(s, DELAY);
265                 try {
266                     byte[] ba = new byte[100*1024];
267                     OutputStream out = s.getOutputStream();
268                     for (;;) {
269                         out.write(ba);
270                     }
271                 } catch (SocketException expected) { }
272             }
273         });
274     }
275 
276     /**
277      * Virtual thread interrupted while blocked in Socket write
278      */
279     @Test
280     public void testSocketWriteInterrupt() throws Exception {
281         TestHelper.runInVirtualThread(() -> {
282             try (var connection = new Connection()) {
283                 Socket s = connection.socket1();
284                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
285                 try {
286                     byte[] ba = new byte[100*1024];
287                     OutputStream out = s.getOutputStream();
288                     for (;;) {
289                         out.write(ba);
290                     }
291                 } catch (SocketException expected) {
292                     assertTrue(Thread.interrupted());
293                     assertTrue(s.isClosed());
294                 }
295             }
296         });
297     }
298 
299     /**
300      * Virtual thread reading urgent data when SO_OOBINLINE is enabled.
301      */
302     @Test
303     public void testSocketReadUrgentData() throws Exception {
304         TestHelper.runInVirtualThread(() -> {
305             try (var connection = new Connection()) {
306                 Socket s1 = connection.socket1();
307                 Socket s2 = connection.socket2();
308 
309                 // urgent data should be received
310                 ScheduledUrgentData.scheduleUrgentData(s2, 'X', DELAY);
311                 s1.setOOBInline(true);
312                 byte[] ba = new byte[10];
313                 int n = s1.getInputStream().read(ba);
314                 assertTrue(n == 1);
315                 assertTrue(ba[0] == 'X');
316 
317                 // urgent data should not be received
318                 s1.setOOBInline(false);
319                 s1.setSoTimeout(500);
320                 s2.sendUrgentData('X');
321                 try {
322                     s1.getInputStream().read(ba);
323                     assertTrue(false);
324                 } catch (SocketTimeoutException expected) { }
325             }
326         });
327     }
328 
329     /**
330      * ServerSocket accept, no blocking.
331      */
332     @Test
333     public void testServerSocketAccept1() throws Exception {
334         TestHelper.runInVirtualThread(() -> {
335             try (var listener = new ServerSocket(0)) {
336                 var socket1 = new Socket(listener.getInetAddress(), listener.getLocalPort());
337                 // accept should not block
338                 var socket2 = listener.accept();
339                 socket1.close();
340                 socket2.close();
341             }
342         });
343     }
344 
345     /**
346      * Virtual thread blocks in accept.
347      */
348     @Test
349     public void testServerSocketAccept2() throws Exception {
350         testServerSocketAccept(0);
351     }
352 
353     /**
354      * Virtual thread blocks in timed accept.
355      */
356     @Test
357     public void testServerSocketAccept3() throws Exception {
358         testServerSocketAccept(60_000);
359     }
360 
361     void testServerSocketAccept(int timeout) throws Exception {
362         TestHelper.runInVirtualThread(() -> {
363             try (var listener = new ServerSocket(0)) {
364                 var socket1 = new Socket();
365                 ScheduledConnector.schedule(socket1, listener.getLocalSocketAddress(), DELAY);
366                 // accept will block
367                 if (timeout > 0) {
368                     assert timeout > DELAY;
369                     listener.setSoTimeout(timeout);
370                 }
371                 var socket2 = listener.accept();
372                 socket1.close();
373                 socket2.close();
374             }
375         });
376     }
377 
378     /**
379      * ServerSocket close while virtual thread blocked in accept.
380      */
381     @Test
382     public void testServerSocketAcceptAsyncClose1() throws Exception {
383         testServerSocketAcceptAsyncClose(0);
384     }
385 
386     /**
387      * ServerSocket close while virtual thread blocked in timed accept.
388      */
389     @Test
390     public void testServerSocketAcceptAsyncClose2() throws Exception {
391         testServerSocketAcceptAsyncClose(60_000);
392     }
393 
394     void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
395         TestHelper.runInVirtualThread(() -> {
396             try (var listener = new ServerSocket(0)) {
397                 ScheduledCloser.schedule(listener, DELAY);
398                 if (timeout > 0) {
399                     assert timeout > DELAY;
400                     listener.setSoTimeout(timeout);
401                 }
402                 try {
403                     listener.accept().close();
404                     throw new RuntimeException("connection accepted???");
405                 } catch (SocketException expected) { }
406             }
407         });
408     }
409 
410     /**
411      * Virtual thread interrupted while blocked in ServerSocket accept
412      */
413     @Test
414     public void testServerSocketAcceptInterrupt1() throws Exception {
415         testServerSocketAcceptInterrupt(0);
416     }
417 
418     /**
419      * Virtual thread interrupted while blocked in ServerSocket accept with timeout
420      */
421     @Test
422     public void testServerSocketAcceptInterrupt2() throws Exception {
423         testServerSocketAcceptInterrupt(60_000);
424     }
425 
426     void testServerSocketAcceptInterrupt(int timeout) throws Exception {
427         TestHelper.runInVirtualThread(() -> {
428             try (var listener = new ServerSocket(0)) {
429                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
430                 if (timeout > 0) {
431                     assert timeout > DELAY;
432                     listener.setSoTimeout(timeout);
433                 }
434                 try {
435                     listener.accept().close();
436                     throw new RuntimeException("connection accepted???");
437                 } catch (SocketException expected) {
438                     assertTrue(Thread.interrupted());
439                     assertTrue(listener.isClosed());
440                 }
441             }
442         });
443     }
444 
445     /**
446      * DatagramSocket receive/send, no blocking.
447      */
448     @Test
449     public void testDatagramSocketSendReceive1() throws Exception {
450         TestHelper.runInVirtualThread(() -> {
451             try (DatagramSocket s1 = new DatagramSocket(null);
452                  DatagramSocket s2 = new DatagramSocket(null)) {
453 
454                 InetAddress lh = InetAddress.getLoopbackAddress();
455                 s1.bind(new InetSocketAddress(lh, 0));
456                 s2.bind(new InetSocketAddress(lh, 0));
457 
458                 // send should not block
459                 byte[] bytes = "XXX".getBytes("UTF-8");
460                 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
461                 p1.setSocketAddress(s2.getLocalSocketAddress());
462                 s1.send(p1);
463 
464                 // receive should not block
465                 byte[] ba = new byte[100];
466                 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
467                 s2.receive(p2);
468                 assertEquals(p2.getSocketAddress(), s1.getLocalSocketAddress());
469                 assertTrue(ba[0] == 'X');
470             }
471         });
472     }
473 
474     /**
475      * Virtual thread blocks in DatagramSocket receive
476      */
477     @Test
478     public void testDatagramSocketSendReceive2() throws Exception {
479         testDatagramSocketSendReceive(0);
480     }
481 
482     /**
483      * Virtual thread blocks in DatagramSocket receive with timeout
484      */
485     @Test
486     public void testDatagramSocketSendReceive3() throws Exception {
487         testDatagramSocketSendReceive(60_000);
488     }
489 
490     private void testDatagramSocketSendReceive(int timeout) throws Exception {
491         TestHelper.runInVirtualThread(() -> {
492             try (DatagramSocket s1 = new DatagramSocket(null);
493                  DatagramSocket s2 = new DatagramSocket(null)) {
494 
495                 InetAddress lh = InetAddress.getLoopbackAddress();
496                 s1.bind(new InetSocketAddress(lh, 0));
497                 s2.bind(new InetSocketAddress(lh, 0));
498 
499                 // schedule send
500                 byte[] bytes = "XXX".getBytes("UTF-8");
501                 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
502                 p1.setSocketAddress(s2.getLocalSocketAddress());
503                 ScheduledSender.schedule(s1, p1, DELAY);
504 
505                 // receive should block
506                 if (timeout > 0) {
507                     assert timeout > DELAY;
508                     s2.setSoTimeout(timeout);
509                 }
510                 byte[] ba = new byte[100];
511                 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
512                 s2.receive(p2);
513                 assertEquals(p2.getSocketAddress(), s1.getLocalSocketAddress());
514                 assertTrue(ba[0] == 'X');
515             }
516         });
517     }
518 
519     /**
520      * Virtual thread blocks in DatagramSocket receive that times out
521      */
522     @Test
523     public void testDatagramSocketReceiveTimeout() throws Exception {
524         TestHelper.runInVirtualThread(() -> {
525             try (DatagramSocket s = new DatagramSocket(null)) {
526                 InetAddress lh = InetAddress.getLoopbackAddress();
527                 s.bind(new InetSocketAddress(lh, 0));
528                 s.setSoTimeout(2000);
529                 byte[] ba = new byte[100];
530                 DatagramPacket p = new DatagramPacket(ba, ba.length);
531                 try {
532                     s.receive(p);
533                     assertTrue(false);
534                 } catch (SocketTimeoutException expected) { }
535             }
536         });
537     }
538 
539     /**
540      * DatagramSocket close while virtual thread blocked in receive.
541      */
542     @Test
543     public void testDatagramSocketReceiveAsyncClose1() throws Exception {
544         testDatagramSocketReceiveAsyncClose(0);
545     }
546 
547     /**
548      * DatagramSocket close while virtual thread blocked with timeout.
549      */
550     @Test
551     public void testDatagramSocketReceiveAsyncClose2() throws Exception {
552         testDatagramSocketReceiveAsyncClose(60_000);
553     }
554 
555     private void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
556         TestHelper.runInVirtualThread(() -> {
557             try (DatagramSocket s = new DatagramSocket(null)) {
558                 InetAddress lh = InetAddress.getLoopbackAddress();
559                 s.bind(new InetSocketAddress(lh, 0));
560 
561                 // schedule close
562                 ScheduledCloser.schedule(s, DELAY);
563 
564                 // receive
565                 if (timeout > 0) {
566                     assert timeout > DELAY;
567                     s.setSoTimeout(timeout);
568                 }
569                 try {
570                     byte[] ba = new byte[100];
571                     DatagramPacket p = new DatagramPacket(ba, ba.length);
572                     s.receive(p);
573                     assertTrue(false);
574                 } catch (SocketException expected) { }
575             }
576         });
577     }
578 
579     /**
580      * Virtual thread interrupted while blocked in DatagramSocket receive.
581      */
582     @Test
583     public void testDatagramSocketReceiveInterrupt1() throws Exception {
584         testDatagramSocketReceiveInterrupt(0);
585     }
586 
587     /**
588      * Virtual thread interrupted while blocked in DatagramSocket receive with timeout
589      */
590     @Test
591     public void testDatagramSocketReceiveInterrupt2() throws Exception {
592         testDatagramSocketReceiveInterrupt(60_000);
593     }
594 
595     private void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
596         TestHelper.runInVirtualThread(() -> {
597             try (DatagramSocket s = new DatagramSocket(null)) {
598                 InetAddress lh = InetAddress.getLoopbackAddress();
599                 s.bind(new InetSocketAddress(lh, 0));
600                 if (timeout > 0) {
601                     assert timeout > DELAY;
602                     s.setSoTimeout(timeout);
603                 }
604 
605                 // schedule interrupt
606                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
607 
608                 // receive
609                 try {
610                     byte[] ba = new byte[100];
611                     DatagramPacket p = new DatagramPacket(ba, ba.length);
612                     s.receive(p);
613                     assertTrue(false);
614                 } catch (SocketException expected) {
615                     assertTrue(Thread.interrupted());
616                     assertTrue(s.isClosed());
617                 }
618             }
619         });
620     }
621 
622     // -- supporting classes --
623 
624     /**
625      * Creates a loopback connection
626      */
627     static class Connection implements Closeable {
628         private final ServerSocket ss;
629         private final Socket s1;
630         private final Socket s2;
631         Connection() throws IOException {
632             ServerSocket ss = new ServerSocket();
633             var lh = InetAddress.getLoopbackAddress();
634             ss.bind(new InetSocketAddress(lh, 0));
635             Socket s = new Socket();
636             s.connect(ss.getLocalSocketAddress());
637 
638             this.ss = ss;
639             this.s1 = s;
640             this.s2 = ss.accept();
641         }
642         Socket socket1() {
643             return s1;
644         }
645         Socket socket2() {
646             return s2;
647         }
648         @Override
649         public void close() throws IOException {
650             if (ss != null) ss.close();
651             if (s1 != null) s1.close();
652             if (s2 != null) s2.close();
653         }
654     }
655 
656     /**
657      * Closes a socket after a delay
658      */
659     static class ScheduledCloser implements Runnable {
660         private final Closeable c;
661         private final long delay;
662         ScheduledCloser(Closeable c, long delay) {
663             this.c = c;
664             this.delay = delay;
665         }
666         @Override
667         public void run() {
668             try {
669                 Thread.sleep(delay);
670                 c.close();
671             } catch (Exception e) { }
672         }
673         static void schedule(Closeable c, long delay) {
674             new Thread(new ScheduledCloser(c, delay)).start();
675         }
676     }
677 
678     /**
679      * Interrupts a thread after a delay
680      */
681     static class ScheduledInterrupter implements Runnable {
682         private final Thread thread;
683         private final long delay;
684 
685         ScheduledInterrupter(Thread thread, long delay) {
686             this.thread = thread;
687             this.delay = delay;
688         }
689 
690         @Override
691         public void run() {
692             try {
693                 Thread.sleep(delay);
694                 thread.interrupt();
695             } catch (Exception e) { }
696         }
697 
698         static void schedule(Thread thread, long delay) {
699             new Thread(new ScheduledInterrupter(thread, delay)).start();
700         }
701     }
702 
703     /**
704      * Reads from a socket, and to EOF, after a delay
705      */
706     static class ScheduledReader implements Runnable {
707         private final Socket s;
708         private final boolean readAll;
709         private final long delay;
710 
711         ScheduledReader(Socket s, boolean readAll, long delay) {
712             this.s = s;
713             this.readAll = readAll;
714             this.delay = delay;
715         }
716 
717         @Override
718         public void run() {
719             try {
720                 Thread.sleep(delay);
721                 byte[] ba = new byte[8192];
722                 InputStream in = s.getInputStream();
723                 for (;;) {
724                     int n = in.read(ba);
725                     if (n == -1 || !readAll)
726                         break;
727                 }
728             } catch (Exception e) { }
729         }
730 
731         static void schedule(Socket s, boolean readAll, long delay) {
732             new Thread(new ScheduledReader(s, readAll, delay)).start();
733         }
734     }
735 
736     /**
737      * Writes to a socket after a delay
738      */
739     static class ScheduledWriter implements Runnable {
740         private final Socket s;
741         private final byte[] ba;
742         private final long delay;
743 
744         ScheduledWriter(Socket s, byte[] ba, long delay) {
745             this.s = s;
746             this.ba = ba.clone();
747             this.delay = delay;
748         }
749 
750         @Override
751         public void run() {
752             try {
753                 Thread.sleep(delay);
754                 s.getOutputStream().write(ba);
755             } catch (Exception e) { }
756         }
757 
758         static void schedule(Socket s, byte[] ba, long delay) {
759             new Thread(new ScheduledWriter(s, ba, delay)).start();
760         }
761     }
762 
763     /**
764      * Establish a connection to a socket address after a delay
765      */
766     static class ScheduledConnector implements Runnable {
767         private final Socket socket;
768         private final SocketAddress address;
769         private final long delay;
770 
771         ScheduledConnector(Socket socket, SocketAddress address, long delay) {
772             this.socket = socket;
773             this.address = address;
774             this.delay = delay;
775         }
776 
777         @Override
778         public void run() {
779             try {
780                 Thread.sleep(delay);
781                 socket.connect(address);
782             } catch (Exception e) { }
783         }
784 
785         static void schedule(Socket socket, SocketAddress address, long delay) {
786             new Thread(new ScheduledConnector(socket, address, delay)).start();
787         }
788     }
789 
790     /**
791      * Sends a datagram to a target address after a delay
792      */
793     static class ScheduledSender implements Runnable {
794         private final DatagramSocket socket;
795         private final DatagramPacket packet;
796         private final long delay;
797 
798         ScheduledSender(DatagramSocket socket, DatagramPacket packet, long delay) {
799             this.socket = socket;
800             this.packet = packet;
801             this.delay = delay;
802         }
803 
804         @Override
805         public void run() {
806             try {
807                 Thread.sleep(delay);
808                 socket.send(packet);
809             } catch (Exception e) { }
810         }
811 
812         static void schedule(DatagramSocket socket, DatagramPacket packet, long delay) {
813             new Thread(new ScheduledSender(socket, packet, delay)).start();
814         }
815     }
816 
817     /**
818      * Sends urgent data after a delay
819      */
820     static class ScheduledUrgentData implements Runnable {
821         private final Socket s;
822         private final int data;
823         private final long delay;
824 
825         ScheduledUrgentData(Socket s, int data, long delay) {
826             this.s = s;
827             this.data = data;
828             this.delay = delay;
829         }
830 
831         @Override
832         public void run() {
833             try {
834                 Thread.sleep(delay);
835                 s.sendUrgentData(data);
836             } catch (Exception e) { }
837         }
838 
839         static void scheduleUrgentData(Socket s, int data, long delay) {
840             new Thread(new ScheduledUrgentData(s, data, delay)).start();
841         }
842     }
843 }