< prev index next >

src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java

Print this page

  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.io.InputStream;

  31 import java.io.OutputStream;
  32 import java.io.UncheckedIOException;
  33 import java.lang.ref.Cleaner.Cleanable;
  34 import java.net.InetAddress;
  35 import java.net.InetSocketAddress;
  36 import java.net.ProtocolFamily;
  37 import java.net.SocketAddress;
  38 import java.net.SocketException;
  39 import java.net.SocketImpl;
  40 import java.net.SocketOption;
  41 import java.net.SocketTimeoutException;
  42 import java.net.StandardProtocolFamily;
  43 import java.net.StandardSocketOptions;
  44 import java.net.UnknownHostException;
  45 import java.nio.ByteBuffer;
  46 import java.util.Collections;
  47 import java.util.HashSet;
  48 import java.util.Objects;
  49 import java.util.Set;
  50 import java.util.concurrent.TimeUnit;
  51 import java.util.concurrent.locks.ReentrantLock;
  52 

  53 import jdk.internal.ref.CleanerFactory;
  54 import sun.net.ConnectionResetException;
  55 import sun.net.NetHooks;
  56 import sun.net.PlatformSocketImpl;
  57 import sun.net.ResourceManager;
  58 import sun.net.ext.ExtendedSocketOptions;
  59 import sun.net.util.SocketExceptions;
  60 
  61 import static java.util.concurrent.TimeUnit.MILLISECONDS;
  62 import static java.util.concurrent.TimeUnit.NANOSECONDS;
  63 
  64 /**
  65  * NIO based SocketImpl.
  66  *
  67  * The underlying socket used by this SocketImpl is initially configured
  68  * blocking. If the connect method is used to establish a connection with a
  69  * timeout then the socket is configured non-blocking for the connect attempt,
  70  * and then restored to blocking mode when the connection is established.
  71  * If the accept or read methods are used with a timeout then the socket is
  72  * configured non-blocking and is never restored. When in non-blocking mode,
  73  * operations that don't complete immediately will poll the socket and preserve
  74  * the semantics of blocking operations.
  75  */
  76 
  77 public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl {
  78     private static final NativeDispatcher nd = new SocketDispatcher();
  79 
  80     // The maximum number of bytes to read/write per syscall to avoid needing
  81     // a huge buffer from the temporary buffer cache
  82     private static final int MAX_BUFFER_SIZE = 128 * 1024;
  83 
  84     // true if this is a SocketImpl for a ServerSocket
  85     private final boolean server;
  86 
  87     // Lock held when reading (also used when accepting or connecting)
  88     private final ReentrantLock readLock = new ReentrantLock();
  89 
  90     // Lock held when writing
  91     private final ReentrantLock writeLock = new ReentrantLock();
  92 
  93     // The stateLock for read/changing state
  94     private final Object stateLock = new Object();
  95     private static final int ST_NEW = 0;
  96     private static final int ST_UNCONNECTED = 1;
  97     private static final int ST_CONNECTING = 2;
  98     private static final int ST_CONNECTED = 3;
  99     private static final int ST_CLOSING = 4;
 100     private static final int ST_CLOSED = 5;
 101     private volatile int state;  // need stateLock to change
 102 



 103     // set by SocketImpl.create, protected by stateLock
 104     private boolean stream;
 105     private Cleanable cleaner;
 106 
 107     // set to true when the socket is in non-blocking mode
 108     private volatile boolean nonBlocking;
 109 
 110     // used by connect/read/write/accept, protected by stateLock
 111     private long readerThread;
 112     private long writerThread;
 113 
 114     // used when SO_REUSEADDR is emulated, protected by stateLock
 115     private boolean isReuseAddress;
 116 
 117     // read or accept timeout in millis
 118     private volatile int timeout;
 119 
 120     // flags to indicate if the connection is shutdown for input and output
 121     private volatile boolean isInputClosed;
 122     private volatile boolean isOutputClosed;

 146     private void ensureOpen() throws SocketException {
 147         int state = this.state;
 148         if (state == ST_NEW)
 149             throw new SocketException("Socket not created");
 150         if (state >= ST_CLOSING)
 151             throw new SocketException("Socket closed");
 152     }
 153 
 154     /**
 155      * Throws SocketException if the socket is not open and connected.
 156      */
 157     private void ensureOpenAndConnected() throws SocketException {
 158         int state = this.state;
 159         if (state < ST_CONNECTED)
 160             throw new SocketException("Not connected");
 161         if (state > ST_CONNECTED)
 162             throw new SocketException("Socket closed");
 163     }
 164 
 165     /**
 166      * Disables the current thread for scheduling purposes until the socket is
 167      * ready for I/O, or is asynchronously closed, for up to the specified
 168      * waiting time.
 169      * @throws IOException if an I/O error occurs
 170      */
 171     private void park(FileDescriptor fd, int event, long nanos) throws IOException {
 172         long millis;
 173         if (nanos == 0) {
 174             millis = -1;














 175         } else {
 176             millis = NANOSECONDS.toMillis(nanos);






 177         }
 178         Net.poll(fd, event, millis);
 179     }
 180 
 181     /**
 182      * Disables the current thread for scheduling purposes until the socket is
 183      * ready for I/O or is asynchronously closed.
 184      * @throws IOException if an I/O error occurs
 185      */
 186     private void park(FileDescriptor fd, int event) throws IOException {
 187         park(fd, event, 0);
 188     }
 189 
 190     /**
 191      * Configures the socket to blocking mode. This method is a no-op if the
 192      * socket is already in blocking mode.
 193      * @throws IOException if closed or there is an I/O error changing the mode
 194      */
 195     private void configureBlocking(FileDescriptor fd) throws IOException {
 196         assert readLock.isHeldByCurrentThread();
 197         if (nonBlocking) {
 198             synchronized (stateLock) {
 199                 ensureOpen();
 200                 IOUtil.configureBlocking(fd, true);
 201                 nonBlocking = false;
 202             }
 203         }
 204     }
 205 
 206     /**
 207      * Configures the socket to non-blocking mode. This method is a no-op if the
 208      * socket is already in non-blocking mode.
 209      * @throws IOException if closed or there is an I/O error changing the mode
 210      */
 211     private void configureNonBlocking(FileDescriptor fd) throws IOException {
 212         assert readLock.isHeldByCurrentThread();
 213         if (!nonBlocking) {
 214             synchronized (stateLock) {
 215                 ensureOpen();
 216                 IOUtil.configureBlocking(fd, false);
 217                 nonBlocking = true;
 218             }
 219         }
 220     }
 221 
 222     /**
 223      * Marks the beginning of a read operation that might block.
 224      * @throws SocketException if the socket is closed or not connected
 225      */
 226     private FileDescriptor beginRead() throws SocketException {
 227         synchronized (stateLock) {
 228             ensureOpenAndConnected();
 229             readerThread = NativeThread.current();
 230             return fd;
 231         }
 232     }
 233 
 234     /**
 235      * Marks the end of a read operation that may have blocked.
 236      * @throws SocketException is the socket is closed
 237      */
 238     private void endRead(boolean completed) throws SocketException {

 283             n = tryRead(fd, b, off, len);
 284         }
 285         return n;
 286     }
 287 
 288     /**
 289      * Reads bytes from the socket into the given byte array.
 290      * @return the number of bytes read or -1 at EOF
 291      * @throws SocketException if the socket is closed or a socket I/O error occurs
 292      * @throws SocketTimeoutException if the read timeout elapses
 293      */
 294     private int implRead(byte[] b, int off, int len) throws IOException {
 295         int n = 0;
 296         FileDescriptor fd = beginRead();
 297         try {
 298             if (connectionReset)
 299                 throw new SocketException("Connection reset");
 300             if (isInputClosed)
 301                 return -1;
 302             int timeout = this.timeout;

 303             if (timeout > 0) {
 304                 // read with timeout
 305                 configureNonBlocking(fd);
 306                 n = timedRead(fd, b, off, len, MILLISECONDS.toNanos(timeout));
 307             } else {
 308                 // read, no timeout
 309                 n = tryRead(fd, b, off, len);
 310                 while (IOStatus.okayToRetry(n) && isOpen()) {
 311                     park(fd, Net.POLLIN);
 312                     n = tryRead(fd, b, off, len);
 313                 }
 314             }
 315             return n;
 316         } catch (SocketTimeoutException e) {
 317             throw e;
 318         } catch (ConnectionResetException e) {
 319             connectionReset = true;
 320             throw new SocketException("Connection reset");
 321         } catch (IOException ioe) {
 322             throw new SocketException(ioe.getMessage());
 323         } finally {
 324             endRead(n > 0);
 325         }
 326     }
 327 
 328     /**
 329      * Reads bytes from the socket into the given byte array.
 330      * @return the number of bytes read or -1 at EOF
 331      * @throws IndexOutOfBoundsException if the bound checks fail
 332      * @throws SocketException if the socket is closed or a socket I/O error occurs
 333      * @throws SocketTimeoutException if the read timeout elapses
 334      */
 335     private int read(byte[] b, int off, int len) throws IOException {
 336         Objects.checkFromIndexSize(off, len, b.length);

 390     {
 391         ByteBuffer src = Util.getTemporaryDirectBuffer(len);
 392         assert src.position() == 0;
 393         try {
 394             src.put(b, off, len);
 395             return nd.write(fd, ((DirectBuffer)src).address(), len);
 396         } finally {
 397             Util.offerFirstTemporaryDirectBuffer(src);
 398         }
 399     }
 400 
 401     /**
 402      * Writes a sequence of bytes to the socket from the given byte array.
 403      * @return the number of bytes written
 404      * @throws SocketException if the socket is closed or a socket I/O error occurs
 405      */
 406     private int implWrite(byte[] b, int off, int len) throws IOException {
 407         int n = 0;
 408         FileDescriptor fd = beginWrite();
 409         try {

 410             n = tryWrite(fd, b, off, len);
 411             while (IOStatus.okayToRetry(n) && isOpen()) {
 412                 park(fd, Net.POLLOUT);
 413                 n = tryWrite(fd, b, off, len);
 414             }
 415             return n;


 416         } catch (IOException ioe) {
 417             throw new SocketException(ioe.getMessage());
 418         } finally {
 419             endWrite(n > 0);
 420         }
 421     }
 422 
 423     /**
 424      * Writes a sequence of bytes to the socket from the given byte array.
 425      * @throws SocketException if the socket is closed or a socket I/O error occurs
 426      */
 427     private void write(byte[] b, int off, int len) throws IOException {
 428         Objects.checkFromIndexSize(off, len, b.length);
 429         if (len > 0) {
 430             writeLock.lock();
 431             try {
 432                 int pos = off;
 433                 int end = off + len;
 434                 while (pos < end) {
 435                     // write up to MAX_BUFFER_SIZE bytes

 452         synchronized (stateLock) {
 453             if (state != ST_NEW)
 454                 throw new IOException("Already created");
 455             if (!stream)
 456                 ResourceManager.beforeUdpCreate();
 457             FileDescriptor fd;
 458             try {
 459                 if (server) {
 460                     assert stream;
 461                     fd = Net.serverSocket(true);
 462                 } else {
 463                     fd = Net.socket(stream);
 464                 }
 465             } catch (IOException ioe) {
 466                 if (!stream)
 467                     ResourceManager.afterUdpClose();
 468                 throw ioe;
 469             }
 470             Runnable closer = closerFor(fd, stream);
 471             this.fd = fd;

 472             this.stream = stream;
 473             this.cleaner = CleanerFactory.cleaner().register(this, closer);
 474             this.state = ST_UNCONNECTED;
 475         }
 476     }
 477 
 478     /**
 479      * Marks the beginning of a connect operation that might block.
 480      * @throws SocketException if the socket is closed or already connected
 481      */
 482     private FileDescriptor beginConnect(InetAddress address, int port)
 483         throws IOException
 484     {
 485         synchronized (stateLock) {
 486             int state = this.state;
 487             if (state != ST_UNCONNECTED) {
 488                 if (state == ST_NEW)
 489                     throw new SocketException("Not created");
 490                 if (state == ST_CONNECTING)
 491                     throw new SocketException("Connection in progress");

 559         // SocketImpl connect only specifies IOException
 560         if (!(remote instanceof InetSocketAddress))
 561             throw new IOException("Unsupported address type");
 562         InetSocketAddress isa = (InetSocketAddress) remote;
 563         if (isa.isUnresolved()) {
 564             throw new UnknownHostException(isa.getHostName());
 565         }
 566 
 567         InetAddress address = isa.getAddress();
 568         if (address.isAnyLocalAddress())
 569             address = InetAddress.getLocalHost();
 570         int port = isa.getPort();
 571 
 572         ReentrantLock connectLock = readLock;
 573         try {
 574             connectLock.lock();
 575             try {
 576                 boolean connected = false;
 577                 FileDescriptor fd = beginConnect(address, port);
 578                 try {
 579 
 580                     // configure socket to non-blocking mode when there is a timeout
 581                     if (millis > 0) {
 582                         configureNonBlocking(fd);
 583                     }
 584 
 585                     int n = Net.connect(fd, address, port);
 586                     if (n > 0) {
 587                         // connection established
 588                         connected = true;
 589                     } else {
 590                         assert IOStatus.okayToRetry(n);
 591                         if (millis > 0) {
 592                             // finish connect with timeout
 593                             long nanos = MILLISECONDS.toNanos(millis);
 594                             connected = timedFinishConnect(fd, nanos);
 595                         } else {
 596                             // finish connect, no timeout
 597                             boolean polled = false;
 598                             while (!polled && isOpen()) {
 599                                 park(fd, Net.POLLOUT);
 600                                 polled = Net.pollConnectNow(fd);
 601                             }
 602                             connected = polled && isOpen();
 603                         }
 604                     }
 605 
 606                     // restore socket to blocking mode
 607                     if (connected && millis > 0) {
 608                         configureBlocking(fd);
 609                     }
 610 
 611                 } finally {
 612                     endConnect(fd, connected);
 613                 }
 614             } finally {
 615                 connectLock.unlock();
 616             }
 617         } catch (IOException ioe) {
 618             close();
 619             throw SocketExceptions.of(ioe, isa);




 620         }
 621     }
 622 
 623     @Override
 624     protected void connect(String host, int port) throws IOException {
 625         connect(new InetSocketAddress(host, port), 0);
 626     }
 627 
 628     @Override
 629     protected void connect(InetAddress address, int port) throws IOException {
 630         connect(new InetSocketAddress(address, port), 0);
 631     }
 632 
 633     @Override
 634     protected void bind(InetAddress host, int port) throws IOException {
 635         synchronized (stateLock) {
 636             ensureOpen();
 637             if (localport != 0)
 638                 throw new SocketException("Already bound");
 639             NetHooks.beforeTcpBind(fd, host, port);

 726         // acquire the lock, adjusting the timeout for cases where several
 727         // threads are accepting connections and there is a timeout set
 728         ReentrantLock acceptLock = readLock;
 729         int timeout = this.timeout;
 730         long remainingNanos = 0;
 731         if (timeout > 0) {
 732             remainingNanos = tryLock(acceptLock, timeout, MILLISECONDS);
 733             if (remainingNanos <= 0) {
 734                 assert !acceptLock.isHeldByCurrentThread();
 735                 throw new SocketTimeoutException("Accept timed out");
 736             }
 737         } else {
 738             acceptLock.lock();
 739         }
 740 
 741         // accept a connection
 742         try {
 743             int n = 0;
 744             FileDescriptor fd = beginAccept();
 745             try {

 746                 if (remainingNanos > 0) {
 747                     // accept with timeout
 748                     configureNonBlocking(fd);
 749                     n = timedAccept(fd, newfd, isaa, remainingNanos);
 750                 } else {
 751                     // accept, no timeout
 752                     n = Net.accept(fd, newfd, isaa);
 753                     while (IOStatus.okayToRetry(n) && isOpen()) {
 754                         park(fd, Net.POLLIN);
 755                         n = Net.accept(fd, newfd, isaa);
 756                     }
 757                 }
 758             } finally {
 759                 endAccept(n > 0);
 760                 assert IOStatus.check(n);
 761             }
 762         } finally {
 763             acceptLock.unlock();
 764         }
 765 
 766         // get local address and configure accepted socket to blocking mode
 767         InetSocketAddress localAddress;
 768         try {
 769             localAddress = Net.localAddress(newfd);
 770             IOUtil.configureBlocking(newfd, true);
 771         } catch (IOException ioe) {
 772             nd.close(newfd);
 773             throw ioe;
 774         }
 775 
 776         // set the fields
 777         Runnable closer = closerFor(newfd, true);
 778         synchronized (nsi.stateLock) {
 779             nsi.fd = newfd;

 780             nsi.stream = true;
 781             nsi.cleaner = CleanerFactory.cleaner().register(nsi, closer);
 782             nsi.localport = localAddress.getPort();
 783             nsi.address = isaa[0].getAddress();
 784             nsi.port = isaa[0].getPort();
 785             nsi.state = ST_CONNECTED;
 786         }
 787     }
 788 
 789     @Override
 790     protected InputStream getInputStream() {
 791         return new InputStream() {
 792             @Override
 793             public int read() throws IOException {
 794                 byte[] a = new byte[1];
 795                 int n = read(a, 0, 1);
 796                 return (n > 0) ? (a[0] & 0xff) : -1;
 797             }
 798             @Override
 799             public int read(byte[] b, int off, int len) throws IOException {

 870             tryClose();
 871         } catch (IOException ignore) { }
 872     }
 873 
 874     /**
 875      * Closes the socket. If there are I/O operations in progress then the
 876      * socket is pre-closed and the threads are signalled. The socket will be
 877      * closed when the last I/O operation aborts.
 878      */
 879     @Override
 880     protected void close() throws IOException {
 881         synchronized (stateLock) {
 882             int state = this.state;
 883             if (state >= ST_CLOSING)
 884                 return;
 885             if (state == ST_NEW) {
 886                 // stillborn
 887                 this.state = ST_CLOSED;
 888                 return;
 889             }

 890             this.state = ST_CLOSING;
 891 
 892             // shutdown output when linger interval not set to 0
 893             try {
 894                 var SO_LINGER = StandardSocketOptions.SO_LINGER;
 895                 if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
 896                     Net.shutdown(fd, Net.SHUT_WR);
 897                 }
 898             } catch (IOException ignore) { }


 899 
 900             // attempt to close the socket. If there are I/O operations in progress
 901             // then the socket is pre-closed and the thread(s) signalled. The
 902             // last thread will close the file descriptor.
 903             if (!tryClose()) {
 904                 nd.preClose(fd);
 905                 long reader = readerThread;
 906                 if (reader != 0)
 907                     NativeThread.signal(reader);
 908                 long writer = writerThread;
 909                 if (writer != 0)
 910                     NativeThread.signal(writer);










 911             }
 912         }
 913     }
 914 
 915     // the socket options supported by client and server sockets
 916     private static volatile Set<SocketOption<?>> clientSocketOptions;
 917     private static volatile Set<SocketOption<?>> serverSocketOptions;
 918 
 919     @Override
 920     protected Set<SocketOption<?>> supportedOptions() {
 921         Set<SocketOption<?>> options = (server) ? serverSocketOptions : clientSocketOptions;
 922         if (options == null) {
 923             options = new HashSet<>();
 924             options.add(StandardSocketOptions.SO_RCVBUF);
 925             options.add(StandardSocketOptions.SO_REUSEADDR);
 926             if (server) {
 927                 // IP_TOS added for server socket to maintain compatibility
 928                 options.add(StandardSocketOptions.IP_TOS);
 929                 options.addAll(ExtendedSocketOptions.serverSocketOptions());
 930             } else {

1131                     if (!Net.isReusePortAvailable())
1132                         throw new SocketException("SO_REUSEPORT not supported");
1133                     return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEPORT);
1134                 default:
1135                     throw new SocketException("Unknown option " + opt);
1136                 }
1137             } catch (SocketException e) {
1138                 throw e;
1139             } catch (IllegalArgumentException | IOException e) {
1140                 throw new SocketException(e.getMessage());
1141             }
1142         }
1143     }
1144 
1145     @Override
1146     protected void shutdownInput() throws IOException {
1147         synchronized (stateLock) {
1148             ensureOpenAndConnected();
1149             if (!isInputClosed) {
1150                 Net.shutdown(fd, Net.SHUT_RD);



1151                 isInputClosed = true;
1152             }
1153         }
1154     }
1155 
1156     @Override
1157     protected void shutdownOutput() throws IOException {
1158         synchronized (stateLock) {
1159             ensureOpenAndConnected();
1160             if (!isOutputClosed) {
1161                 Net.shutdown(fd, Net.SHUT_WR);



1162                 isOutputClosed = true;
1163             }
1164         }
1165     }
1166 
1167     @Override
1168     protected boolean supportsUrgentData() {
1169         return true;
1170     }
1171 
1172     @Override
1173     protected void sendUrgentData(int data) throws IOException {
1174         writeLock.lock();
1175         try {
1176             int n = 0;
1177             FileDescriptor fd = beginWrite();
1178             try {

1179                 do {
1180                     n = Net.sendOOB(fd, (byte) data);
1181                 } while (n == IOStatus.INTERRUPTED && isOpen());
1182                 if (n == IOStatus.UNAVAILABLE) {
1183                     throw new SocketException("No buffer space available");
1184                 }
1185             } finally {
1186                 endWrite(n > 0);
1187             }
1188         } finally {
1189             writeLock.unlock();
1190         }
1191     }
1192 
1193     /**
1194      * Returns an action to close the given file descriptor.
1195      */
1196     private static Runnable closerFor(FileDescriptor fd, boolean stream) {
1197         if (stream) {
1198             return () -> {

  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.io.InputStream;
  31 import java.io.InterruptedIOException;
  32 import java.io.OutputStream;
  33 import java.io.UncheckedIOException;
  34 import java.lang.ref.Cleaner.Cleanable;
  35 import java.net.InetAddress;
  36 import java.net.InetSocketAddress;
  37 import java.net.ProtocolFamily;
  38 import java.net.SocketAddress;
  39 import java.net.SocketException;
  40 import java.net.SocketImpl;
  41 import java.net.SocketOption;
  42 import java.net.SocketTimeoutException;
  43 import java.net.StandardProtocolFamily;
  44 import java.net.StandardSocketOptions;
  45 import java.net.UnknownHostException;
  46 import java.nio.ByteBuffer;
  47 import java.util.Collections;
  48 import java.util.HashSet;
  49 import java.util.Objects;
  50 import java.util.Set;
  51 import java.util.concurrent.TimeUnit;
  52 import java.util.concurrent.locks.ReentrantLock;
  53 
  54 import jdk.internal.misc.VirtualThreads;
  55 import jdk.internal.ref.CleanerFactory;
  56 import sun.net.ConnectionResetException;
  57 import sun.net.NetHooks;
  58 import sun.net.PlatformSocketImpl;
  59 import sun.net.ResourceManager;
  60 import sun.net.ext.ExtendedSocketOptions;
  61 import sun.net.util.SocketExceptions;
  62 
  63 import static java.util.concurrent.TimeUnit.MILLISECONDS;
  64 import static java.util.concurrent.TimeUnit.NANOSECONDS;
  65 
  66 /**
  67  * NIO based SocketImpl.
  68  *
  69  * The underlying socket used by this SocketImpl is initially configured blocking.
  70  * If a connect, accept or read is attempted with a timeout, or a virtual
  71  * thread invokes a blocking operation, then the socket is changed to non-blocking
  72  * When in non-blocking mode, operations that don't complete immediately will
  73  * poll the socket (or park when invoked on a virtual thread) and preserve


  74  * the semantics of blocking operations.
  75  */
  76 
  77 public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl {
  78     private static final NativeDispatcher nd = new SocketDispatcher();
  79 
  80     // The maximum number of bytes to read/write per syscall to avoid needing
  81     // a huge buffer from the temporary buffer cache
  82     private static final int MAX_BUFFER_SIZE = 128 * 1024;
  83 
  84     // true if this is a SocketImpl for a ServerSocket
  85     private final boolean server;
  86 
  87     // Lock held when reading (also used when accepting or connecting)
  88     private final ReentrantLock readLock = new ReentrantLock();
  89 
  90     // Lock held when writing
  91     private final ReentrantLock writeLock = new ReentrantLock();
  92 
  93     // The stateLock for read/changing state
  94     private final Object stateLock = new Object();
  95     private static final int ST_NEW = 0;
  96     private static final int ST_UNCONNECTED = 1;
  97     private static final int ST_CONNECTING = 2;
  98     private static final int ST_CONNECTED = 3;
  99     private static final int ST_CLOSING = 4;
 100     private static final int ST_CLOSED = 5;
 101     private volatile int state;  // need stateLock to change
 102 
 103     // The file descriptor value
 104     private int fdVal;
 105 
 106     // set by SocketImpl.create, protected by stateLock
 107     private boolean stream;
 108     private Cleanable cleaner;
 109 
 110     // set to true when the socket is in non-blocking mode
 111     private volatile boolean nonBlocking;
 112 
 113     // used by connect/read/write/accept, protected by stateLock
 114     private long readerThread;
 115     private long writerThread;
 116 
 117     // used when SO_REUSEADDR is emulated, protected by stateLock
 118     private boolean isReuseAddress;
 119 
 120     // read or accept timeout in millis
 121     private volatile int timeout;
 122 
 123     // flags to indicate if the connection is shutdown for input and output
 124     private volatile boolean isInputClosed;
 125     private volatile boolean isOutputClosed;

 149     private void ensureOpen() throws SocketException {
 150         int state = this.state;
 151         if (state == ST_NEW)
 152             throw new SocketException("Socket not created");
 153         if (state >= ST_CLOSING)
 154             throw new SocketException("Socket closed");
 155     }
 156 
 157     /**
 158      * Throws SocketException if the socket is not open and connected.
 159      */
 160     private void ensureOpenAndConnected() throws SocketException {
 161         int state = this.state;
 162         if (state < ST_CONNECTED)
 163             throw new SocketException("Not connected");
 164         if (state > ST_CONNECTED)
 165             throw new SocketException("Socket closed");
 166     }
 167 
 168     /**
 169      * Disables the current thread for scheduling purposes until the
 170      * socket is ready for I/O or is asynchronously closed, for up to the
 171      * specified waiting time.
 172      * @throws IOException if an I/O error occurs
 173      */
 174     private void park(FileDescriptor fd, int event, long nanos) throws IOException {
 175         Thread t = Thread.currentThread();
 176         if (t.isVirtual()) {
 177             Poller.register(fdVal, event);
 178             try {
 179                 if (isOpen()) {
 180                     if (nanos == 0) {
 181                         VirtualThreads.park();
 182                     } else {
 183                         VirtualThreads.park(nanos);
 184                     }
 185                     if (t.isInterrupted()) {
 186                         throw new InterruptedIOException();
 187                     }
 188                 }
 189             } finally {
 190                 Poller.deregister(fdVal, event);
 191             }
 192         } else {
 193             long millis;
 194             if (nanos == 0) {
 195                 millis = -1;
 196             } else {
 197                 millis = NANOSECONDS.toMillis(nanos);
 198             }
 199             Net.poll(fd, event, millis);
 200         }

 201     }
 202 
 203     /**
 204      * Disables the current thread for scheduling purposes until the socket is
 205      * ready for I/O or is asynchronously closed.
 206      * @throws IOException if an I/O error occurs
 207      */
 208     private void park(FileDescriptor fd, int event) throws IOException {
 209         park(fd, event, 0);
 210     }
 211 
 212     /**
 213      * Ensures that the socket is configured non-blocking invoked on a virtual
 214      * thread or the operation has a timeout
 215      * @throws IOException if there is an I/O error changing the blocking mode
















 216      */
 217     private void configureNonBlockingIfNeeded(FileDescriptor fd, boolean timed)
 218         throws IOException
 219     {
 220         if (!nonBlocking
 221             && (timed || Thread.currentThread().isVirtual())) {
 222             assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
 223             IOUtil.configureBlocking(fd, false);
 224             nonBlocking = true;
 225         }
 226     }
 227 
 228     /**
 229      * Marks the beginning of a read operation that might block.
 230      * @throws SocketException if the socket is closed or not connected
 231      */
 232     private FileDescriptor beginRead() throws SocketException {
 233         synchronized (stateLock) {
 234             ensureOpenAndConnected();
 235             readerThread = NativeThread.current();
 236             return fd;
 237         }
 238     }
 239 
 240     /**
 241      * Marks the end of a read operation that may have blocked.
 242      * @throws SocketException is the socket is closed
 243      */
 244     private void endRead(boolean completed) throws SocketException {

 289             n = tryRead(fd, b, off, len);
 290         }
 291         return n;
 292     }
 293 
 294     /**
 295      * Reads bytes from the socket into the given byte array.
 296      * @return the number of bytes read or -1 at EOF
 297      * @throws SocketException if the socket is closed or a socket I/O error occurs
 298      * @throws SocketTimeoutException if the read timeout elapses
 299      */
 300     private int implRead(byte[] b, int off, int len) throws IOException {
 301         int n = 0;
 302         FileDescriptor fd = beginRead();
 303         try {
 304             if (connectionReset)
 305                 throw new SocketException("Connection reset");
 306             if (isInputClosed)
 307                 return -1;
 308             int timeout = this.timeout;
 309             configureNonBlockingIfNeeded(fd, timeout > 0);
 310             if (timeout > 0) {
 311                 // read with timeout

 312                 n = timedRead(fd, b, off, len, MILLISECONDS.toNanos(timeout));
 313             } else {
 314                 // read, no timeout
 315                 n = tryRead(fd, b, off, len);
 316                 while (IOStatus.okayToRetry(n) && isOpen()) {
 317                     park(fd, Net.POLLIN);
 318                     n = tryRead(fd, b, off, len);
 319                 }
 320             }
 321             return n;
 322         } catch (InterruptedIOException e) {
 323             throw e;
 324         } catch (ConnectionResetException e) {
 325             connectionReset = true;
 326             throw new SocketException("Connection reset");
 327         } catch (IOException ioe) {
 328             throw new SocketException(ioe.getMessage());
 329         } finally {
 330             endRead(n > 0);
 331         }
 332     }
 333 
 334     /**
 335      * Reads bytes from the socket into the given byte array.
 336      * @return the number of bytes read or -1 at EOF
 337      * @throws IndexOutOfBoundsException if the bound checks fail
 338      * @throws SocketException if the socket is closed or a socket I/O error occurs
 339      * @throws SocketTimeoutException if the read timeout elapses
 340      */
 341     private int read(byte[] b, int off, int len) throws IOException {
 342         Objects.checkFromIndexSize(off, len, b.length);

 396     {
 397         ByteBuffer src = Util.getTemporaryDirectBuffer(len);
 398         assert src.position() == 0;
 399         try {
 400             src.put(b, off, len);
 401             return nd.write(fd, ((DirectBuffer)src).address(), len);
 402         } finally {
 403             Util.offerFirstTemporaryDirectBuffer(src);
 404         }
 405     }
 406 
 407     /**
 408      * Writes a sequence of bytes to the socket from the given byte array.
 409      * @return the number of bytes written
 410      * @throws SocketException if the socket is closed or a socket I/O error occurs
 411      */
 412     private int implWrite(byte[] b, int off, int len) throws IOException {
 413         int n = 0;
 414         FileDescriptor fd = beginWrite();
 415         try {
 416             configureNonBlockingIfNeeded(fd, false);
 417             n = tryWrite(fd, b, off, len);
 418             while (IOStatus.okayToRetry(n) && isOpen()) {
 419                 park(fd, Net.POLLOUT);
 420                 n = tryWrite(fd, b, off, len);
 421             }
 422             return n;
 423         } catch (InterruptedIOException e) {
 424             throw e;
 425         } catch (IOException ioe) {
 426             throw new SocketException(ioe.getMessage());
 427         } finally {
 428             endWrite(n > 0);
 429         }
 430     }
 431 
 432     /**
 433      * Writes a sequence of bytes to the socket from the given byte array.
 434      * @throws SocketException if the socket is closed or a socket I/O error occurs
 435      */
 436     private void write(byte[] b, int off, int len) throws IOException {
 437         Objects.checkFromIndexSize(off, len, b.length);
 438         if (len > 0) {
 439             writeLock.lock();
 440             try {
 441                 int pos = off;
 442                 int end = off + len;
 443                 while (pos < end) {
 444                     // write up to MAX_BUFFER_SIZE bytes

 461         synchronized (stateLock) {
 462             if (state != ST_NEW)
 463                 throw new IOException("Already created");
 464             if (!stream)
 465                 ResourceManager.beforeUdpCreate();
 466             FileDescriptor fd;
 467             try {
 468                 if (server) {
 469                     assert stream;
 470                     fd = Net.serverSocket(true);
 471                 } else {
 472                     fd = Net.socket(stream);
 473                 }
 474             } catch (IOException ioe) {
 475                 if (!stream)
 476                     ResourceManager.afterUdpClose();
 477                 throw ioe;
 478             }
 479             Runnable closer = closerFor(fd, stream);
 480             this.fd = fd;
 481             this.fdVal = IOUtil.fdVal(fd);
 482             this.stream = stream;
 483             this.cleaner = CleanerFactory.cleaner().register(this, closer);
 484             this.state = ST_UNCONNECTED;
 485         }
 486     }
 487 
 488     /**
 489      * Marks the beginning of a connect operation that might block.
 490      * @throws SocketException if the socket is closed or already connected
 491      */
 492     private FileDescriptor beginConnect(InetAddress address, int port)
 493         throws IOException
 494     {
 495         synchronized (stateLock) {
 496             int state = this.state;
 497             if (state != ST_UNCONNECTED) {
 498                 if (state == ST_NEW)
 499                     throw new SocketException("Not created");
 500                 if (state == ST_CONNECTING)
 501                     throw new SocketException("Connection in progress");

 569         // SocketImpl connect only specifies IOException
 570         if (!(remote instanceof InetSocketAddress))
 571             throw new IOException("Unsupported address type");
 572         InetSocketAddress isa = (InetSocketAddress) remote;
 573         if (isa.isUnresolved()) {
 574             throw new UnknownHostException(isa.getHostName());
 575         }
 576 
 577         InetAddress address = isa.getAddress();
 578         if (address.isAnyLocalAddress())
 579             address = InetAddress.getLocalHost();
 580         int port = isa.getPort();
 581 
 582         ReentrantLock connectLock = readLock;
 583         try {
 584             connectLock.lock();
 585             try {
 586                 boolean connected = false;
 587                 FileDescriptor fd = beginConnect(address, port);
 588                 try {
 589                     configureNonBlockingIfNeeded(fd, millis > 0);





 590                     int n = Net.connect(fd, address, port);
 591                     if (n > 0) {
 592                         // connection established
 593                         connected = true;
 594                     } else {
 595                         assert IOStatus.okayToRetry(n);
 596                         if (millis > 0) {
 597                             // finish connect with timeout
 598                             long nanos = MILLISECONDS.toNanos(millis);
 599                             connected = timedFinishConnect(fd, nanos);
 600                         } else {
 601                             // finish connect, no timeout
 602                             boolean polled = false;
 603                             while (!polled && isOpen()) {
 604                                 park(fd, Net.POLLOUT);
 605                                 polled = Net.pollConnectNow(fd);
 606                             }
 607                             connected = polled && isOpen();
 608                         }
 609                     }






 610                 } finally {
 611                     endConnect(fd, connected);
 612                 }
 613             } finally {
 614                 connectLock.unlock();
 615             }
 616         } catch (IOException ioe) {
 617             close();
 618             if (ioe instanceof InterruptedIOException) {
 619                 throw ioe;
 620             } else {
 621                 throw SocketExceptions.of(ioe, isa);
 622             }
 623         }
 624     }
 625 
 626     @Override
 627     protected void connect(String host, int port) throws IOException {
 628         connect(new InetSocketAddress(host, port), 0);
 629     }
 630 
 631     @Override
 632     protected void connect(InetAddress address, int port) throws IOException {
 633         connect(new InetSocketAddress(address, port), 0);
 634     }
 635 
 636     @Override
 637     protected void bind(InetAddress host, int port) throws IOException {
 638         synchronized (stateLock) {
 639             ensureOpen();
 640             if (localport != 0)
 641                 throw new SocketException("Already bound");
 642             NetHooks.beforeTcpBind(fd, host, port);

 729         // acquire the lock, adjusting the timeout for cases where several
 730         // threads are accepting connections and there is a timeout set
 731         ReentrantLock acceptLock = readLock;
 732         int timeout = this.timeout;
 733         long remainingNanos = 0;
 734         if (timeout > 0) {
 735             remainingNanos = tryLock(acceptLock, timeout, MILLISECONDS);
 736             if (remainingNanos <= 0) {
 737                 assert !acceptLock.isHeldByCurrentThread();
 738                 throw new SocketTimeoutException("Accept timed out");
 739             }
 740         } else {
 741             acceptLock.lock();
 742         }
 743 
 744         // accept a connection
 745         try {
 746             int n = 0;
 747             FileDescriptor fd = beginAccept();
 748             try {
 749                 configureNonBlockingIfNeeded(fd, remainingNanos > 0);
 750                 if (remainingNanos > 0) {
 751                     // accept with timeout

 752                     n = timedAccept(fd, newfd, isaa, remainingNanos);
 753                 } else {
 754                     // accept, no timeout
 755                     n = Net.accept(fd, newfd, isaa);
 756                     while (IOStatus.okayToRetry(n) && isOpen()) {
 757                         park(fd, Net.POLLIN);
 758                         n = Net.accept(fd, newfd, isaa);
 759                     }
 760                 }
 761             } finally {
 762                 endAccept(n > 0);
 763                 assert IOStatus.check(n);
 764             }
 765         } finally {
 766             acceptLock.unlock();
 767         }
 768 
 769         // get local address and configure accepted socket to blocking mode
 770         InetSocketAddress localAddress;
 771         try {
 772             localAddress = Net.localAddress(newfd);
 773             IOUtil.configureBlocking(newfd, true);
 774         } catch (IOException ioe) {
 775             nd.close(newfd);
 776             throw ioe;
 777         }
 778 
 779         // set the fields
 780         Runnable closer = closerFor(newfd, true);
 781         synchronized (nsi.stateLock) {
 782             nsi.fd = newfd;
 783             nsi.fdVal = IOUtil.fdVal(newfd);
 784             nsi.stream = true;
 785             nsi.cleaner = CleanerFactory.cleaner().register(nsi, closer);
 786             nsi.localport = localAddress.getPort();
 787             nsi.address = isaa[0].getAddress();
 788             nsi.port = isaa[0].getPort();
 789             nsi.state = ST_CONNECTED;
 790         }
 791     }
 792 
 793     @Override
 794     protected InputStream getInputStream() {
 795         return new InputStream() {
 796             @Override
 797             public int read() throws IOException {
 798                 byte[] a = new byte[1];
 799                 int n = read(a, 0, 1);
 800                 return (n > 0) ? (a[0] & 0xff) : -1;
 801             }
 802             @Override
 803             public int read(byte[] b, int off, int len) throws IOException {

 874             tryClose();
 875         } catch (IOException ignore) { }
 876     }
 877 
 878     /**
 879      * Closes the socket. If there are I/O operations in progress then the
 880      * socket is pre-closed and the threads are signalled. The socket will be
 881      * closed when the last I/O operation aborts.
 882      */
 883     @Override
 884     protected void close() throws IOException {
 885         synchronized (stateLock) {
 886             int state = this.state;
 887             if (state >= ST_CLOSING)
 888                 return;
 889             if (state == ST_NEW) {
 890                 // stillborn
 891                 this.state = ST_CLOSED;
 892                 return;
 893             }
 894             boolean connected = (state == ST_CONNECTED);
 895             this.state = ST_CLOSING;
 896 
 897             // shutdown output when linger interval not set to 0
 898             if (connected) {
 899                 try {
 900                     var SO_LINGER = StandardSocketOptions.SO_LINGER;
 901                     if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
 902                         Net.shutdown(fd, Net.SHUT_WR);
 903                     }
 904                 } catch (IOException ignore) { }
 905             }
 906 
 907             // attempt to close the socket. If there are I/O operations in progress
 908             // then the socket is pre-closed and the thread(s) signalled. The
 909             // last thread will close the file descriptor.
 910             if (!tryClose()) {

 911                 long reader = readerThread;


 912                 long writer = writerThread;
 913                 if (NativeThread.isVirtualThread(reader)
 914                         || NativeThread.isVirtualThread(writer)) {
 915                     Poller.stopPoll(fdVal);
 916                 }
 917                 if (NativeThread.isKernelThread(reader)
 918                         || NativeThread.isKernelThread(writer)) {
 919                     nd.preClose(fd);
 920                     if (NativeThread.isKernelThread(reader))
 921                         NativeThread.signal(reader);
 922                     if (NativeThread.isKernelThread(writer))
 923                         NativeThread.signal(writer);
 924                 }
 925             }
 926         }
 927     }
 928 
 929     // the socket options supported by client and server sockets
 930     private static volatile Set<SocketOption<?>> clientSocketOptions;
 931     private static volatile Set<SocketOption<?>> serverSocketOptions;
 932 
 933     @Override
 934     protected Set<SocketOption<?>> supportedOptions() {
 935         Set<SocketOption<?>> options = (server) ? serverSocketOptions : clientSocketOptions;
 936         if (options == null) {
 937             options = new HashSet<>();
 938             options.add(StandardSocketOptions.SO_RCVBUF);
 939             options.add(StandardSocketOptions.SO_REUSEADDR);
 940             if (server) {
 941                 // IP_TOS added for server socket to maintain compatibility
 942                 options.add(StandardSocketOptions.IP_TOS);
 943                 options.addAll(ExtendedSocketOptions.serverSocketOptions());
 944             } else {

1145                     if (!Net.isReusePortAvailable())
1146                         throw new SocketException("SO_REUSEPORT not supported");
1147                     return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEPORT);
1148                 default:
1149                     throw new SocketException("Unknown option " + opt);
1150                 }
1151             } catch (SocketException e) {
1152                 throw e;
1153             } catch (IllegalArgumentException | IOException e) {
1154                 throw new SocketException(e.getMessage());
1155             }
1156         }
1157     }
1158 
1159     @Override
1160     protected void shutdownInput() throws IOException {
1161         synchronized (stateLock) {
1162             ensureOpenAndConnected();
1163             if (!isInputClosed) {
1164                 Net.shutdown(fd, Net.SHUT_RD);
1165                 if (NativeThread.isVirtualThread(readerThread)) {
1166                     Poller.stopPoll(fdVal, Net.POLLIN);
1167                 }
1168                 isInputClosed = true;
1169             }
1170         }
1171     }
1172 
1173     @Override
1174     protected void shutdownOutput() throws IOException {
1175         synchronized (stateLock) {
1176             ensureOpenAndConnected();
1177             if (!isOutputClosed) {
1178                 Net.shutdown(fd, Net.SHUT_WR);
1179                 if (NativeThread.isVirtualThread(writerThread)) {
1180                     Poller.stopPoll(fdVal, Net.POLLOUT);
1181                 }
1182                 isOutputClosed = true;
1183             }
1184         }
1185     }
1186 
1187     @Override
1188     protected boolean supportsUrgentData() {
1189         return true;
1190     }
1191 
1192     @Override
1193     protected void sendUrgentData(int data) throws IOException {
1194         writeLock.lock();
1195         try {
1196             int n = 0;
1197             FileDescriptor fd = beginWrite();
1198             try {
1199                 configureNonBlockingIfNeeded(fd, false);
1200                 do {
1201                     n = Net.sendOOB(fd, (byte) data);
1202                 } while (n == IOStatus.INTERRUPTED && isOpen());
1203                 if (n == IOStatus.UNAVAILABLE) {
1204                     throw new SocketException("No buffer space available");
1205                 }
1206             } finally {
1207                 endWrite(n > 0);
1208             }
1209         } finally {
1210             writeLock.unlock();
1211         }
1212     }
1213 
1214     /**
1215      * Returns an action to close the given file descriptor.
1216      */
1217     private static Runnable closerFor(FileDescriptor fd, boolean stream) {
1218         if (stream) {
1219             return () -> {
< prev index next >