< prev index next >

src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java

Print this page




  38 import java.net.StandardProtocolFamily;
  39 import java.net.StandardSocketOptions;
  40 import java.nio.ByteBuffer;
  41 import java.nio.channels.AlreadyBoundException;
  42 import java.nio.channels.AlreadyConnectedException;
  43 import java.nio.channels.AsynchronousCloseException;
  44 import java.nio.channels.ClosedChannelException;
  45 import java.nio.channels.ConnectionPendingException;
  46 import java.nio.channels.IllegalBlockingModeException;
  47 import java.nio.channels.NoConnectionPendingException;
  48 import java.nio.channels.NotYetConnectedException;
  49 import java.nio.channels.SelectionKey;
  50 import java.nio.channels.SocketChannel;
  51 import java.nio.channels.spi.SelectorProvider;
  52 import java.util.Collections;
  53 import java.util.HashSet;
  54 import java.util.Objects;
  55 import java.util.Set;
  56 import java.util.concurrent.locks.ReentrantLock;
  57 

  58 import sun.net.ConnectionResetException;
  59 import sun.net.NetHooks;
  60 import sun.net.ext.ExtendedSocketOptions;
  61 import sun.net.util.SocketExceptions;
  62 
  63 /**
  64  * An implementation of SocketChannels
  65  */
  66 
  67 class SocketChannelImpl
  68     extends SocketChannel
  69     implements SelChImpl
  70 {
  71     // Used to make native read and write calls
  72     private static final NativeDispatcher nd = new SocketDispatcher();
  73 
  74     // Our file descriptor object
  75     private final FileDescriptor fd;
  76     private final int fdVal;
  77 


  99 
 100     // State, increases monotonically
 101     private static final int ST_UNCONNECTED = 0;
 102     private static final int ST_CONNECTIONPENDING = 1;
 103     private static final int ST_CONNECTED = 2;
 104     private static final int ST_CLOSING = 3;
 105     private static final int ST_CLOSED = 4;
 106     private volatile int state;  // need stateLock to change
 107 
 108     // IDs of native threads doing reads and writes, for signalling
 109     private long readerThread;
 110     private long writerThread;
 111 
 112     // Binding
 113     private InetSocketAddress localAddress;
 114     private InetSocketAddress remoteAddress;
 115 
 116     // Socket adaptor, created on demand
 117     private Socket socket;
 118 



 119     // -- End of fields protected by stateLock
 120 
 121 
 122     // Constructor for normal connecting sockets
 123     //
 124     SocketChannelImpl(SelectorProvider sp) throws IOException {
 125         super(sp);
 126         this.fd = Net.socket(true);
 127         this.fdVal = IOUtil.fdVal(fd);
 128     }
 129 
 130     SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
 131         throws IOException
 132     {
 133         super(sp);
 134         this.fd = fd;
 135         this.fdVal = IOUtil.fdVal(fd);
 136         if (bound) {
 137             synchronized (stateLock) {
 138                 this.localAddress = Net.localAddress(fd);


 347 
 348     @Override
 349     public int read(ByteBuffer buf) throws IOException {
 350         Objects.requireNonNull(buf);
 351 
 352         readLock.lock();
 353         try {
 354             boolean blocking = isBlocking();
 355             int n = 0;
 356             try {
 357                 beginRead(blocking);
 358 
 359                 // check if connection has been reset
 360                 if (connectionReset)
 361                     throwConnectionReset();
 362 
 363                 // check if input is shutdown
 364                 if (isInputClosed)
 365                     return IOStatus.EOF;
 366 

 367                 n = IOUtil.read(fd, buf, -1, nd);
 368                 if (blocking) {
 369                     while (IOStatus.okayToRetry(n) && isOpen()) {
 370                         park(Net.POLLIN);
 371                         n = IOUtil.read(fd, buf, -1, nd);
 372                     }
 373                 }
 374             } catch (ConnectionResetException e) {
 375                 connectionReset = true;
 376                 throwConnectionReset();
 377             } finally {
 378                 endRead(blocking, n > 0);
 379                 if (n <= 0 && isInputClosed)
 380                     return IOStatus.EOF;
 381             }
 382             return IOStatus.normalize(n);
 383         } finally {
 384             readLock.unlock();
 385         }
 386     }


 389     public long read(ByteBuffer[] dsts, int offset, int length)
 390         throws IOException
 391     {
 392         Objects.checkFromIndexSize(offset, length, dsts.length);
 393 
 394         readLock.lock();
 395         try {
 396             boolean blocking = isBlocking();
 397             long n = 0;
 398             try {
 399                 beginRead(blocking);
 400 
 401                 // check if connection has been reset
 402                 if (connectionReset)
 403                     throwConnectionReset();
 404 
 405                 // check if input is shutdown
 406                 if (isInputClosed)
 407                     return IOStatus.EOF;
 408 

 409                 n = IOUtil.read(fd, dsts, offset, length, nd);
 410                 if (blocking) {
 411                     while (IOStatus.okayToRetry(n) && isOpen()) {
 412                         park(Net.POLLIN);
 413                         n = IOUtil.read(fd, dsts, offset, length, nd);
 414                     }
 415                 }
 416             } catch (ConnectionResetException e) {
 417                 connectionReset = true;
 418                 throwConnectionReset();
 419             } finally {
 420                 endRead(blocking, n > 0);
 421                 if (n <= 0 && isInputClosed)
 422                     return IOStatus.EOF;
 423             }
 424             return IOStatus.normalize(n);
 425         } finally {
 426             readLock.unlock();
 427         }
 428     }


 464                 writerThread = 0;
 465                 if (state == ST_CLOSING) {
 466                     tryFinishClose();
 467                 }
 468             }
 469             // remove hook for Thread.interrupt
 470             end(completed);
 471         }
 472     }
 473 
 474     @Override
 475     public int write(ByteBuffer buf) throws IOException {
 476         Objects.requireNonNull(buf);
 477 
 478         writeLock.lock();
 479         try {
 480             boolean blocking = isBlocking();
 481             int n = 0;
 482             try {
 483                 beginWrite(blocking);

 484                 n = IOUtil.write(fd, buf, -1, nd);
 485                 if (blocking) {
 486                     while (IOStatus.okayToRetry(n) && isOpen()) {
 487                         park(Net.POLLOUT);
 488                         n = IOUtil.write(fd, buf, -1, nd);
 489                     }
 490                 }
 491             } finally {
 492                 endWrite(blocking, n > 0);
 493                 if (n <= 0 && isOutputClosed)
 494                     throw new AsynchronousCloseException();
 495             }
 496             return IOStatus.normalize(n);
 497         } finally {
 498             writeLock.unlock();
 499         }
 500     }
 501 
 502     @Override
 503     public long write(ByteBuffer[] srcs, int offset, int length)
 504         throws IOException
 505     {
 506         Objects.checkFromIndexSize(offset, length, srcs.length);
 507 
 508         writeLock.lock();
 509         try {
 510             boolean blocking = isBlocking();
 511             long n = 0;
 512             try {
 513                 beginWrite(blocking);

 514                 n = IOUtil.write(fd, srcs, offset, length, nd);
 515                 if (blocking) {
 516                     while (IOStatus.okayToRetry(n) && isOpen()) {
 517                         park(Net.POLLOUT);
 518                         n = IOUtil.write(fd, srcs, offset, length, nd);
 519                     }
 520                 }
 521             } finally {
 522                 endWrite(blocking, n > 0);
 523                 if (n <= 0 && isOutputClosed)
 524                     throw new AsynchronousCloseException();
 525             }
 526             return IOStatus.normalize(n);
 527         } finally {
 528             writeLock.unlock();
 529         }
 530     }
 531 
 532     /**
 533      * Writes a byte of out of band data.
 534      */
 535     int sendOutOfBandData(byte b) throws IOException {
 536         writeLock.lock();
 537         try {
 538             boolean blocking = isBlocking();
 539             int n = 0;
 540             try {
 541                 beginWrite(blocking);
 542                 if (blocking) {
 543                     do {
 544                         n = Net.sendOOB(fd, b);
 545                     } while (n == IOStatus.INTERRUPTED && isOpen());
 546                 } else {
 547                     n = Net.sendOOB(fd, b);



 548                 }
 549             } finally {
 550                 endWrite(blocking, n > 0);
 551                 if (n <= 0 && isOutputClosed)
 552                     throw new AsynchronousCloseException();
 553             }
 554             return IOStatus.normalize(n);
 555         } finally {
 556             writeLock.unlock();
 557         }
 558     }
 559 
 560     @Override
 561     protected void implConfigureBlocking(boolean block) throws IOException {
 562         readLock.lock();
 563         try {
 564             writeLock.lock();
 565             try {
 566                 lockedConfigureBlocking(block);
 567             } finally {
 568                 writeLock.unlock();
 569             }
 570         } finally {
 571             readLock.unlock();
 572         }
 573     }
 574 
 575     /**
 576      * Adjust the blocking mode while holding the readLock or writeLock.
 577      */
 578     private void lockedConfigureBlocking(boolean block) throws IOException {
 579         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
 580         synchronized (stateLock) {
 581             ensureOpen();
 582             IOUtil.configureBlocking(fd, block);


















 583         }
 584     }
 585 
 586     /**
 587      * Returns the local address, or null if not bound
 588      */
 589     InetSocketAddress localAddress() {
 590         synchronized (stateLock) {
 591             return localAddress;
 592         }
 593     }
 594 
 595     /**
 596      * Returns the remote address, or null if not connected
 597      */
 598     InetSocketAddress remoteAddress() {
 599         synchronized (stateLock) {
 600             return remoteAddress;
 601         }
 602     }


 712         }
 713         if (isa.getAddress().isAnyLocalAddress()) {
 714             return new InetSocketAddress(InetAddress.getLocalHost(), isa.getPort());
 715         } else {
 716             return isa;
 717         }
 718     }
 719 
 720     @Override
 721     public boolean connect(SocketAddress remote) throws IOException {
 722         InetSocketAddress isa = checkRemote(remote);
 723         try {
 724             readLock.lock();
 725             try {
 726                 writeLock.lock();
 727                 try {
 728                     boolean blocking = isBlocking();
 729                     boolean connected = false;
 730                     try {
 731                         beginConnect(blocking, isa);

 732                         int n = Net.connect(fd, isa.getAddress(), isa.getPort());
 733                         if (n > 0) {
 734                             connected = true;
 735                         } else if (blocking) {
 736                             assert IOStatus.okayToRetry(n);
 737                             boolean polled = false;
 738                             while (!polled && isOpen()) {
 739                                 park(Net.POLLOUT);
 740                                 polled = Net.pollConnectNow(fd);
 741                             }
 742                             connected = polled && isOpen();
 743                         }
 744                     } finally {
 745                         endConnect(blocking, connected);
 746                     }
 747                     return connected;
 748                 } finally {
 749                     writeLock.unlock();
 750                 }
 751             } finally {


 871 
 872     /**
 873      * Closes this channel when configured in blocking mode.
 874      *
 875      * If there is an I/O operation in progress then the socket is pre-closed
 876      * and the I/O threads signalled, in which case the final close is deferred
 877      * until all I/O operations complete.
 878      *
 879      * Note that a channel configured blocking may be registered with a Selector
 880      * This arises when a key is canceled and the channel configured to blocking
 881      * mode before the key is flushed from the Selector.
 882      */
 883     private void implCloseBlockingMode() throws IOException {
 884         synchronized (stateLock) {
 885             assert state < ST_CLOSING;
 886             state = ST_CLOSING;
 887             if (!tryClose()) {
 888                 long reader = readerThread;
 889                 long writer = writerThread;
 890                 if (reader != 0 || writer != 0) {



 891                     nd.preClose(fd);
 892                     if (reader != 0)
 893                         NativeThread.signal(reader);
 894                     if (writer != 0)
 895                         NativeThread.signal(writer);
 896                 }
 897             }
 898         }
 899     }
 900 
 901     /**
 902      * Closes this channel when configured in non-blocking mode.
 903      *
 904      * If the channel is registered with a Selector then the close is deferred
 905      * until the channel is flushed from all Selectors.
 906      *
 907      * If the socket is connected and the channel is registered with a Selector
 908      * then the socket is shutdown for writing so that the peer reads EOF. In
 909      * addition, if SO_LINGER is set to a non-zero value then it is disabled so
 910      * that the deferred close does not wait.
 911      */
 912     private void implCloseNonBlockingMode() throws IOException {
 913         boolean connected;
 914         synchronized (stateLock) {


 955         }
 956     }
 957 
 958     @Override
 959     public void kill() {
 960         synchronized (stateLock) {
 961             if (state == ST_CLOSING) {
 962                 tryFinishClose();
 963             }
 964         }
 965     }
 966 
 967     @Override
 968     public SocketChannel shutdownInput() throws IOException {
 969         synchronized (stateLock) {
 970             ensureOpen();
 971             if (!isConnected())
 972                 throw new NotYetConnectedException();
 973             if (!isInputClosed) {
 974                 Net.shutdown(fd, Net.SHUT_RD);
 975                 long thread = readerThread;
 976                 if (thread != 0)
 977                     NativeThread.signal(thread);



 978                 isInputClosed = true;
 979             }
 980             return this;
 981         }
 982     }
 983 
 984     @Override
 985     public SocketChannel shutdownOutput() throws IOException {
 986         synchronized (stateLock) {
 987             ensureOpen();
 988             if (!isConnected())
 989                 throw new NotYetConnectedException();
 990             if (!isOutputClosed) {
 991                 Net.shutdown(fd, Net.SHUT_WR);
 992                 long thread = writerThread;
 993                 if (thread != 0)
 994                     NativeThread.signal(thread);



 995                 isOutputClosed = true;
 996             }
 997             return this;
 998         }
 999     }
1000 
1001     boolean isInputOpen() {
1002         return !isInputClosed;
1003     }
1004 
1005     boolean isOutputOpen() {
1006         return !isOutputClosed;
1007     }
1008 
1009     /**
1010      * Waits for a connection attempt to finish with a timeout
1011      * @throws SocketTimeoutException if the connect timeout elapses
1012      */
1013     private boolean finishTimedConnect(long nanos) throws IOException {
1014         long startNanos = System.nanoTime();


1132 
1133                 // check if connection has been reset
1134                 if (connectionReset)
1135                     throwConnectionReset();
1136 
1137                 // check if input is shutdown
1138                 if (isInputClosed)
1139                     return IOStatus.EOF;
1140 
1141                 if (nanos > 0) {
1142                     // change socket to non-blocking
1143                     lockedConfigureBlocking(false);
1144                     try {
1145                         n = timedRead(b, off, len, nanos);
1146                     } finally {
1147                         // restore socket to blocking mode
1148                         lockedConfigureBlocking(true);
1149                     }
1150                 } else {
1151                     // read, no timeout

1152                     n = tryRead(b, off, len);
1153                     while (IOStatus.okayToRetry(n) && isOpen()) {
1154                         park(Net.POLLIN);
1155                         n = tryRead(b, off, len);
1156                     }
1157                 }
1158             } catch (ConnectionResetException e) {
1159                 connectionReset = true;
1160                 throwConnectionReset();
1161             } finally {
1162                 endRead(true, n > 0);
1163                 if (n <= 0 && isInputClosed)
1164                     return IOStatus.EOF;
1165             }
1166             assert n > 0 || n == -1;
1167             return n;
1168         } finally {
1169             readLock.unlock();
1170         }
1171     }


1191      * @apiNote This method is for use by the socket adaptor.
1192      */
1193     void blockingWriteFully(byte[] b, int off, int len) throws IOException {
1194         Objects.checkFromIndexSize(off, len, b.length);
1195         if (len == 0) {
1196             // nothing to do
1197             return;
1198         }
1199 
1200         writeLock.lock();
1201         try {
1202             // check that channel is configured blocking
1203             if (!isBlocking())
1204                 throw new IllegalBlockingModeException();
1205 
1206             // loop until all bytes have been written
1207             int pos = off;
1208             int end = off + len;
1209             beginWrite(true);
1210             try {

1211                 while (pos < end && isOpen()) {
1212                     int size = end - pos;
1213                     int n = tryWrite(b, pos, size);
1214                     while (IOStatus.okayToRetry(n) && isOpen()) {
1215                         park(Net.POLLOUT);
1216                         n = tryWrite(b, pos, size);
1217                     }
1218                     if (n > 0) {
1219                         pos += n;
1220                     }
1221                 }
1222             } finally {
1223                 endWrite(true, pos >= end);
1224             }
1225         } finally {
1226             writeLock.unlock();
1227         }
1228     }
1229 
1230     /**




  38 import java.net.StandardProtocolFamily;
  39 import java.net.StandardSocketOptions;
  40 import java.nio.ByteBuffer;
  41 import java.nio.channels.AlreadyBoundException;
  42 import java.nio.channels.AlreadyConnectedException;
  43 import java.nio.channels.AsynchronousCloseException;
  44 import java.nio.channels.ClosedChannelException;
  45 import java.nio.channels.ConnectionPendingException;
  46 import java.nio.channels.IllegalBlockingModeException;
  47 import java.nio.channels.NoConnectionPendingException;
  48 import java.nio.channels.NotYetConnectedException;
  49 import java.nio.channels.SelectionKey;
  50 import java.nio.channels.SocketChannel;
  51 import java.nio.channels.spi.SelectorProvider;
  52 import java.util.Collections;
  53 import java.util.HashSet;
  54 import java.util.Objects;
  55 import java.util.Set;
  56 import java.util.concurrent.locks.ReentrantLock;
  57 
  58 import jdk.internal.misc.Strands;
  59 import sun.net.ConnectionResetException;
  60 import sun.net.NetHooks;
  61 import sun.net.ext.ExtendedSocketOptions;
  62 import sun.net.util.SocketExceptions;
  63 
  64 /**
  65  * An implementation of SocketChannels
  66  */
  67 
  68 class SocketChannelImpl
  69     extends SocketChannel
  70     implements SelChImpl
  71 {
  72     // Used to make native read and write calls
  73     private static final NativeDispatcher nd = new SocketDispatcher();
  74 
  75     // Our file descriptor object
  76     private final FileDescriptor fd;
  77     private final int fdVal;
  78 


 100 
 101     // State, increases monotonically
 102     private static final int ST_UNCONNECTED = 0;
 103     private static final int ST_CONNECTIONPENDING = 1;
 104     private static final int ST_CONNECTED = 2;
 105     private static final int ST_CLOSING = 3;
 106     private static final int ST_CLOSED = 4;
 107     private volatile int state;  // need stateLock to change
 108 
 109     // IDs of native threads doing reads and writes, for signalling
 110     private long readerThread;
 111     private long writerThread;
 112 
 113     // Binding
 114     private InetSocketAddress localAddress;
 115     private InetSocketAddress remoteAddress;
 116 
 117     // Socket adaptor, created on demand
 118     private Socket socket;
 119 
 120     // lazily set to true when the socket is configured non-blocking
 121     private volatile boolean nonBlocking;
 122 
 123     // -- End of fields protected by stateLock
 124 
 125 
 126     // Constructor for normal connecting sockets
 127     //
 128     SocketChannelImpl(SelectorProvider sp) throws IOException {
 129         super(sp);
 130         this.fd = Net.socket(true);
 131         this.fdVal = IOUtil.fdVal(fd);
 132     }
 133 
 134     SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
 135         throws IOException
 136     {
 137         super(sp);
 138         this.fd = fd;
 139         this.fdVal = IOUtil.fdVal(fd);
 140         if (bound) {
 141             synchronized (stateLock) {
 142                 this.localAddress = Net.localAddress(fd);


 351 
 352     @Override
 353     public int read(ByteBuffer buf) throws IOException {
 354         Objects.requireNonNull(buf);
 355 
 356         readLock.lock();
 357         try {
 358             boolean blocking = isBlocking();
 359             int n = 0;
 360             try {
 361                 beginRead(blocking);
 362 
 363                 // check if connection has been reset
 364                 if (connectionReset)
 365                     throwConnectionReset();
 366 
 367                 // check if input is shutdown
 368                 if (isInputClosed)
 369                     return IOStatus.EOF;
 370 
 371                 lockedConfigureNonBlockingIfFiber();
 372                 n = IOUtil.read(fd, buf, -1, nd);
 373                 if (blocking) {
 374                     while (IOStatus.okayToRetry(n) && isOpen()) {
 375                         park(Net.POLLIN);
 376                         n = IOUtil.read(fd, buf, -1, nd);
 377                     }
 378                 }
 379             } catch (ConnectionResetException e) {
 380                 connectionReset = true;
 381                 throwConnectionReset();
 382             } finally {
 383                 endRead(blocking, n > 0);
 384                 if (n <= 0 && isInputClosed)
 385                     return IOStatus.EOF;
 386             }
 387             return IOStatus.normalize(n);
 388         } finally {
 389             readLock.unlock();
 390         }
 391     }


 394     public long read(ByteBuffer[] dsts, int offset, int length)
 395         throws IOException
 396     {
 397         Objects.checkFromIndexSize(offset, length, dsts.length);
 398 
 399         readLock.lock();
 400         try {
 401             boolean blocking = isBlocking();
 402             long n = 0;
 403             try {
 404                 beginRead(blocking);
 405 
 406                 // check if connection has been reset
 407                 if (connectionReset)
 408                     throwConnectionReset();
 409 
 410                 // check if input is shutdown
 411                 if (isInputClosed)
 412                     return IOStatus.EOF;
 413 
 414                 lockedConfigureNonBlockingIfFiber();
 415                 n = IOUtil.read(fd, dsts, offset, length, nd);
 416                 if (blocking) {
 417                     while (IOStatus.okayToRetry(n) && isOpen()) {
 418                         park(Net.POLLIN);
 419                         n = IOUtil.read(fd, dsts, offset, length, nd);
 420                     }
 421                 }
 422             } catch (ConnectionResetException e) {
 423                 connectionReset = true;
 424                 throwConnectionReset();
 425             } finally {
 426                 endRead(blocking, n > 0);
 427                 if (n <= 0 && isInputClosed)
 428                     return IOStatus.EOF;
 429             }
 430             return IOStatus.normalize(n);
 431         } finally {
 432             readLock.unlock();
 433         }
 434     }


 470                 writerThread = 0;
 471                 if (state == ST_CLOSING) {
 472                     tryFinishClose();
 473                 }
 474             }
 475             // remove hook for Thread.interrupt
 476             end(completed);
 477         }
 478     }
 479 
 480     @Override
 481     public int write(ByteBuffer buf) throws IOException {
 482         Objects.requireNonNull(buf);
 483 
 484         writeLock.lock();
 485         try {
 486             boolean blocking = isBlocking();
 487             int n = 0;
 488             try {
 489                 beginWrite(blocking);
 490                 lockedConfigureNonBlockingIfFiber();
 491                 n = IOUtil.write(fd, buf, -1, nd);
 492                 if (blocking) {
 493                     while (IOStatus.okayToRetry(n) && isOpen()) {
 494                         park(Net.POLLOUT);
 495                         n = IOUtil.write(fd, buf, -1, nd);
 496                     }
 497                 }
 498             } finally {
 499                 endWrite(blocking, n > 0);
 500                 if (n <= 0 && isOutputClosed)
 501                     throw new AsynchronousCloseException();
 502             }
 503             return IOStatus.normalize(n);
 504         } finally {
 505             writeLock.unlock();
 506         }
 507     }
 508 
 509     @Override
 510     public long write(ByteBuffer[] srcs, int offset, int length)
 511         throws IOException
 512     {
 513         Objects.checkFromIndexSize(offset, length, srcs.length);
 514 
 515         writeLock.lock();
 516         try {
 517             boolean blocking = isBlocking();
 518             long n = 0;
 519             try {
 520                 beginWrite(blocking);
 521                 lockedConfigureNonBlockingIfFiber();
 522                 n = IOUtil.write(fd, srcs, offset, length, nd);
 523                 if (blocking) {
 524                     while (IOStatus.okayToRetry(n) && isOpen()) {
 525                         park(Net.POLLOUT);
 526                         n = IOUtil.write(fd, srcs, offset, length, nd);
 527                     }
 528                 }
 529             } finally {
 530                 endWrite(blocking, n > 0);
 531                 if (n <= 0 && isOutputClosed)
 532                     throw new AsynchronousCloseException();
 533             }
 534             return IOStatus.normalize(n);
 535         } finally {
 536             writeLock.unlock();
 537         }
 538     }
 539 
 540     /**
 541      * Writes a byte of out of band data.
 542      */
 543     int sendOutOfBandData(byte b) throws IOException {
 544         writeLock.lock();
 545         try {
 546             boolean blocking = isBlocking();
 547             int n = 0;
 548             try {
 549                 beginWrite(blocking);
 550                 lockedConfigureNonBlockingIfFiber();
 551                 do {



 552                     n = Net.sendOOB(fd, b);
 553                 } while (n == IOStatus.INTERRUPTED && isOpen());
 554                 if (blocking && n == IOStatus.UNAVAILABLE) {
 555                     throw new SocketException("No buffer space available");
 556                 }
 557             } finally {
 558                 endWrite(blocking, n > 0);
 559                 if (n <= 0 && isOutputClosed)
 560                     throw new AsynchronousCloseException();
 561             }
 562             return IOStatus.normalize(n);
 563         } finally {
 564             writeLock.unlock();
 565         }
 566     }
 567 
 568     @Override
 569     protected void implConfigureBlocking(boolean block) throws IOException {
 570         readLock.lock();
 571         try {
 572             writeLock.lock();
 573             try {
 574                 lockedConfigureBlocking(block);
 575             } finally {
 576                 writeLock.unlock();
 577             }
 578         } finally {
 579             readLock.unlock();
 580         }
 581     }
 582 
 583     /**
 584      * Adjust the blocking mode while holding readLock or writeLock.
 585      */
 586     private void lockedConfigureBlocking(boolean block) throws IOException {
 587         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
 588         synchronized (stateLock) {
 589             ensureOpen();
 590             // do nothing if fiber has forced the socket to be non-blocking
 591             if (!nonBlocking) {
 592                 IOUtil.configureBlocking(fd, block);
 593             }
 594         }
 595     }
 596 
 597     /**
 598      * Ensures that the socket is configured non-blocking when the current
 599      * strand is a fiber.
 600      */
 601     private void lockedConfigureNonBlockingIfFiber() throws IOException {
 602         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
 603         if (!nonBlocking && (Strands.currentStrand() instanceof Fiber)) {
 604             synchronized (stateLock) {
 605                 ensureOpen();
 606                 IOUtil.configureBlocking(fd, false);
 607                 nonBlocking = true;
 608             }
 609         }
 610     }
 611 
 612     /**
 613      * Returns the local address, or null if not bound
 614      */
 615     InetSocketAddress localAddress() {
 616         synchronized (stateLock) {
 617             return localAddress;
 618         }
 619     }
 620 
 621     /**
 622      * Returns the remote address, or null if not connected
 623      */
 624     InetSocketAddress remoteAddress() {
 625         synchronized (stateLock) {
 626             return remoteAddress;
 627         }
 628     }


 738         }
 739         if (isa.getAddress().isAnyLocalAddress()) {
 740             return new InetSocketAddress(InetAddress.getLocalHost(), isa.getPort());
 741         } else {
 742             return isa;
 743         }
 744     }
 745 
 746     @Override
 747     public boolean connect(SocketAddress remote) throws IOException {
 748         InetSocketAddress isa = checkRemote(remote);
 749         try {
 750             readLock.lock();
 751             try {
 752                 writeLock.lock();
 753                 try {
 754                     boolean blocking = isBlocking();
 755                     boolean connected = false;
 756                     try {
 757                         beginConnect(blocking, isa);
 758                         lockedConfigureNonBlockingIfFiber();
 759                         int n = Net.connect(fd, isa.getAddress(), isa.getPort());
 760                         if (n > 0) {
 761                             connected = true;
 762                         } else if (blocking) {
 763                             assert IOStatus.okayToRetry(n);
 764                             boolean polled = false;
 765                             while (!polled && isOpen()) {
 766                                 park(Net.POLLOUT);
 767                                 polled = Net.pollConnectNow(fd);
 768                             }
 769                             connected = polled && isOpen();
 770                         }
 771                     } finally {
 772                         endConnect(blocking, connected);
 773                     }
 774                     return connected;
 775                 } finally {
 776                     writeLock.unlock();
 777                 }
 778             } finally {


 898 
 899     /**
 900      * Closes this channel when configured in blocking mode.
 901      *
 902      * If there is an I/O operation in progress then the socket is pre-closed
 903      * and the I/O threads signalled, in which case the final close is deferred
 904      * until all I/O operations complete.
 905      *
 906      * Note that a channel configured blocking may be registered with a Selector
 907      * This arises when a key is canceled and the channel configured to blocking
 908      * mode before the key is flushed from the Selector.
 909      */
 910     private void implCloseBlockingMode() throws IOException {
 911         synchronized (stateLock) {
 912             assert state < ST_CLOSING;
 913             state = ST_CLOSING;
 914             if (!tryClose()) {
 915                 long reader = readerThread;
 916                 long writer = writerThread;
 917                 if (reader != 0 || writer != 0) {
 918                     if (NativeThread.isFiber(reader) || NativeThread.isFiber(writer)) {
 919                         Poller.stopPoll(fdVal);
 920                     }
 921                     nd.preClose(fd);
 922                     if (NativeThread.isKernelThread(reader))
 923                         NativeThread.signal(reader);
 924                     if (NativeThread.isKernelThread(writer))
 925                         NativeThread.signal(writer);
 926                 }
 927             }
 928         }
 929     }
 930 
 931     /**
 932      * Closes this channel when configured in non-blocking mode.
 933      *
 934      * If the channel is registered with a Selector then the close is deferred
 935      * until the channel is flushed from all Selectors.
 936      *
 937      * If the socket is connected and the channel is registered with a Selector
 938      * then the socket is shutdown for writing so that the peer reads EOF. In
 939      * addition, if SO_LINGER is set to a non-zero value then it is disabled so
 940      * that the deferred close does not wait.
 941      */
 942     private void implCloseNonBlockingMode() throws IOException {
 943         boolean connected;
 944         synchronized (stateLock) {


 985         }
 986     }
 987 
 988     @Override
 989     public void kill() {
 990         synchronized (stateLock) {
 991             if (state == ST_CLOSING) {
 992                 tryFinishClose();
 993             }
 994         }
 995     }
 996 
 997     @Override
 998     public SocketChannel shutdownInput() throws IOException {
 999         synchronized (stateLock) {
1000             ensureOpen();
1001             if (!isConnected())
1002                 throw new NotYetConnectedException();
1003             if (!isInputClosed) {
1004                 Net.shutdown(fd, Net.SHUT_RD);
1005                 long reader = readerThread;
1006                 if (NativeThread.isFiber(reader)) {
1007                     Poller.stopPoll(fdVal, Net.POLLIN);
1008                 } else if (NativeThread.isKernelThread(reader)) {
1009                     NativeThread.signal(reader);
1010                 }
1011                 isInputClosed = true;
1012             }
1013             return this;
1014         }
1015     }
1016 
1017     @Override
1018     public SocketChannel shutdownOutput() throws IOException {
1019         synchronized (stateLock) {
1020             ensureOpen();
1021             if (!isConnected())
1022                 throw new NotYetConnectedException();
1023             if (!isOutputClosed) {
1024                 Net.shutdown(fd, Net.SHUT_WR);
1025                 long writer = writerThread;
1026                 if (NativeThread.isFiber(writer)) {
1027                     Poller.stopPoll(fdVal, Net.POLLOUT);
1028                 } else if (NativeThread.isKernelThread(writer)) {
1029                     NativeThread.signal(writer);
1030                 }
1031                 isOutputClosed = true;
1032             }
1033             return this;
1034         }
1035     }
1036 
1037     boolean isInputOpen() {
1038         return !isInputClosed;
1039     }
1040 
1041     boolean isOutputOpen() {
1042         return !isOutputClosed;
1043     }
1044 
1045     /**
1046      * Waits for a connection attempt to finish with a timeout
1047      * @throws SocketTimeoutException if the connect timeout elapses
1048      */
1049     private boolean finishTimedConnect(long nanos) throws IOException {
1050         long startNanos = System.nanoTime();


1168 
1169                 // check if connection has been reset
1170                 if (connectionReset)
1171                     throwConnectionReset();
1172 
1173                 // check if input is shutdown
1174                 if (isInputClosed)
1175                     return IOStatus.EOF;
1176 
1177                 if (nanos > 0) {
1178                     // change socket to non-blocking
1179                     lockedConfigureBlocking(false);
1180                     try {
1181                         n = timedRead(b, off, len, nanos);
1182                     } finally {
1183                         // restore socket to blocking mode
1184                         lockedConfigureBlocking(true);
1185                     }
1186                 } else {
1187                     // read, no timeout
1188                     lockedConfigureNonBlockingIfFiber();
1189                     n = tryRead(b, off, len);
1190                     while (IOStatus.okayToRetry(n) && isOpen()) {
1191                         park(Net.POLLIN);
1192                         n = tryRead(b, off, len);
1193                     }
1194                 }
1195             } catch (ConnectionResetException e) {
1196                 connectionReset = true;
1197                 throwConnectionReset();
1198             } finally {
1199                 endRead(true, n > 0);
1200                 if (n <= 0 && isInputClosed)
1201                     return IOStatus.EOF;
1202             }
1203             assert n > 0 || n == -1;
1204             return n;
1205         } finally {
1206             readLock.unlock();
1207         }
1208     }


1228      * @apiNote This method is for use by the socket adaptor.
1229      */
1230     void blockingWriteFully(byte[] b, int off, int len) throws IOException {
1231         Objects.checkFromIndexSize(off, len, b.length);
1232         if (len == 0) {
1233             // nothing to do
1234             return;
1235         }
1236 
1237         writeLock.lock();
1238         try {
1239             // check that channel is configured blocking
1240             if (!isBlocking())
1241                 throw new IllegalBlockingModeException();
1242 
1243             // loop until all bytes have been written
1244             int pos = off;
1245             int end = off + len;
1246             beginWrite(true);
1247             try {
1248                 lockedConfigureNonBlockingIfFiber();
1249                 while (pos < end && isOpen()) {
1250                     int size = end - pos;
1251                     int n = tryWrite(b, pos, size);
1252                     while (IOStatus.okayToRetry(n) && isOpen()) {
1253                         park(Net.POLLOUT);
1254                         n = tryWrite(b, pos, size);
1255                     }
1256                     if (n > 0) {
1257                         pos += n;
1258                     }
1259                 }
1260             } finally {
1261                 endWrite(true, pos >= end);
1262             }
1263         } finally {
1264             writeLock.unlock();
1265         }
1266     }
1267 
1268     /**


< prev index next >