1 /*
   2  * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /**
  25  * @test
  26  * @run testng NioChannels
  27  * @summary Basic tests for Fibers doing blocking I/O with NIO channels
  28  */
  29 
  30 import java.io.Closeable;
  31 import java.io.IOException;
  32 import java.net.InetAddress;
  33 import java.net.InetSocketAddress;
  34 import java.net.SocketAddress;
  35 import java.nio.ByteBuffer;
  36 import java.nio.channels.AsynchronousCloseException;
  37 import java.nio.channels.ClosedByInterruptException;
  38 import java.nio.channels.DatagramChannel;
  39 import java.nio.channels.Pipe;
  40 import java.nio.channels.ReadableByteChannel;
  41 import java.nio.channels.ServerSocketChannel;
  42 import java.nio.channels.SocketChannel;
  43 import java.nio.channels.WritableByteChannel;
  44 
  45 import org.testng.annotations.Test;
  46 import org.testng.annotations.DataProvider;
  47 import static org.testng.Assert.*;
  48 
  49 @Test
  50 public class NioChannels {
  51 
  52     private static final long DELAY = 2000;
  53 
  54     private interface TestCase {
  55         void run() throws IOException;
  56     }
  57 
  58     private void test(TestCase test) throws Exception {
  59         try (var scope = FiberScope.open()) {
  60             scope.schedule(() -> {
  61                 test.run();
  62                 return null;
  63             }).join();
  64         }
  65     }
  66 
  67     /**
  68      * SocketChannel read/write, no blocking.
  69      */
  70     public void testSocketChannelReadWrite1() throws Exception {
  71         test(() -> {
  72             try (var connection = new Connection()) {
  73                 SocketChannel sc1 = connection.channel1();
  74                 SocketChannel sc2 = connection.channel2();
  75 
  76                 // write should not block
  77                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
  78                 int n = sc1.write(bb);
  79                 assertTrue(n > 0);
  80 
  81                 // read should not block
  82                 bb = ByteBuffer.allocate(10);
  83                 n = sc2.read(bb);
  84                 assertTrue(n > 0);
  85                 assertTrue(bb.get(0) == 'X');
  86             }
  87         });
  88     }
  89 
  90     /**
  91      * Fiber blocks in SocketChannel read.
  92      */
  93     public void testSocketChannelReadWrite2() throws Exception {
  94         test(() -> {
  95             try (var connection = new Connection()) {
  96                 SocketChannel sc1 = connection.channel1();
  97                 SocketChannel sc2 = connection.channel2();
  98 
  99                 // schedule write
 100                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 101                 ScheduledWriter.schedule(sc1, bb, DELAY);
 102 
 103                 // read should block
 104                 bb = ByteBuffer.allocate(10);
 105                 int n = sc2.read(bb);
 106                 assertTrue(n > 0);
 107                 assertTrue(bb.get(0) == 'X');
 108             }
 109         });
 110     }
 111 
 112     /**
 113      * Fiber blocks in SocketChannel write.
 114      */
 115     public void testSocketChannelReadWrite3() throws Exception {
 116         test(() -> {
 117             try (var connection = new Connection()) {
 118                 SocketChannel sc1 = connection.channel1();
 119                 SocketChannel sc2 = connection.channel2();
 120 
 121                 // schedule thread to read to EOF
 122                 ScheduledReader.schedule(sc2, true, DELAY);
 123 
 124                 // write should block
 125                 ByteBuffer bb = ByteBuffer.allocate(100*10024);
 126                 for (int i=0; i<1000; i++) {
 127                     int n = sc1.write(bb);
 128                     assertTrue(n > 0);
 129                     bb.clear();
 130                 }
 131             }
 132         });
 133     }
 134 
 135     /**
 136      * SocketChannel close while Fiber blocked in read.
 137      */
 138     public void testSocketChannelReadAsyncClose() throws Exception {
 139         test(() -> {
 140             try (var connection = new Connection()) {
 141                 SocketChannel sc = connection.channel1();
 142                 ScheduledCloser.schedule(sc, DELAY);
 143                 try {
 144                     int n = sc.read(ByteBuffer.allocate(100));
 145                     throw new RuntimeException("read returned " + n);
 146                 } catch (AsynchronousCloseException expected) { }
 147             }
 148         });
 149     }
 150 
 151     /**
 152      * Fiber interrupted while blocked in SocketChannel read.
 153      */
 154     public void testSocketChannelReadInterrupt() throws Exception {
 155         test(() -> {
 156             try (var connection = new Connection()) {
 157                 SocketChannel sc = connection.channel1();
 158                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
 159                 try {
 160                     int n = sc.read(ByteBuffer.allocate(100));
 161                     throw new RuntimeException("read returned " + n);
 162                 } catch (ClosedByInterruptException expected) { }
 163             }
 164         });
 165     }
 166 
 167     /**
 168      * Fiber cancelled while blocked in SocketChannel read.
 169      */
 170     public void testSocketChannelReadCancel() throws Exception {
 171         test(() -> {
 172             try (var connection = new Connection()) {
 173                 SocketChannel sc = connection.channel1();
 174                 var fiber = Fiber.current().orElseThrow();
 175                 ScheduledCanceller.schedule(fiber, DELAY);
 176                 try {
 177                     int n = sc.read(ByteBuffer.allocate(100));
 178                     throw new RuntimeException("read returned " + n);
 179                 } catch (IOException expected) { }
 180             }
 181         });
 182     }
 183 
 184     /**
 185      * SocketChannel close while Fiber blocked in write.
 186      */
 187     public void testSocketChannelWriteAsyncClose() throws Exception {
 188         test(() -> {
 189             try (var connection = new Connection()) {
 190                 SocketChannel sc = connection.channel1();
 191                 ScheduledCloser.schedule(sc, DELAY);
 192                 try {
 193                     ByteBuffer bb = ByteBuffer.allocate(100*10024);
 194                     for (;;) {
 195                         int n = sc.write(bb);
 196                         assertTrue(n > 0);
 197                         bb.clear();
 198                     }
 199                 } catch (AsynchronousCloseException expected) { }
 200             }
 201         });
 202     }
 203 
 204     /**
 205      * Fiber interrupted while blocked in SocketChannel write.
 206      */
 207     public void testSocketChannelWriteInterrupt() throws Exception {
 208         test(() -> {
 209             try (var connection = new Connection()) {
 210                 SocketChannel sc = connection.channel1();
 211                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
 212                 try {
 213                     ByteBuffer bb = ByteBuffer.allocate(100*10024);
 214                     for (;;) {
 215                         int n = sc.write(bb);
 216                         assertTrue(n > 0);
 217                         bb.clear();
 218                     }
 219                 } catch (ClosedByInterruptException expected) { }
 220             }
 221         });
 222     }
 223 
 224     /**
 225      * Fiber cancelled while blocked in SocketChannel write.
 226      */
 227     public void testSocketChannelWritCeancel() throws Exception {
 228         test(() -> {
 229             try (var connection = new Connection()) {
 230                 SocketChannel sc = connection.channel1();
 231                 var fiber = Fiber.current().orElseThrow();
 232                 ScheduledCanceller.schedule(fiber, DELAY);
 233                 try {
 234                     ByteBuffer bb = ByteBuffer.allocate(100*10024);
 235                     for (;;) {
 236                         int n = sc.write(bb);
 237                         assertTrue(n > 0);
 238                         bb.clear();
 239                     }
 240                 } catch (IOException expected) { }
 241             }
 242         });
 243     }
 244 
 245     /**
 246      * ServerSocketChannel accept, no blocking.
 247      */
 248     public void testServerSocketChannelAccept1() throws Exception {
 249         test(() -> {
 250             try (var ssc = ServerSocketChannel.open()) {
 251                 ssc.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
 252                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
 253                 // accept should not block
 254                 var sc2 = ssc.accept();
 255                 sc1.close();
 256                 sc2.close();
 257             }
 258         });
 259     }
 260 
 261     /**
 262      * Fiber blocks in ServerSocketChannel accept.
 263      */
 264     public void testServerSocketChannelAccept2() throws Exception {
 265         test(() -> {
 266             try (var ssc = ServerSocketChannel.open()) {
 267                 ssc.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
 268                 var sc1 = SocketChannel.open();
 269                 ScheduledConnector.schedule(sc1, ssc.getLocalAddress(), DELAY);
 270                 // accept will block
 271                 var sc2 = ssc.accept();
 272                 sc1.close();
 273                 sc2.close();
 274             }
 275         });
 276     }
 277 
 278     /**
 279      * SeverSocketChannel close while Fiber blocked in accept.
 280      */
 281     public void testServerSocketChannelAcceptAsyncClose() throws Exception {
 282         test(() -> {
 283             try (var ssc = ServerSocketChannel.open()) {
 284                 InetAddress lh = InetAddress.getLocalHost();
 285                 ssc.bind(new InetSocketAddress(lh, 0));
 286                 ScheduledCloser.schedule(ssc, DELAY);
 287                 try {
 288                     SocketChannel sc = ssc.accept();
 289                     sc.close();
 290                     throw new RuntimeException("connection accepted???");
 291                 } catch (AsynchronousCloseException expected) { }
 292             }
 293         });
 294     }
 295 
 296     /**
 297      * Fiber interrupted while blocked in ServerSocketChannel accept.
 298      */
 299     public void testServerSocketChannelAcceptInterrupt() throws Exception {
 300         test(() -> {
 301             try (var ssc = ServerSocketChannel.open()) {
 302                 InetAddress lh = InetAddress.getLocalHost();
 303                 ssc.bind(new InetSocketAddress(lh, 0));
 304                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
 305                 try {
 306                     SocketChannel sc = ssc.accept();
 307                     sc.close();
 308                     throw new RuntimeException("connection accepted???");
 309                 } catch (ClosedByInterruptException expected) { }
 310             }
 311         });
 312     }
 313 
 314     /**
 315      * Fiber cancelled while blocked in ServerSocketChannel accept.
 316      */
 317     public void testServerSocketChannelAcceptCancel() throws Exception {
 318         test(() -> {
 319             try (var ssc = ServerSocketChannel.open()) {
 320                 InetAddress lh = InetAddress.getLocalHost();
 321                 ssc.bind(new InetSocketAddress(lh, 0));
 322                 var fiber = Fiber.current().orElseThrow();
 323                 ScheduledCanceller.schedule(fiber, DELAY);
 324                 try {
 325                     SocketChannel sc = ssc.accept();
 326                     sc.close();
 327                     throw new RuntimeException("connection accepted???");
 328                 } catch (IOException expected) { }
 329             }
 330         });
 331     }
 332 
 333     /**
 334      * DatagramChannel receive/send, no blocking.
 335      */
 336     public void testDatagramhannelSendReceive1() throws Exception {
 337         test(() -> {
 338             try (DatagramChannel dc1 = DatagramChannel.open();
 339                  DatagramChannel dc2 = DatagramChannel.open()) {
 340 
 341                 InetAddress lh = InetAddress.getLocalHost();
 342                 dc2.bind(new InetSocketAddress(lh, 0));
 343 
 344                 // send should not block
 345                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 346                 int n = dc1.send(bb, dc2.getLocalAddress());
 347                 assertTrue(n > 0);
 348 
 349                 // receive should not block
 350                 bb = ByteBuffer.allocate(10);
 351                 dc2.receive(bb);
 352                 assertTrue(bb.get(0) == 'X');
 353             }
 354         });
 355     }
 356 
 357     /**
 358      * Fiber blocks in DatagramChannel receive.
 359      */
 360     public void testDatagramhannelSendReceive2() throws Exception {
 361         test(() -> {
 362             try (DatagramChannel dc1 = DatagramChannel.open();
 363                  DatagramChannel dc2 = DatagramChannel.open()) {
 364 
 365                 InetAddress lh = InetAddress.getLocalHost();
 366                 dc2.bind(new InetSocketAddress(lh, 0));
 367 
 368                 // schedule send
 369                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 370                 ScheduledSender.schedule(dc1, bb, dc2.getLocalAddress(), DELAY);
 371 
 372                 // read should block
 373                 bb = ByteBuffer.allocate(10);
 374                 dc2.receive(bb);
 375                 assertTrue(bb.get(0) == 'X');
 376             }
 377         });
 378     }
 379 
 380     /**
 381      * DatagramChannel close while Fiber blocked in receive.
 382      */
 383     public void testDatagramhannelReceiveAsyncClose() throws Exception {
 384         test(() -> {
 385             try (DatagramChannel dc = DatagramChannel.open()) {
 386                 InetAddress lh = InetAddress.getLocalHost();
 387                 dc.bind(new InetSocketAddress(lh, 0));
 388                 ScheduledCloser.schedule(dc, DELAY);
 389                 try {
 390                     dc.receive(ByteBuffer.allocate(100));
 391                     throw new RuntimeException("receive returned");
 392                 } catch (AsynchronousCloseException expected) { }
 393             }
 394         });
 395     }
 396 
 397     /**
 398      * Fiber interrupted while blocked in DatagramChannel receive.
 399      */
 400     public void testDatagramhannelReceiveInterrupt() throws Exception {
 401         test(() -> {
 402             try (DatagramChannel dc = DatagramChannel.open()) {
 403                 InetAddress lh = InetAddress.getLocalHost();
 404                 dc.bind(new InetSocketAddress(lh, 0));
 405                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
 406                 try {
 407                     dc.receive(ByteBuffer.allocate(100));
 408                     throw new RuntimeException("receive returned");
 409                 } catch (ClosedByInterruptException expected) { }
 410             }
 411         });
 412     }
 413 
 414     /**
 415      * Fiber cancelled while blocked in DatagramChannel receive.
 416      */
 417     public void testDatagramhannelReceiveCancel() throws Exception {
 418         test(() -> {
 419             try (DatagramChannel dc = DatagramChannel.open()) {
 420                 InetAddress lh = InetAddress.getLocalHost();
 421                 dc.bind(new InetSocketAddress(lh, 0));
 422                 var fiber = Fiber.current().orElseThrow();
 423                 ScheduledCanceller.schedule(fiber, DELAY);
 424                 try {
 425                     dc.receive(ByteBuffer.allocate(100));
 426                     throw new RuntimeException("receive returned");
 427                 } catch (IOException expected) { }
 428             }
 429         });
 430     }
 431 
 432     /**
 433      * Pipe read/write, no blocking.
 434      */
 435     public void testPipeReadWrite1() throws Exception {
 436         test(() -> {
 437             Pipe p = Pipe.open();
 438             try (Pipe.SinkChannel sink = p.sink();
 439                  Pipe.SourceChannel source = p.source()) {
 440 
 441                 // write should not block
 442                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 443                 int n = sink.write(bb);
 444                 assertTrue(n > 0);
 445 
 446                 // read should not block
 447                 bb = ByteBuffer.allocate(10);
 448                 n = source.read(bb);
 449                 assertTrue(n > 0);
 450                 assertTrue(bb.get(0) == 'X');
 451             }
 452         });
 453     }
 454 
 455     /**
 456      * Fiber blocks in Pipe.SourceChannel read.
 457      */
 458     public void testPipeReadWrite2() throws Exception {
 459         test(() -> {
 460             Pipe p = Pipe.open();
 461             try (Pipe.SinkChannel sink = p.sink();
 462                  Pipe.SourceChannel source = p.source()) {
 463 
 464                 // schedule write
 465                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 466                 ScheduledWriter.schedule(sink, bb, DELAY);
 467 
 468                 // read should block
 469                 bb = ByteBuffer.allocate(10);
 470                 int n = source.read(bb);
 471                 assertTrue(n > 0);
 472                 assertTrue(bb.get(0) == 'X');
 473             }
 474         });
 475     }
 476 
 477     /**
 478      * Fiber blocks in Pipe.SinkChannel write.
 479      */
 480     public void testPipeReadWrite3() throws Exception {
 481         test(() -> {
 482             Pipe p = Pipe.open();
 483             try (Pipe.SinkChannel sink = p.sink();
 484                  Pipe.SourceChannel source = p.source()) {
 485 
 486                 // schedule thread to read to EOF
 487                 ScheduledReader.schedule(source, true, DELAY);
 488 
 489                 // write should block
 490                 ByteBuffer bb = ByteBuffer.allocate(100*10024);
 491                 for (int i=0; i<1000; i++) {
 492                     int n = sink.write(bb);
 493                     assertTrue(n > 0);
 494                     bb.clear();
 495                 }
 496             }
 497         });
 498     }
 499 
 500     /**
 501      * Pipe.SourceChannel close while Fiber blocked in read.
 502      */
 503     public void testPipeReadAsyncClose() throws Exception {
 504         test(() -> {
 505             Pipe p = Pipe.open();
 506             try (Pipe.SourceChannel source = p.source()) {
 507                 ScheduledCloser.schedule(source, DELAY);
 508                 try {
 509                     int n = source.read(ByteBuffer.allocate(100));
 510                     throw new RuntimeException("read returned " + n);
 511                 } catch (AsynchronousCloseException expected) { }
 512             }
 513         });
 514     }
 515 
 516     /**
 517      * Fiber interrupted while blocked in Pipe.SourceChannel read.
 518      */
 519     public void testPipeReadInterrupt() throws Exception {
 520         test(() -> {
 521             Pipe p = Pipe.open();
 522             try (Pipe.SourceChannel source = p.source()) {
 523                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
 524                 try {
 525                     int n = source.read(ByteBuffer.allocate(100));
 526                     throw new RuntimeException("read returned " + n);
 527                 } catch (ClosedByInterruptException expected) { }
 528             }
 529         });
 530     }
 531 
 532     /**
 533      * Fiber cancelled while blocked in Pipe.SourceChannel read.
 534      */
 535     public void testPipeReadCancel() throws Exception {
 536         test(() -> {
 537             Pipe p = Pipe.open();
 538             try (Pipe.SourceChannel source = p.source()) {
 539                 var fiber = Fiber.current().orElseThrow();
 540                 ScheduledCanceller.schedule(fiber, DELAY);
 541                 try {
 542                     int n = source.read(ByteBuffer.allocate(100));
 543                     throw new RuntimeException("read returned " + n);
 544                 } catch (IOException expected) { }
 545             }
 546         });
 547     }
 548 
 549     /**
 550      * Pipe.SinkChannel close while Fiber blocked in write.
 551      */
 552     public void testPipeWriteAsyncClose() throws Exception {
 553         test(() -> {
 554             Pipe p = Pipe.open();
 555             try (Pipe.SinkChannel sink = p.sink()) {
 556                 ScheduledCloser.schedule(sink, DELAY);
 557                 try {
 558                     ByteBuffer bb = ByteBuffer.allocate(100*10024);
 559                     for (;;) {
 560                         int n = sink.write(bb);
 561                         assertTrue(n > 0);
 562                         bb.clear();
 563                     }
 564                 } catch (AsynchronousCloseException expected) { }
 565             }
 566         });
 567     }
 568 
 569     /**
 570      * Fiber interrupted while blocked in Pipe.SinkChannel write.
 571      */
 572     public void testPipeWriteInterrupt() throws Exception {
 573         test(() -> {
 574             Pipe p = Pipe.open();
 575             try (Pipe.SinkChannel sink = p.sink()) {
 576                 ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
 577                 try {
 578                     ByteBuffer bb = ByteBuffer.allocate(100*10024);
 579                     for (;;) {
 580                         int n = sink.write(bb);
 581                         assertTrue(n > 0);
 582                         bb.clear();
 583                     }
 584                 } catch (ClosedByInterruptException expected) { }
 585             }
 586         });
 587     }
 588 
 589     /**
 590      * Fiber cancelled while blocked in Pipe.SinkChannel write.
 591      */
 592     public void testPipeWriteCancel() throws Exception {
 593         test(() -> {
 594             Pipe p = Pipe.open();
 595             try (Pipe.SinkChannel sink = p.sink()) {
 596                 var fiber = Fiber.current().orElseThrow();
 597                 ScheduledCanceller.schedule(fiber, DELAY);
 598                 try {
 599                     ByteBuffer bb = ByteBuffer.allocate(100*10024);
 600                     for (;;) {
 601                         int n = sink.write(bb);
 602                         assertTrue(n > 0);
 603                         bb.clear();
 604                     }
 605                 } catch (IOException expected) { }
 606             }
 607         });
 608     }
 609 
 610     // -- supporting classes --
 611 
 612 
 613     /**
 614      * Creates a loopback connection
 615      */
 616     static class Connection implements Closeable {
 617         private final ServerSocketChannel ssc;
 618         private final SocketChannel sc1;
 619         private final SocketChannel sc2;
 620         Connection() throws IOException {
 621             var lh = InetAddress.getLocalHost();
 622             this.ssc = ServerSocketChannel.open().bind(new InetSocketAddress(lh, 0));
 623             this.sc1 = SocketChannel.open(ssc.getLocalAddress());
 624             this.sc2 = ssc.accept();
 625         }
 626         SocketChannel channel1() {
 627             return sc1;
 628         }
 629         SocketChannel channel2() {
 630             return sc2;
 631         }
 632         @Override
 633         public void close() throws IOException {
 634             if (ssc != null) ssc.close();
 635             if (sc1 != null) sc1.close();
 636             if (sc2 != null) sc2.close();
 637         }
 638     }
 639 
 640     /**
 641      * Closes a channel after a delay
 642      */
 643     static class ScheduledCloser implements Runnable {
 644         private final Closeable c;
 645         private final long delay;
 646         ScheduledCloser(Closeable c, long delay) {
 647             this.c = c;
 648             this.delay = delay;
 649         }
 650         @Override
 651         public void run() {
 652             try {
 653                 Thread.sleep(delay);
 654                 c.close();
 655             } catch (Exception e) { }
 656         }
 657         static void schedule(Closeable c, long delay) {
 658             new Thread(new ScheduledCloser(c, delay)).start();
 659         }
 660     }
 661 
 662     /**
 663      * Interrupts a thread or fiber after a delay
 664      */
 665     static class ScheduledInterrupter implements Runnable {
 666         private final Thread thread;
 667         private final long delay;
 668 
 669         ScheduledInterrupter(Thread thread, long delay) {
 670             this.thread = thread;
 671             this.delay = delay;
 672         }
 673 
 674         @Override
 675         public void run() {
 676             try {
 677                 Thread.sleep(delay);
 678                 thread.interrupt();
 679             } catch (Exception e) { }
 680         }
 681 
 682         static void schedule(Thread thread, long delay) {
 683             new Thread(new ScheduledInterrupter(thread, delay)).start();
 684         }
 685     }
 686 
 687     /**
 688      * Cancel a fiber after a delay
 689      */
 690     static class ScheduledCanceller implements Runnable {
 691         private final Fiber fiber;
 692         private final long delay;
 693 
 694         ScheduledCanceller(Fiber fiber, long delay) {
 695             this.fiber = fiber;
 696             this.delay = delay;
 697         }
 698 
 699         @Override
 700         public void run() {
 701             try {
 702                 Thread.sleep(delay);
 703                 fiber.cancel();
 704             } catch (Exception e) { }
 705         }
 706 
 707         static void schedule(Fiber fiber, long delay) {
 708             new Thread(new ScheduledCanceller(fiber, delay)).start();
 709         }
 710     }
 711 
 712     /**
 713      * Establish a connection to a socket address after a delay
 714      */
 715     static class ScheduledConnector implements Runnable {
 716         private final SocketChannel sc;
 717         private final SocketAddress address;
 718         private final long delay;
 719 
 720         ScheduledConnector(SocketChannel sc, SocketAddress address, long delay) {
 721             this.sc = sc;
 722             this.address = address;
 723             this.delay = delay;
 724         }
 725 
 726         @Override
 727         public void run() {
 728             try {
 729                 Thread.sleep(delay);
 730                 sc.connect(address);
 731             } catch (Exception e) { }
 732         }
 733 
 734         static void schedule(SocketChannel sc, SocketAddress address, long delay) {
 735             new Thread(new ScheduledConnector(sc, address, delay)).start();
 736         }
 737     }
 738 
 739     /**
 740      * Reads from a connection, and to EOF, after a delay
 741      */
 742     static class ScheduledReader implements Runnable {
 743         private final ReadableByteChannel rbc;
 744         private final boolean readAll;
 745         private final long delay;
 746 
 747         ScheduledReader(ReadableByteChannel rbc, boolean readAll, long delay) {
 748             this.rbc = rbc;
 749             this.readAll = readAll;
 750             this.delay = delay;
 751         }
 752 
 753         @Override
 754         public void run() {
 755             try {
 756                 Thread.sleep(delay);
 757                 ByteBuffer bb = ByteBuffer.allocate(8192);
 758                 for (;;) {
 759                     int n = rbc.read(bb);
 760                     if (n == -1 || !readAll)
 761                         break;
 762                     bb.clear();
 763                 }
 764             } catch (Exception e) { }
 765         }
 766 
 767         static void schedule(ReadableByteChannel rbc, boolean readAll, long delay) {
 768             new Thread(new ScheduledReader(rbc, readAll, delay)).start();
 769         }
 770     }
 771 
 772     /**
 773      * Writes to a connection after a delay
 774      */
 775     static class ScheduledWriter implements Runnable {
 776         private final WritableByteChannel wbc;
 777         private final ByteBuffer buf;
 778         private final long delay;
 779 
 780         ScheduledWriter(WritableByteChannel wbc, ByteBuffer buf, long delay) {
 781             this.wbc = wbc;
 782             this.buf = buf;
 783             this.delay = delay;
 784         }
 785 
 786         @Override
 787         public void run() {
 788             try {
 789                 Thread.sleep(delay);
 790                 wbc.write(buf);
 791             } catch (Exception e) { }
 792         }
 793 
 794         static void schedule(WritableByteChannel wbc, ByteBuffer buf, long delay) {
 795             new Thread(new ScheduledWriter(wbc, buf, delay)).start();
 796         }
 797     }
 798 
 799     /**
 800      * Sends a datagram to a target address after a delay
 801      */
 802     static class ScheduledSender implements Runnable {
 803         private final DatagramChannel dc;
 804         private final ByteBuffer buf;
 805         private final SocketAddress address;
 806         private final long delay;
 807 
 808         ScheduledSender(DatagramChannel dc, ByteBuffer buf, SocketAddress address, long delay) {
 809             this.dc = dc;
 810             this.buf = buf;
 811             this.address = address;
 812             this.delay = delay;
 813         }
 814 
 815         @Override
 816         public void run() {
 817             try {
 818                 Thread.sleep(delay);
 819                 dc.send(buf, address);
 820             } catch (Exception e) { }
 821         }
 822 
 823         static void schedule(DatagramChannel dc, ByteBuffer buf,
 824                              SocketAddress address, long delay) {
 825             new Thread(new ScheduledSender(dc, buf, address, delay)).start();
 826         }
 827     }
 828 
 829 }