< prev index next >

src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java

Print this page




  38 import java.net.SocketAddress;
  39 import java.net.SocketOption;
  40 import java.net.StandardProtocolFamily;
  41 import java.net.StandardSocketOptions;
  42 import java.nio.ByteBuffer;
  43 import java.nio.channels.AlreadyBoundException;
  44 import java.nio.channels.AlreadyConnectedException;
  45 import java.nio.channels.AsynchronousCloseException;
  46 import java.nio.channels.ClosedChannelException;
  47 import java.nio.channels.DatagramChannel;
  48 import java.nio.channels.MembershipKey;
  49 import java.nio.channels.NotYetConnectedException;
  50 import java.nio.channels.SelectionKey;
  51 import java.nio.channels.spi.SelectorProvider;
  52 import java.util.Collections;
  53 import java.util.HashSet;
  54 import java.util.Objects;
  55 import java.util.Set;
  56 import java.util.concurrent.locks.ReentrantLock;
  57 

  58 import sun.net.ResourceManager;
  59 import sun.net.ext.ExtendedSocketOptions;
  60 import sun.net.util.IPAddressUtil;
  61 
  62 /**
  63  * An implementation of DatagramChannels.
  64  */
  65 
  66 class DatagramChannelImpl
  67     extends DatagramChannel
  68     implements SelChImpl
  69 {
  70     // Used to make native read and write calls
  71     private static NativeDispatcher nd = new DatagramDispatcher();
  72 
  73     // The protocol family of the socket
  74     private final ProtocolFamily family;
  75 
  76     // Our file descriptor
  77     private final FileDescriptor fd;


 104     // IDs of native threads doing reads and writes, for signalling
 105     private long readerThread;
 106     private long writerThread;
 107 
 108     // Binding and remote address (when connected)
 109     private InetSocketAddress localAddress;
 110     private InetSocketAddress remoteAddress;
 111 
 112     // Our socket adaptor, if any
 113     private DatagramSocket socket;
 114 
 115     // Multicast support
 116     private MembershipRegistry registry;
 117 
 118     // set true when socket is bound and SO_REUSEADDR is emulated
 119     private boolean reuseAddressEmulated;
 120 
 121     // set true/false when socket is already bound and SO_REUSEADDR is emulated
 122     private boolean isReuseAddress;
 123 



 124     // -- End of fields protected by stateLock
 125 
 126     public DatagramChannelImpl(SelectorProvider sp)
 127         throws IOException
 128     {
             // Creates the channel for provider sp and opens its socket. The
             // protocol family is INET6 when Net.isIPv6Available() reports true,
             // otherwise INET.
 129         super(sp);
             // paired with the afterUdpClose() call in the catch block below
 130         ResourceManager.beforeUdpCreate();
 131         try {
 132             this.family = Net.isIPv6Available()
 133                     ? StandardProtocolFamily.INET6
 134                     : StandardProtocolFamily.INET;
                 // NOTE(review): the 'false' argument is presumably the stream
                 // flag (false = datagram socket) - confirm against Net.socket
 135             this.fd = Net.socket(family, false);
 136             this.fdVal = IOUtil.fdVal(fd);
 137         } catch (IOException ioe) {
                 // socket setup failed: balance the earlier beforeUdpCreate()
                 // call, then propagate the original exception unchanged
 138             ResourceManager.afterUdpClose();
 139             throw ioe;
 140         }
 141     }
 142 
 143     public DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family)


 392             }
 393             // remove hook for Thread.interrupt
 394             end(completed);
 395         }
 396     }
 397 
 398     private SocketAddress sender;       // Set by receive0 (## ugh)
 399 
 400     @Override
 401     public SocketAddress receive(ByteBuffer dst) throws IOException {
 402         if (dst.isReadOnly())
 403             throw new IllegalArgumentException("Read-only buffer");
 404 
 405         readLock.lock();
 406         try {
 407             boolean blocking = isBlocking();
 408             int n = 0;
 409             ByteBuffer bb = null;
 410             try {
 411                 SocketAddress remote = beginRead(blocking, false);

 412                 boolean connected = (remote != null);
 413                 SecurityManager sm = System.getSecurityManager();
 414                 if (connected || (sm == null)) {
 415                     // connected or no security manager
 416                     n = receive(fd, dst, connected);
 417                     if (blocking) {
 418                         while (IOStatus.okayToRetry(n) && isOpen()) {
 419                             park(Net.POLLIN);
 420                             n = receive(fd, dst, connected);
 421                         }
 422                     } else if (n == IOStatus.UNAVAILABLE) {
 423                         return null;
 424                     }
 425                 } else {
 426                     // Cannot receive into user's buffer when running with a
 427                     // security manager and not connected
 428                     bb = Util.getTemporaryDirectBuffer(dst.remaining());
 429                     for (;;) {
 430                         n = receive(fd, bb, connected);
 431                         if (blocking) {


 496     {
 497         int n = receive0(fd, ((DirectBuffer)bb).address() + pos, rem, connected);
 498         if (n > 0)
 499             bb.position(pos + n);
 500         return n;
 501     }
 502 
 503     @Override
 504     public int send(ByteBuffer src, SocketAddress target)
 505         throws IOException
 506     {
 507         Objects.requireNonNull(src);
 508         InetSocketAddress isa = Net.checkAddress(target, family);
 509 
 510         writeLock.lock();
 511         try {
 512             boolean blocking = isBlocking();
 513             int n = 0;
 514             try {
 515                 SocketAddress remote = beginWrite(blocking, false);

 516                 if (remote != null) {
 517                     // connected
 518                     if (!target.equals(remote)) {
 519                         throw new AlreadyConnectedException();
 520                     }
 521                     n = IOUtil.write(fd, src, -1, nd);
 522                     if (blocking) {
 523                         while (IOStatus.okayToRetry(n) && isOpen()) {
 524                             park(Net.POLLOUT);
 525                             n = IOUtil.write(fd, src, -1, nd);
 526                         }
 527                     }
 528                 } else {
 529                     // not connected
 530                     SecurityManager sm = System.getSecurityManager();
 531                     InetAddress ia = isa.getAddress();
 532                     if (sm != null) {
 533                         if (ia.isMulticastAddress()) {
 534                             sm.checkMulticast(ia);
 535                         } else {


 603         } catch (PortUnreachableException pue) {
 604             if (isConnected())
 605                 throw pue;
 606             written = rem;
 607         }
 608         if (written > 0)
 609             bb.position(pos + written);
 610         return written;
 611     }
 612 
 613     @Override
 614     public int read(ByteBuffer buf) throws IOException {
             // Reads into buf through IOUtil.read while holding readLock and
             // returns the resulting status after IOStatus.normalize.
 615         Objects.requireNonNull(buf);
 616 
 617         readLock.lock();
 618         try {
                 // sampled once so the retry loop and endRead see the same mode
 619             boolean blocking = isBlocking();
 620             int n = 0;
 621             try {
                     // NOTE(review): the 'true' argument presumably requires the
                     // channel to be connected - confirm against beginRead
 622                 beginRead(blocking, true);

 623                 n = IOUtil.read(fd, buf, -1, nd);
 624                 if (blocking) {
                         // blocking mode: wait for POLLIN and retry for as long
                         // as the status is retryable and the channel is open
 625                     while (IOStatus.okayToRetry(n) && isOpen()) {
 626                         park(Net.POLLIN);
 627                         n = IOUtil.read(fd, buf, -1, nd);
 628                     }
 629                 }
 630             } finally {
                     // always runs, even on exception; the second argument
                     // reports whether any bytes were transferred
 631                 endRead(blocking, n > 0);
 632                 assert IOStatus.check(n);
 633             }
 634             return IOStatus.normalize(n);
 635         } finally {
 636             readLock.unlock();
 637         }
 638     }
 639 
 640     @Override
 641     public long read(ByteBuffer[] dsts, int offset, int length)
 642         throws IOException
 643     {
             // Scattering read into dsts[offset .. offset+length) through
             // IOUtil.read while holding readLock; returns the status after
             // IOStatus.normalize.
             // the range check also throws NullPointerException when dsts is null
 644         Objects.checkFromIndexSize(offset, length, dsts.length);
 645 
 646         readLock.lock();
 647         try {
                 // sampled once so the retry loop and endRead see the same mode
 648             boolean blocking = isBlocking();
 649             long n = 0;
 650             try {
                     // NOTE(review): the 'true' argument presumably requires the
                     // channel to be connected - confirm against beginRead
 651                 beginRead(blocking, true);

 652                 n = IOUtil.read(fd, dsts, offset, length, nd);
 653                 if (blocking) {
                         // blocking mode: wait for POLLIN and retry for as long
                         // as the status is retryable and the channel is open
 654                     while (IOStatus.okayToRetry(n) && isOpen()) {
 655                         park(Net.POLLIN);
 656                         n = IOUtil.read(fd, dsts, offset, length, nd);
 657                     }
 658                 }
 659             } finally {
                     // always runs, even on exception; the second argument
                     // reports whether any bytes were transferred
 660                 endRead(blocking, n > 0);
 661                 assert IOStatus.check(n);
 662             }
 663             return IOStatus.normalize(n);
 664         } finally {
 665             readLock.unlock();
 666         }
 667     }
 668 
 669     /**
 670      * Marks the beginning of a write operation that might block.
 671      * @param blocking true if configured blocking


 709                 writerThread = 0;
 710                 if (state == ST_CLOSING) {
 711                     tryFinishClose();
 712                 }
 713             }
 714             // remove hook for Thread.interrupt
 715             end(completed);
 716         }
 717     }
 718 
 719     @Override
 720     public int write(ByteBuffer buf) throws IOException {
             // Writes buf through IOUtil.write while holding writeLock and
             // returns the resulting status after IOStatus.normalize.
 721         Objects.requireNonNull(buf);
 722 
 723         writeLock.lock();
 724         try {
                 // sampled once so the retry loop and endWrite see the same mode
 725             boolean blocking = isBlocking();
 726             int n = 0;
 727             try {
                     // NOTE(review): the 'true' argument presumably requires the
                     // channel to be connected - confirm against beginWrite
 728                 beginWrite(blocking, true);

 729                 n = IOUtil.write(fd, buf, -1, nd);
 730                 if (blocking) {
                         // blocking mode: wait for POLLOUT and retry for as long
                         // as the status is retryable and the channel is open
 731                     while (IOStatus.okayToRetry(n) && isOpen()) {
 732                         park(Net.POLLOUT);
 733                         n = IOUtil.write(fd, buf, -1, nd);
 734                     }
 735                 }
 736             } finally {
                     // always runs, even on exception; the second argument
                     // reports whether any bytes were transferred
 737                 endWrite(blocking, n > 0);
 738                 assert IOStatus.check(n);
 739             }
 740             return IOStatus.normalize(n);
 741         } finally {
 742             writeLock.unlock();
 743         }
 744     }
 745 
 746     @Override
 747     public long write(ByteBuffer[] srcs, int offset, int length)
 748         throws IOException
 749     {
             // Gathering write of srcs[offset .. offset+length) through
             // IOUtil.write while holding writeLock; returns the status after
             // IOStatus.normalize.
             // the range check also throws NullPointerException when srcs is null
 750         Objects.checkFromIndexSize(offset, length, srcs.length);
 751 
 752         writeLock.lock();
 753         try {
                 // sampled once so the retry loop and endWrite see the same mode
 754             boolean blocking = isBlocking();
 755             long n = 0;
 756             try {
                     // NOTE(review): the 'true' argument presumably requires the
                     // channel to be connected - confirm against beginWrite
 757                 beginWrite(blocking, true);

 758                 n = IOUtil.write(fd, srcs, offset, length, nd);
 759                 if (blocking) {
                         // blocking mode: wait for POLLOUT and retry for as long
                         // as the status is retryable and the channel is open
 760                     while (IOStatus.okayToRetry(n) && isOpen()) {
 761                         park(Net.POLLOUT);
 762                         n = IOUtil.write(fd, srcs, offset, length, nd);
 763                     }
 764                 }
 765             } finally {
                     // always runs, even on exception; the second argument
                     // reports whether any bytes were transferred
 766                 endWrite(blocking, n > 0);
 767                 assert IOStatus.check(n);
 768             }
 769             return IOStatus.normalize(n);
 770         } finally {
 771             writeLock.unlock();
 772         }
 773     }
 774 
 775     @Override
 776     protected void implConfigureBlocking(boolean block) throws IOException {
             // Applies the requested blocking mode to the file descriptor.
             // The read and write methods of this class hold readLock/writeLock
             // for their whole duration, so taking both here keeps the mode from
             // changing underneath an operation in progress. Lock order is
             // readLock, then writeLock, then stateLock.
 777         readLock.lock();
 778         try {
 779             writeLock.lock();
 780             try {
 781                 synchronized (stateLock) {
                         // fail before touching the descriptor if the channel
                         // is no longer open
 782                     ensureOpen();
 783                     IOUtil.configureBlocking(fd, block);
 784                 }
 785             } finally {
 786                 writeLock.unlock();
 787             }
 788         } finally {
 789             readLock.unlock();
 790         }
 791     }
 792 






























 793     InetSocketAddress localAddress() {
             // Returns the current value of the localAddress field, read while
             // holding stateLock.
 794         synchronized (stateLock) {
 795             return localAddress;
 796         }
 797     }
 798 
 799     InetSocketAddress remoteAddress() {
             // Returns the current value of the remoteAddress field, read while
             // holding stateLock.
 800         synchronized (stateLock) {
 801             return remoteAddress;
 802         }
 803     }
 804 
 805     @Override
 806     public DatagramChannel bind(SocketAddress local) throws IOException {
 807         readLock.lock();
 808         try {
 809             writeLock.lock();
 810             try {
 811                 synchronized (stateLock) {
 812                     ensureOpen();


 875                     if (state == ST_CONNECTED)
 876                         throw new AlreadyConnectedException();
 877 
 878                     int n = Net.connect(family,
 879                                         fd,
 880                                         isa.getAddress(),
 881                                         isa.getPort());
 882                     if (n <= 0)
 883                         throw new Error();      // Can't happen
 884 
 885                     // connected
 886                     remoteAddress = isa;
 887                     state = ST_CONNECTED;
 888 
 889                     // refresh local address
 890                     localAddress = Net.localAddress(fd);
 891 
 892                     // flush any packets already received.
 893                     boolean blocking = isBlocking();
 894                     if (blocking) {
 895                         IOUtil.configureBlocking(fd, false);
 896                     }
 897                     try {
 898                         ByteBuffer buf = ByteBuffer.allocate(100);
 899                         while (receive(fd, buf, false) > 0) {
 900                             buf.clear();
 901                         }
 902                     } finally {
 903                         if (blocking) {
 904                             IOUtil.configureBlocking(fd, true);
 905                         }
 906                     }
 907                 }
 908             } finally {
 909                 writeLock.unlock();
 910             }
 911         } finally {
 912             readLock.unlock();
 913         }
 914         return this;
 915     }
 916 
 917     @Override
 918     public DatagramChannel disconnect() throws IOException {
 919         readLock.lock();
 920         try {
 921             writeLock.lock();
 922             try {
 923                 synchronized (stateLock) {
 924                     if (!isOpen() || (state != ST_CONNECTED))


1188     /**
1189      * Closes this channel when configured in blocking mode.
1190      *
1191      * If there is an I/O operation in progress then the socket is pre-closed
1192      * and the I/O threads signalled, in which case the final close is deferred
1193      * until all I/O operations complete.
1194      */
1195     private void implCloseBlockingMode() throws IOException {
1196         synchronized (stateLock) {
1197             assert state < ST_CLOSING;
1198             state = ST_CLOSING;
1199 
1200             // if member of any multicast groups then invalidate the keys
1201             if (registry != null)
1202                 registry.invalidateAll();
1203 
                 // tryClose() returned false, so the close could not be
                 // completed here
1204             if (!tryClose()) {
                     // readerThread/writerThread are the native thread IDs of
                     // in-progress reads and writes; zero means none
1205                 long reader = readerThread;
1206                 long writer = writerThread;
1207                 if (reader != 0 || writer != 0) {



                         // pre-close the socket first, then signal whichever
                         // I/O threads are recorded
1208                     nd.preClose(fd);
1209                     if (reader != 0)
1210                         NativeThread.signal(reader);
1211                     if (writer != 0)
1212                         NativeThread.signal(writer);
1213                 }
1214             }
1215         }
1216     }
1217 
1218     /**
1219      * Closes this channel when configured in non-blocking mode.
1220      *
1221      * If the channel is registered with a Selector then the close is deferred
1222      * until the channel is flushed from all Selectors.
1223      */
1224     private void implCloseNonBlockingMode() throws IOException {
1225         synchronized (stateLock) {
1226             assert state < ST_CLOSING;
1227             state = ST_CLOSING;
1228 
1229             // if member of any multicast groups then invalidate the keys
1230             if (registry != null)
1231                 registry.invalidateAll();




  38 import java.net.SocketAddress;
  39 import java.net.SocketOption;
  40 import java.net.StandardProtocolFamily;
  41 import java.net.StandardSocketOptions;
  42 import java.nio.ByteBuffer;
  43 import java.nio.channels.AlreadyBoundException;
  44 import java.nio.channels.AlreadyConnectedException;
  45 import java.nio.channels.AsynchronousCloseException;
  46 import java.nio.channels.ClosedChannelException;
  47 import java.nio.channels.DatagramChannel;
  48 import java.nio.channels.MembershipKey;
  49 import java.nio.channels.NotYetConnectedException;
  50 import java.nio.channels.SelectionKey;
  51 import java.nio.channels.spi.SelectorProvider;
  52 import java.util.Collections;
  53 import java.util.HashSet;
  54 import java.util.Objects;
  55 import java.util.Set;
  56 import java.util.concurrent.locks.ReentrantLock;
  57 
  58 import jdk.internal.misc.Strands;
  59 import sun.net.ResourceManager;
  60 import sun.net.ext.ExtendedSocketOptions;
  61 import sun.net.util.IPAddressUtil;
  62 
  63 /**
  64  * An implementation of DatagramChannels.
  65  */
  66 
  67 class DatagramChannelImpl
  68     extends DatagramChannel
  69     implements SelChImpl
  70 {
  71     // Used to make native read and write calls
  72     private static NativeDispatcher nd = new DatagramDispatcher();
  73 
  74     // The protocol family of the socket
  75     private final ProtocolFamily family;
  76 
  77     // Our file descriptor
  78     private final FileDescriptor fd;


 105     // IDs of native threads doing reads and writes, for signalling
 106     private long readerThread;
 107     private long writerThread;
 108 
 109     // Binding and remote address (when connected)
 110     private InetSocketAddress localAddress;
 111     private InetSocketAddress remoteAddress;
 112 
 113     // Our socket adaptor, if any
 114     private DatagramSocket socket;
 115 
 116     // Multicast support
 117     private MembershipRegistry registry;
 118 
 119     // set true when socket is bound and SO_REUSEADDR is emulated
 120     private boolean reuseAddressEmulated;
 121 
 122     // set true/false when socket is already bound and SO_REUSEADDR is emulated
 123     private boolean isReuseAddress;
 124 
 125     // lazily set to true when the socket is configured non-blocking
 126     private volatile boolean nonBlocking;
 127 
 128     // -- End of fields protected by stateLock
 129 
 130     public DatagramChannelImpl(SelectorProvider sp)
 131         throws IOException
 132     {
             // Creates the channel for provider sp and opens its socket. The
             // protocol family is INET6 when Net.isIPv6Available() reports true,
             // otherwise INET.
 133         super(sp);
             // paired with the afterUdpClose() call in the catch block below
 134         ResourceManager.beforeUdpCreate();
 135         try {
 136             this.family = Net.isIPv6Available()
 137                     ? StandardProtocolFamily.INET6
 138                     : StandardProtocolFamily.INET;
                 // NOTE(review): the 'false' argument is presumably the stream
                 // flag (false = datagram socket) - confirm against Net.socket
 139             this.fd = Net.socket(family, false);
 140             this.fdVal = IOUtil.fdVal(fd);
 141         } catch (IOException ioe) {
                 // socket setup failed: balance the earlier beforeUdpCreate()
                 // call, then propagate the original exception unchanged
 142             ResourceManager.afterUdpClose();
 143             throw ioe;
 144         }
 145     }
 146 
 147     public DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family)


 396             }
 397             // remove hook for Thread.interrupt
 398             end(completed);
 399         }
 400     }
 401 
 402     private SocketAddress sender;       // Set by receive0 (## ugh)
 403 
 404     @Override
 405     public SocketAddress receive(ByteBuffer dst) throws IOException {
 406         if (dst.isReadOnly())
 407             throw new IllegalArgumentException("Read-only buffer");
 408 
 409         readLock.lock();
 410         try {
 411             boolean blocking = isBlocking();
 412             int n = 0;
 413             ByteBuffer bb = null;
 414             try {
 415                 SocketAddress remote = beginRead(blocking, false);
 416                 lockedConfigureNonBlockingIfFiber();
 417                 boolean connected = (remote != null);
 418                 SecurityManager sm = System.getSecurityManager();
 419                 if (connected || (sm == null)) {
 420                     // connected or no security manager
 421                     n = receive(fd, dst, connected);
 422                     if (blocking) {
 423                         while (IOStatus.okayToRetry(n) && isOpen()) {
 424                             park(Net.POLLIN);
 425                             n = receive(fd, dst, connected);
 426                         }
 427                     } else if (n == IOStatus.UNAVAILABLE) {
 428                         return null;
 429                     }
 430                 } else {
 431                     // Cannot receive into user's buffer when running with a
 432                     // security manager and not connected
 433                     bb = Util.getTemporaryDirectBuffer(dst.remaining());
 434                     for (;;) {
 435                         n = receive(fd, bb, connected);
 436                         if (blocking) {


 501     {
 502         int n = receive0(fd, ((DirectBuffer)bb).address() + pos, rem, connected);
 503         if (n > 0)
 504             bb.position(pos + n);
 505         return n;
 506     }
 507 
 508     @Override
 509     public int send(ByteBuffer src, SocketAddress target)
 510         throws IOException
 511     {
 512         Objects.requireNonNull(src);
 513         InetSocketAddress isa = Net.checkAddress(target, family);
 514 
 515         writeLock.lock();
 516         try {
 517             boolean blocking = isBlocking();
 518             int n = 0;
 519             try {
 520                 SocketAddress remote = beginWrite(blocking, false);
 521                 lockedConfigureNonBlockingIfFiber();
 522                 if (remote != null) {
 523                     // connected
 524                     if (!target.equals(remote)) {
 525                         throw new AlreadyConnectedException();
 526                     }
 527                     n = IOUtil.write(fd, src, -1, nd);
 528                     if (blocking) {
 529                         while (IOStatus.okayToRetry(n) && isOpen()) {
 530                             park(Net.POLLOUT);
 531                             n = IOUtil.write(fd, src, -1, nd);
 532                         }
 533                     }
 534                 } else {
 535                     // not connected
 536                     SecurityManager sm = System.getSecurityManager();
 537                     InetAddress ia = isa.getAddress();
 538                     if (sm != null) {
 539                         if (ia.isMulticastAddress()) {
 540                             sm.checkMulticast(ia);
 541                         } else {


 609         } catch (PortUnreachableException pue) {
 610             if (isConnected())
 611                 throw pue;
 612             written = rem;
 613         }
 614         if (written > 0)
 615             bb.position(pos + written);
 616         return written;
 617     }
 618 
 619     @Override
 620     public int read(ByteBuffer buf) throws IOException {
             // Reads into buf through IOUtil.read while holding readLock and
             // returns the resulting status after IOStatus.normalize.
 621         Objects.requireNonNull(buf);
 622 
 623         readLock.lock();
 624         try {
                 // sampled once so the retry loop and endRead see the same mode
 625             boolean blocking = isBlocking();
 626             int n = 0;
 627             try {
                     // NOTE(review): the 'true' argument presumably requires the
                     // channel to be connected - confirm against beginRead
 628                 beginRead(blocking, true);
                     // switch the socket to non-blocking first if the caller is
                     // a fiber; readLock is held, as that helper asserts
 629                 lockedConfigureNonBlockingIfFiber();
 630                 n = IOUtil.read(fd, buf, -1, nd);
 631                 if (blocking) {
                         // blocking mode: wait for POLLIN and retry for as long
                         // as the status is retryable and the channel is open
 632                     while (IOStatus.okayToRetry(n) && isOpen()) {
 633                         park(Net.POLLIN);
 634                         n = IOUtil.read(fd, buf, -1, nd);
 635                     }
 636                 }
 637             } finally {
                     // always runs, even on exception; the second argument
                     // reports whether any bytes were transferred
 638                 endRead(blocking, n > 0);
 639                 assert IOStatus.check(n);
 640             }
 641             return IOStatus.normalize(n);
 642         } finally {
 643             readLock.unlock();
 644         }
 645     }
 646 
 647     @Override
 648     public long read(ByteBuffer[] dsts, int offset, int length)
 649         throws IOException
 650     {
             // Scattering read into dsts[offset .. offset+length) through
             // IOUtil.read while holding readLock; returns the status after
             // IOStatus.normalize.
             // the range check also throws NullPointerException when dsts is null
 651         Objects.checkFromIndexSize(offset, length, dsts.length);
 652 
 653         readLock.lock();
 654         try {
                 // sampled once so the retry loop and endRead see the same mode
 655             boolean blocking = isBlocking();
 656             long n = 0;
 657             try {
                     // NOTE(review): the 'true' argument presumably requires the
                     // channel to be connected - confirm against beginRead
 658                 beginRead(blocking, true);
                     // switch the socket to non-blocking first if the caller is
                     // a fiber; readLock is held, as that helper asserts
 659                 lockedConfigureNonBlockingIfFiber();
 660                 n = IOUtil.read(fd, dsts, offset, length, nd);
 661                 if (blocking) {
                         // blocking mode: wait for POLLIN and retry for as long
                         // as the status is retryable and the channel is open
 662                     while (IOStatus.okayToRetry(n) && isOpen()) {
 663                         park(Net.POLLIN);
 664                         n = IOUtil.read(fd, dsts, offset, length, nd);
 665                     }
 666                 }
 667             } finally {
                     // always runs, even on exception; the second argument
                     // reports whether any bytes were transferred
 668                 endRead(blocking, n > 0);
 669                 assert IOStatus.check(n);
 670             }
 671             return IOStatus.normalize(n);
 672         } finally {
 673             readLock.unlock();
 674         }
 675     }
 676 
 677     /**
 678      * Marks the beginning of a write operation that might block.
 679      * @param blocking true if configured blocking


 717                 writerThread = 0;
 718                 if (state == ST_CLOSING) {
 719                     tryFinishClose();
 720                 }
 721             }
 722             // remove hook for Thread.interrupt
 723             end(completed);
 724         }
 725     }
 726 
 727     @Override
 728     public int write(ByteBuffer buf) throws IOException {
             // Writes buf through IOUtil.write while holding writeLock and
             // returns the resulting status after IOStatus.normalize.
 729         Objects.requireNonNull(buf);
 730 
 731         writeLock.lock();
 732         try {
                 // sampled once so the retry loop and endWrite see the same mode
 733             boolean blocking = isBlocking();
 734             int n = 0;
 735             try {
                     // NOTE(review): the 'true' argument presumably requires the
                     // channel to be connected - confirm against beginWrite
 736                 beginWrite(blocking, true);
                     // switch the socket to non-blocking first if the caller is
                     // a fiber; writeLock is held, as that helper asserts
 737                 lockedConfigureNonBlockingIfFiber();
 738                 n = IOUtil.write(fd, buf, -1, nd);
 739                 if (blocking) {
                         // blocking mode: wait for POLLOUT and retry for as long
                         // as the status is retryable and the channel is open
 740                     while (IOStatus.okayToRetry(n) && isOpen()) {
 741                         park(Net.POLLOUT);
 742                         n = IOUtil.write(fd, buf, -1, nd);
 743                     }
 744                 }
 745             } finally {
                     // always runs, even on exception; the second argument
                     // reports whether any bytes were transferred
 746                 endWrite(blocking, n > 0);
 747                 assert IOStatus.check(n);
 748             }
 749             return IOStatus.normalize(n);
 750         } finally {
 751             writeLock.unlock();
 752         }
 753     }
 754 
 755     @Override
 756     public long write(ByteBuffer[] srcs, int offset, int length)
 757         throws IOException
 758     {
             // Gathering write of srcs[offset .. offset+length) through
             // IOUtil.write while holding writeLock; returns the status after
             // IOStatus.normalize.
             // the range check also throws NullPointerException when srcs is null
 759         Objects.checkFromIndexSize(offset, length, srcs.length);
 760 
 761         writeLock.lock();
 762         try {
                 // sampled once so the retry loop and endWrite see the same mode
 763             boolean blocking = isBlocking();
 764             long n = 0;
 765             try {
                     // NOTE(review): the 'true' argument presumably requires the
                     // channel to be connected - confirm against beginWrite
 766                 beginWrite(blocking, true);
                     // switch the socket to non-blocking first if the caller is
                     // a fiber; writeLock is held, as that helper asserts
 767                 lockedConfigureNonBlockingIfFiber();
 768                 n = IOUtil.write(fd, srcs, offset, length, nd);
 769                 if (blocking) {
                         // blocking mode: wait for POLLOUT and retry for as long
                         // as the status is retryable and the channel is open
 770                     while (IOStatus.okayToRetry(n) && isOpen()) {
 771                         park(Net.POLLOUT);
 772                         n = IOUtil.write(fd, srcs, offset, length, nd);
 773                     }
 774                 }
 775             } finally {
                     // always runs, even on exception; the second argument
                     // reports whether any bytes were transferred
 776                 endWrite(blocking, n > 0);
 777                 assert IOStatus.check(n);
 778             }
 779             return IOStatus.normalize(n);
 780         } finally {
 781             writeLock.unlock();
 782         }
 783     }
 784 
 785     @Override
 786     protected void implConfigureBlocking(boolean block) throws IOException {
             // Applies the requested blocking mode via lockedConfigureBlocking.
             // The read and write methods of this class hold readLock/writeLock
             // for their whole duration, so taking both here keeps the mode from
             // changing underneath an operation in progress. Lock order is
             // readLock, then writeLock.
 787         readLock.lock();
 788         try {
 789             writeLock.lock();
 790             try {
 791                 lockedConfigureBlocking(block);



 792             } finally {
 793                 writeLock.unlock();
 794             }
 795         } finally {
 796             readLock.unlock();
 797         }
 798     }
 799 
 800     /**
 801      * Adjust the blocking mode while holding readLock or writeLock.
          *
          * The change is applied under stateLock after checking that the channel
          * is open. It is skipped when the nonBlocking flag is set, so the
          * descriptor then stays non-blocking whatever value is requested.
          *
          * @param block the blocking mode requested for the file descriptor
          * @throws IOException if thrown by ensureOpen or by
          *         IOUtil.configureBlocking
 802      */
 803     private void lockedConfigureBlocking(boolean block) throws IOException {
 804         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
 805         synchronized (stateLock) {
 806             ensureOpen();
 807             // do nothing if fiber has forced the socket to be non-blocking
 808             if (!nonBlocking) {
 809                 IOUtil.configureBlocking(fd, block);
 810             }
 811         }
 812     }
 813 
 814     /**
 815      * Ensures that the socket is configured non-blocking when the current
 816      * strand is a fiber.
          *
          * Must be called while holding readLock or writeLock. The nonBlocking
          * flag is tested before stateLock is taken and is not re-tested inside
          * it, so a reader and a writer may each perform the configure call.
          * Nothing in this method resets the flag after it has been set.
          *
 817      * @throws IOException if there is an I/O error changing the blocking mode
 818      */
 819     private void lockedConfigureNonBlockingIfFiber() throws IOException {
 820         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
             // nothing to do when already switched or when not running in a fiber
 821         if (!nonBlocking && (Strands.currentStrand() instanceof Fiber)) {
 822             synchronized (stateLock) {
 823                 ensureOpen();
 824                 IOUtil.configureBlocking(fd, false);
                     // set only after the descriptor change succeeded
 825                 nonBlocking = true;
 826             }
 827         }
 828     }
 829 
 830     InetSocketAddress localAddress() {
             // Returns the current value of the localAddress field, read while
             // holding stateLock.
 831         synchronized (stateLock) {
 832             return localAddress;
 833         }
 834     }
 835 
 836     InetSocketAddress remoteAddress() {
             // Returns the current value of the remoteAddress field, read while
             // holding stateLock.
 837         synchronized (stateLock) {
 838             return remoteAddress;
 839         }
 840     }
 841 
 842     @Override
 843     public DatagramChannel bind(SocketAddress local) throws IOException {
 844         readLock.lock();
 845         try {
 846             writeLock.lock();
 847             try {
 848                 synchronized (stateLock) {
 849                     ensureOpen();


 912                     if (state == ST_CONNECTED)
 913                         throw new AlreadyConnectedException();
 914 
 915                     int n = Net.connect(family,
 916                                         fd,
 917                                         isa.getAddress(),
 918                                         isa.getPort());
 919                     if (n <= 0)
 920                         throw new Error();      // Can't happen
 921 
 922                     // connected
 923                     remoteAddress = isa;
 924                     state = ST_CONNECTED;
 925 
 926                     // refresh local address
 927                     localAddress = Net.localAddress(fd);
 928 
 929                     // flush any packets already received.
 930                     boolean blocking = isBlocking();
 931                     if (blocking) {
 932                         lockedConfigureBlocking(false);
 933                     }
 934                     try {
 935                         ByteBuffer buf = ByteBuffer.allocate(100);
 936                         while (receive(fd, buf, false) > 0) {
 937                             buf.clear();
 938                         }
 939                     } finally {
 940                         if (blocking) {
 941                             lockedConfigureBlocking(true);
 942                         }
 943                     }
 944                 }
 945             } finally {
 946                 writeLock.unlock();
 947             }
 948         } finally {
 949             readLock.unlock();
 950         }
 951         return this;
 952     }
 953 
 954     @Override
 955     public DatagramChannel disconnect() throws IOException {
 956         readLock.lock();
 957         try {
 958             writeLock.lock();
 959             try {
 960                 synchronized (stateLock) {
 961                     if (!isOpen() || (state != ST_CONNECTED))


1225     /**
1226      * Closes this channel when configured in blocking mode.
1227      *
1228      * If there is an I/O operation in progress then the socket is pre-closed
1229      * and the I/O threads signalled, in which case the final close is deferred
1230      * until all I/O operations complete.
1231      */
1232     private void implCloseBlockingMode() throws IOException {
1233         synchronized (stateLock) {
1234             assert state < ST_CLOSING;
1235             state = ST_CLOSING;
1236 
1237             // if member of any multicast groups then invalidate the keys
1238             if (registry != null)
1239                 registry.invalidateAll();
1240 
                 // tryClose() returned false, so the close could not be
                 // completed here
1241             if (!tryClose()) {
                     // readerThread/writerThread identify in-progress reads and
                     // writes; zero means none
1242                 long reader = readerThread;
1243                 long writer = writerThread;
1244                 if (reader != 0 || writer != 0) {
                         // NOTE(review): presumably wakes fibers parked on this
                         // descriptor - confirm against Poller.stopPoll
1245                     if (NativeThread.isFiber(reader) || NativeThread.isFiber(writer)) {
1246                         Poller.stopPoll(fdVal);
1247                     }
1248                     nd.preClose(fd);
                         // only IDs that NativeThread.isKernelThread accepts
                         // are signalled
1249                     if (NativeThread.isKernelThread(reader))
1250                         NativeThread.signal(reader);
1251                     if (NativeThread.isKernelThread(writer))
1252                         NativeThread.signal(writer);
1253                 }
1254             }
1255         }
1256     }
1257 
1258     /**
1259      * Closes this channel when configured in non-blocking mode.
1260      *
1261      * If the channel is registered with a Selector then the close is deferred
1262      * until the channel is flushed from all Selectors.
1263      */
1264     private void implCloseNonBlockingMode() throws IOException {
1265         synchronized (stateLock) {
1266             assert state < ST_CLOSING;
1267             state = ST_CLOSING;
1268 
1269             // if member of any multicast groups then invalidate the keys
1270             if (registry != null)
1271                 registry.invalidateAll();


< prev index next >