1 /*
  2  * Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /**
 25  * @test
 26  * @summary Basic tests for virtual threads doing blocking I/O with NIO channels
 27  * @compile --enable-preview -source ${jdk.version} NioChannels.java
 28  * @run testng/othervm/timeout=300 --enable-preview NioChannels
 29  * @run testng/othervm/timeout=300 --enable-preview -Djdk.useDirectRegister NioChannels
 30  */
 31 
 32 /**
 33  * @test
 34  * @requires (os.family == "windows")
 35  * @compile --enable-preview -source ${jdk.version} NioChannels.java
 36  * @run testng/othervm/timeout=300 --enable-preview
 37  *     -Djdk.PollerProvider=sun.nio.ch.WSAPollPollerProvider NioChannels
 38  */
 39 
 40 import java.io.Closeable;
 41 import java.io.IOException;
 42 import java.net.DatagramPacket;
 43 import java.net.InetAddress;
 44 import java.net.InetSocketAddress;
 45 import java.net.Socket;
 46 import java.net.SocketAddress;
 47 import java.nio.ByteBuffer;
 48 import java.nio.channels.AsynchronousCloseException;
 49 import java.nio.channels.ClosedByInterruptException;
 50 import java.nio.channels.DatagramChannel;
 51 import java.nio.channels.Pipe;
 52 import java.nio.channels.ReadableByteChannel;
 53 import java.nio.channels.ServerSocketChannel;
 54 import java.nio.channels.SocketChannel;
 55 import java.nio.channels.WritableByteChannel;
 56 
 57 import org.testng.annotations.Test;
 58 import static org.testng.Assert.*;
 59 
 60 public class NioChannels {
    /**
     * Delay handed to the {@code Scheduled*} helper classes. Each helper passes
     * it to {@code Thread.sleep} before performing its action, so the value is
     * in milliseconds.
     */
    private static final long DELAY = 4000;
 62 
 63     /**
 64      * SocketChannel read/write, no blocking.
 65      */
 66     @Test
 67     public void testSocketChannelReadWrite1() throws Exception {
 68         TestHelper.runInVirtualThread(() -> {
 69             try (var connection = new Connection()) {
 70                 SocketChannel sc1 = connection.channel1();
 71                 SocketChannel sc2 = connection.channel2();
 72 
 73                 // write should not block
 74                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 75                 int n = sc1.write(bb);
 76                 assertTrue(n > 0);
 77 
 78                 // read should not block
 79                 bb = ByteBuffer.allocate(10);
 80                 n = sc2.read(bb);
 81                 assertTrue(n > 0);
 82                 assertTrue(bb.get(0) == 'X');
 83             }
 84         });
 85     }
 86 
 87     /**
 88      * Virtual thread blocks in SocketChannel read.
 89      */
 90     @Test
 91     public void testSocketChannelRead() throws Exception {
 92         TestHelper.runInVirtualThread(() -> {
 93             try (var connection = new Connection()) {
 94                 SocketChannel sc1 = connection.channel1();
 95                 SocketChannel sc2 = connection.channel2();
 96 
 97                 // schedule write
 98                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 99                 ScheduledWriter.schedule(sc1, bb, DELAY);
100 
101                 // read should block
102                 bb = ByteBuffer.allocate(10);
103                 int n = sc2.read(bb);
104                 assertTrue(n > 0);
105                 assertTrue(bb.get(0) == 'X');
106             }
107         });
108     }
109 
110     /**
111      * Virtual thread blocks in SocketChannel write.
112      */
113     @Test
114     public void testSocketChannelWrite() throws Exception {
115         TestHelper.runInVirtualThread(() -> {
116             try (var connection = new Connection()) {
117                 SocketChannel sc1 = connection.channel1();
118                 SocketChannel sc2 = connection.channel2();
119 
120                 // schedule thread to read to EOF
121                 ScheduledReader.schedule(sc2, true, DELAY);
122 
123                 // write should block
124                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
125                 for (int i=0; i<1000; i++) {
126                     int n = sc1.write(bb);
127                     assertTrue(n > 0);
128                     bb.clear();
129                 }
130             }
131         });
132     }
133 
134     /**
135      * SocketChannel close while virtual thread blocked in read.
136      */
137     @Test
138     public void testSocketChannelReadAsyncClose() throws Exception {
139         TestHelper.runInVirtualThread(() -> {
140             try (var connection = new Connection()) {
141                 SocketChannel sc = connection.channel1();
142                 ScheduledCloser.schedule(sc, DELAY);
143                 try {
144                     int n = sc.read(ByteBuffer.allocate(100));
145                     throw new RuntimeException("read returned " + n);
146                 } catch (AsynchronousCloseException expected) { }
147             }
148         });
149     }
150 
151     /**
152      * Virtual thread interrupted while blocked in SocketChannel read.
153      */
154     @Test
155     public void testSocketChannelReadInterrupt() throws Exception {
156         TestHelper.runInVirtualThread(() -> {
157             try (var connection = new Connection()) {
158                 SocketChannel sc = connection.channel1();
159                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
160                 try {
161                     int n = sc.read(ByteBuffer.allocate(100));
162                     throw new RuntimeException("read returned " + n);
163                 } catch (ClosedByInterruptException expected) {
164                     assertTrue(Thread.interrupted());
165                 }
166             }
167         });
168     }
169 
170     /**
171      * SocketChannel close while virtual thread blocked in write.
172      */
173     @Test
174     public void testSocketChannelWriteAsyncClose() throws Exception {
175         TestHelper.runInVirtualThread(() -> {
176             try (var connection = new Connection()) {
177                 SocketChannel sc = connection.channel1();
178                 ScheduledCloser.schedule(sc, DELAY);
179                 try {
180                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
181                     for (;;) {
182                         int n = sc.write(bb);
183                         assertTrue(n > 0);
184                         bb.clear();
185                     }
186                 } catch (AsynchronousCloseException expected) { }
187             }
188         });
189     }
190 
191     /**
192      * Virtual thread interrupted while blocked in SocketChannel write.
193      */
194     @Test
195     public void testSocketChannelWriteInterrupt() throws Exception {
196         TestHelper.runInVirtualThread(() -> {
197             try (var connection = new Connection()) {
198                 SocketChannel sc = connection.channel1();
199                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
200                 try {
201                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
202                     for (;;) {
203                         int n = sc.write(bb);
204                         assertTrue(n > 0);
205                         bb.clear();
206                     }
207                 } catch (ClosedByInterruptException expected) {
208                     assertTrue(Thread.interrupted());
209                 }
210             }
211         });
212     }
213 
214     /**
215      * Virtual thread blocks in SocketChannel adaptor read.
216      */
217     @Test
218     public void testSocketAdaptorRead1() throws Exception {
219         testSocketAdaptorRead(0);
220     }
221 
222     /**
223      * Virtual thread blocks in SocketChannel adaptor read with timeout.
224      */
225     @Test
226     public void testSocketAdaptorRead2() throws Exception {
227         testSocketAdaptorRead(60_000);
228     }
229 
230     private void testSocketAdaptorRead(int timeout) throws Exception {
231         TestHelper.runInVirtualThread(() -> {
232             try (var connection = new Connection()) {
233                 SocketChannel sc1 = connection.channel1();
234                 SocketChannel sc2 = connection.channel2();
235 
236                 // schedule write
237                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
238                 ScheduledWriter.schedule(sc1, bb, DELAY);
239 
240                 // read should block
241                 if (timeout > 0)
242                     sc2.socket().setSoTimeout(timeout);
243 
244                 byte[] array = new byte[100];
245                 int n = sc2.socket().getInputStream().read(array);
246                 assertTrue(n > 0);
247                 assertTrue(array[0] == 'X');
248             }
249         });
250     }
251     
252     /**
253      * ServerSocketChannel accept, no blocking.
254      */
255     @Test
256     public void testServerSocketChannelAccept1() throws Exception {
257         TestHelper.runInVirtualThread(() -> {
258             try (var ssc = ServerSocketChannel.open()) {
259                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
260                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
261                 // accept should not block
262                 var sc2 = ssc.accept();
263                 sc1.close();
264                 sc2.close();
265             }
266         });
267     }
268 
269     /**
270      * Virtual thread blocks in ServerSocketChannel accept.
271      */
272     @Test
273     public void testServerSocketChannelAccept2() throws Exception {
274         TestHelper.runInVirtualThread(() -> {
275             try (var ssc = ServerSocketChannel.open()) {
276                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
277                 var sc1 = SocketChannel.open();
278                 ScheduledConnector.schedule(sc1, ssc.getLocalAddress(), DELAY);
279                 // accept will block
280                 var sc2 = ssc.accept();
281                 sc1.close();
282                 sc2.close();
283             }
284         });
285     }
286 
287     /**
288      * SeverSocketChannel close while virtual thread blocked in accept.
289      */
290     @Test
291     public void testServerSocketChannelAcceptAsyncClose() throws Exception {
292         TestHelper.runInVirtualThread(() -> {
293             try (var ssc = ServerSocketChannel.open()) {
294                 InetAddress lh = InetAddress.getLoopbackAddress();
295                 ssc.bind(new InetSocketAddress(lh, 0));
296                 ScheduledCloser.schedule(ssc, DELAY);
297                 try {
298                     SocketChannel sc = ssc.accept();
299                     sc.close();
300                     throw new RuntimeException("connection accepted???");
301                 } catch (AsynchronousCloseException expected) { }
302             }
303         });
304     }
305 
306     /**
307      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
308      */
309     @Test
310     public void testServerSocketChannelAcceptInterrupt() throws Exception {
311         TestHelper.runInVirtualThread(() -> {
312             try (var ssc = ServerSocketChannel.open()) {
313                 InetAddress lh = InetAddress.getLoopbackAddress();
314                 ssc.bind(new InetSocketAddress(lh, 0));
315                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
316                 try {
317                     SocketChannel sc = ssc.accept();
318                     sc.close();
319                     throw new RuntimeException("connection accepted???");
320                 } catch (ClosedByInterruptException expected) {
321                     assertTrue(Thread.interrupted());
322                 }
323             }
324         });
325     }
326 
327     /**
328      * Virtual thread blocks in ServerSocketChannel adaptor accept.
329      */
330     @Test
331     public void testSocketChannelAdaptorAccept1() throws Exception {
332         testSocketChannelAdaptorAccept(0);
333     }
334 
335     /**
336      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
337      */
338     @Test
339     public void testSocketChannelAdaptorAccept2() throws Exception {
340         testSocketChannelAdaptorAccept(60_000);
341     }
342 
343     private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
344         TestHelper.runInVirtualThread(() -> {
345             try (var ssc = ServerSocketChannel.open()) {
346                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
347                 var sc1 = SocketChannel.open();
348                 ScheduledConnector.schedule(sc1, ssc.getLocalAddress(), DELAY);
349 
350                 if (timeout > 0)
351                     ssc.socket().setSoTimeout(timeout);
352 
353                 // accept will block
354                 Socket s = ssc.socket().accept();
355                 sc1.close();
356                 s.close();
357             }
358         });
359     }
360 
361     /**
362      * DatagramChannel receive/send, no blocking.
363      */
364     @Test
365     public void testDatagramChannelSendReceive1() throws Exception {
366         TestHelper.runInVirtualThread(() -> {
367             try (DatagramChannel dc1 = DatagramChannel.open();
368                  DatagramChannel dc2 = DatagramChannel.open()) {
369 
370                 InetAddress lh = InetAddress.getLoopbackAddress();
371                 dc2.bind(new InetSocketAddress(lh, 0));
372 
373                 // send should not block
374                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
375                 int n = dc1.send(bb, dc2.getLocalAddress());
376                 assertTrue(n > 0);
377 
378                 // receive should not block
379                 bb = ByteBuffer.allocate(10);
380                 dc2.receive(bb);
381                 assertTrue(bb.get(0) == 'X');
382             }
383         });
384     }
385 
386     /**
387      * Virtual thread blocks in DatagramChannel receive.
388      */
389     @Test
390     public void testDatagramChannelSendReceive2() throws Exception {
391         TestHelper.runInVirtualThread(() -> {
392             try (DatagramChannel dc1 = DatagramChannel.open();
393                  DatagramChannel dc2 = DatagramChannel.open()) {
394 
395                 InetAddress lh = InetAddress.getLoopbackAddress();
396                 dc2.bind(new InetSocketAddress(lh, 0));
397 
398                 // schedule send
399                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
400                 ScheduledSender.schedule(dc1, bb, dc2.getLocalAddress(), DELAY);
401 
402                 // read should block
403                 bb = ByteBuffer.allocate(10);
404                 dc2.receive(bb);
405                 assertTrue(bb.get(0) == 'X');
406             }
407         });
408     }
409 
410     /**
411      * DatagramChannel close while virtual thread blocked in receive.
412      */
413     @Test
414     public void testDatagramChannelReceiveAsyncClose() throws Exception {
415         TestHelper.runInVirtualThread(() -> {
416             try (DatagramChannel dc = DatagramChannel.open()) {
417                 InetAddress lh = InetAddress.getLoopbackAddress();
418                 dc.bind(new InetSocketAddress(lh, 0));
419                 ScheduledCloser.schedule(dc, DELAY);
420                 try {
421                     dc.receive(ByteBuffer.allocate(100));
422                     throw new RuntimeException("receive returned");
423                 } catch (AsynchronousCloseException expected) { }
424             }
425         });
426     }
427 
428     /**
429      * Virtual thread interrupted while blocked in DatagramChannel receive.
430      */
431     @Test
432     public void testDatagramChannelReceiveInterrupt() throws Exception {
433         TestHelper.runInVirtualThread(() -> {
434             try (DatagramChannel dc = DatagramChannel.open()) {
435                 InetAddress lh = InetAddress.getLoopbackAddress();
436                 dc.bind(new InetSocketAddress(lh, 0));
437                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
438                 try {
439                     dc.receive(ByteBuffer.allocate(100));
440                     throw new RuntimeException("receive returned");
441                 } catch (ClosedByInterruptException expected) {
442                     assertTrue(Thread.interrupted());
443                 }
444             }
445         });
446     }
447 
448     /**
449      * Virtual thread blocks in DatagramSocket adaptor receive
450      */
451     @Test
452     public void testDatagramSocketAdaptorReceive1() throws Exception {
453         testDatagramSocketAdaptorReceive(0);
454     }
455 
456     /**
457      * Virtual thread blocks in DatagramSocket adaptor receive with timeout
458      */
459     @Test
460     public void testDatagramSocketAdaptorReceive2() throws Exception {
461         testDatagramSocketAdaptorReceive(60_1000);
462     }
463 
464     private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
465         TestHelper.runInVirtualThread(() -> {
466             try (DatagramChannel dc1 = DatagramChannel.open();
467                  DatagramChannel dc2 = DatagramChannel.open()) {
468 
469                 InetAddress lh = InetAddress.getLoopbackAddress();
470                 dc2.bind(new InetSocketAddress(lh, 0));
471 
472                 // schedule send
473                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
474                 ScheduledSender.schedule(dc1, bb, dc2.getLocalAddress(), DELAY);
475 
476                 // receive should block
477                 byte[] array = new byte[100];
478                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
479                 if (timeout > 0)
480                     dc2.socket().setSoTimeout(timeout);
481                 dc2.socket().receive(p);
482                 assertTrue(p.getLength() == 3 && array[0] == 'X');
483             }
484         });
485     }
486 
487     /**
488      * Pipe read/write, no blocking.
489      */
490     @Test
491     public void testPipeReadWrite1() throws Exception {
492         TestHelper.runInVirtualThread(() -> {
493             Pipe p = Pipe.open();
494             try (Pipe.SinkChannel sink = p.sink();
495                  Pipe.SourceChannel source = p.source()) {
496 
497                 // write should not block
498                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
499                 int n = sink.write(bb);
500                 assertTrue(n > 0);
501 
502                 // read should not block
503                 bb = ByteBuffer.allocate(10);
504                 n = source.read(bb);
505                 assertTrue(n > 0);
506                 assertTrue(bb.get(0) == 'X');
507             }
508         });
509     }
510 
511     /**
512      * Virtual thread blocks in Pipe.SourceChannel read.
513      */
514     @Test
515     public void testPipeReadWrite2() throws Exception {
516         TestHelper.runInVirtualThread(() -> {
517             Pipe p = Pipe.open();
518             try (Pipe.SinkChannel sink = p.sink();
519                  Pipe.SourceChannel source = p.source()) {
520 
521                 // schedule write
522                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
523                 ScheduledWriter.schedule(sink, bb, DELAY);
524 
525                 // read should block
526                 bb = ByteBuffer.allocate(10);
527                 int n = source.read(bb);
528                 assertTrue(n > 0);
529                 assertTrue(bb.get(0) == 'X');
530             }
531         });
532     }
533 
534     /**
535      * Virtual thread blocks in Pipe.SinkChannel write.
536      */
537     @Test
538     public void testPipeReadWrite3() throws Exception {
539         TestHelper.runInVirtualThread(() -> {
540             Pipe p = Pipe.open();
541             try (Pipe.SinkChannel sink = p.sink();
542                  Pipe.SourceChannel source = p.source()) {
543 
544                 // schedule thread to read to EOF
545                 ScheduledReader.schedule(source, true, DELAY);
546 
547                 // write should block
548                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
549                 for (int i=0; i<1000; i++) {
550                     int n = sink.write(bb);
551                     assertTrue(n > 0);
552                     bb.clear();
553                 }
554             }
555         });
556     }
557 
558     /**
559      * Pipe.SourceChannel close while virtual thread blocked in read.
560      */
561     @Test
562     public void testPipeReadAsyncClose() throws Exception {
563         TestHelper.runInVirtualThread(() -> {
564             Pipe p = Pipe.open();
565             try (Pipe.SourceChannel source = p.source()) {
566                 ScheduledCloser.schedule(source, DELAY);
567                 try {
568                     int n = source.read(ByteBuffer.allocate(100));
569                     throw new RuntimeException("read returned " + n);
570                 } catch (AsynchronousCloseException expected) { }
571             }
572         });
573     }
574 
575     /**
576      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
577      */
578     @Test
579     public void testPipeReadInterrupt() throws Exception {
580         TestHelper.runInVirtualThread(() -> {
581             Pipe p = Pipe.open();
582             try (Pipe.SourceChannel source = p.source()) {
583                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
584                 try {
585                     int n = source.read(ByteBuffer.allocate(100));
586                     throw new RuntimeException("read returned " + n);
587                 } catch (ClosedByInterruptException expected) {
588                     assertTrue(Thread.interrupted());
589                 }
590             }
591         });
592     }
593 
594     /**
595      * Pipe.SinkChannel close while virtual thread blocked in write.
596      */
597     @Test
598     public void testPipeWriteAsyncClose() throws Exception {
599         TestHelper.runInVirtualThread(() -> {
600             Pipe p = Pipe.open();
601             try (Pipe.SinkChannel sink = p.sink()) {
602                 ScheduledCloser.schedule(sink, DELAY);
603                 try {
604                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
605                     for (;;) {
606                         int n = sink.write(bb);
607                         assertTrue(n > 0);
608                         bb.clear();
609                     }
610                 } catch (AsynchronousCloseException expected) { }
611             }
612         });
613     }
614 
615     /**
616      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
617      */
618     @Test
619     public void testPipeWriteInterrupt() throws Exception {
620         TestHelper.runInVirtualThread(() -> {
621             Pipe p = Pipe.open();
622             try (Pipe.SinkChannel sink = p.sink()) {
623                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
624                 try {
625                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
626                     for (;;) {
627                         int n = sink.write(bb);
628                         assertTrue(n > 0);
629                         bb.clear();
630                     }
631                 } catch (ClosedByInterruptException expected) {
632                     assertTrue(Thread.interrupted());
633                 }
634             }
635         });
636     }
637 
638     // -- supporting classes --
639     
640     /**
641      * Creates a loopback connection
642      */
643     static class Connection implements Closeable {
644         private final ServerSocketChannel ssc;
645         private final SocketChannel sc1;
646         private final SocketChannel sc2;
647         Connection() throws IOException {
648             var lh = InetAddress.getLoopbackAddress();
649             this.ssc = ServerSocketChannel.open().bind(new InetSocketAddress(lh, 0));
650             this.sc1 = SocketChannel.open(ssc.getLocalAddress());
651             this.sc2 = ssc.accept();
652         }
653         SocketChannel channel1() {
654             return sc1;
655         }
656         SocketChannel channel2() {
657             return sc2;
658         }
659         @Override
660         public void close() throws IOException {
661             if (ssc != null) ssc.close();
662             if (sc1 != null) sc1.close();
663             if (sc2 != null) sc2.close();
664         }
665     }
666 
667     /**
668      * Closes a channel after a delay
669      */
670     static class ScheduledCloser implements Runnable {
671         private final Closeable c;
672         private final long delay;
673         ScheduledCloser(Closeable c, long delay) {
674             this.c = c;
675             this.delay = delay;
676         }
677         @Override
678         public void run() {
679             try {
680                 Thread.sleep(delay);
681                 c.close();
682             } catch (Exception e) { }
683         }
684         static void schedule(Closeable c, long delay) {
685             new Thread(new ScheduledCloser(c, delay)).start();
686         }
687     }
688 
689     /**
690      * Interrupts a thread after a delay
691      */
692     static class ScheduledInterrupter implements Runnable {
693         private final Thread thread;
694         private final long delay;
695 
696         ScheduledInterrupter(Thread thread, long delay) {
697             this.thread = thread;
698             this.delay = delay;
699         }
700 
701         @Override
702         public void run() {
703             try {
704                 Thread.sleep(delay);
705                 thread.interrupt();
706             } catch (Exception e) { }
707         }
708 
709         static void schedule(Thread thread, long delay) {
710             new Thread(new ScheduledInterrupter(thread, delay)).start();
711         }
712     }
713 
714     /**
715      * Establish a connection to a socket address after a delay
716      */
717     static class ScheduledConnector implements Runnable {
718         private final SocketChannel sc;
719         private final SocketAddress address;
720         private final long delay;
721 
722         ScheduledConnector(SocketChannel sc, SocketAddress address, long delay) {
723             this.sc = sc;
724             this.address = address;
725             this.delay = delay;
726         }
727 
728         @Override
729         public void run() {
730             try {
731                 Thread.sleep(delay);
732                 sc.connect(address);
733             } catch (Exception e) { }
734         }
735 
736         static void schedule(SocketChannel sc, SocketAddress address, long delay) {
737             new Thread(new ScheduledConnector(sc, address, delay)).start();
738         }
739     }
740 
741     /**
742      * Reads from a connection, and to EOF, after a delay
743      */
744     static class ScheduledReader implements Runnable {
745         private final ReadableByteChannel rbc;
746         private final boolean readAll;
747         private final long delay;
748 
749         ScheduledReader(ReadableByteChannel rbc, boolean readAll, long delay) {
750             this.rbc = rbc;
751             this.readAll = readAll;
752             this.delay = delay;
753         }
754 
755         @Override
756         public void run() {
757             try {
758                 Thread.sleep(delay);
759                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
760                 for (;;) {
761                     int n = rbc.read(bb);
762                     if (n == -1 || !readAll)
763                         break;
764                     bb.clear();
765                 }
766             } catch (Exception e) { }
767         }
768 
769         static void schedule(ReadableByteChannel rbc, boolean readAll, long delay) {
770             new Thread(new ScheduledReader(rbc, readAll, delay)).start();
771         }
772     }
773 
774     /**
775      * Writes to a connection after a delay
776      */
777     static class ScheduledWriter implements Runnable {
778         private final WritableByteChannel wbc;
779         private final ByteBuffer buf;
780         private final long delay;
781 
782         ScheduledWriter(WritableByteChannel wbc, ByteBuffer buf, long delay) {
783             this.wbc = wbc;
784             this.buf = buf;
785             this.delay = delay;
786         }
787 
788         @Override
789         public void run() {
790             try {
791                 Thread.sleep(delay);
792                 wbc.write(buf);
793             } catch (Exception e) { }
794         }
795 
796         static void schedule(WritableByteChannel wbc, ByteBuffer buf, long delay) {
797             new Thread(new ScheduledWriter(wbc, buf, delay)).start();
798         }
799     }
800 
801     /**
802      * Sends a datagram to a target address after a delay
803      */
804     static class ScheduledSender implements Runnable {
805         private final DatagramChannel dc;
806         private final ByteBuffer buf;
807         private final SocketAddress address;
808         private final long delay;
809 
810         ScheduledSender(DatagramChannel dc, ByteBuffer buf, SocketAddress address, long delay) {
811             this.dc = dc;
812             this.buf = buf;
813             this.address = address;
814             this.delay = delay;
815         }
816 
817         @Override
818         public void run() {
819             try {
820                 Thread.sleep(delay);
821                 dc.send(buf, address);
822             } catch (Exception e) { }
823         }
824 
825         static void schedule(DatagramChannel dc, ByteBuffer buf,
826                              SocketAddress address, long delay) {
827             new Thread(new ScheduledSender(dc, buf, address, delay)).start();
828         }
829     }
830 
831 }