1 /*
   2  * Copyright (c) 2019, 2021, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.io.InputStream;
  31 import java.io.InterruptedIOException;
  32 import java.io.OutputStream;
  33 import java.io.UncheckedIOException;
  34 import java.lang.ref.Cleaner.Cleanable;
  35 import java.net.InetAddress;
  36 import java.net.InetSocketAddress;
  37 import java.net.ProtocolFamily;
  38 import java.net.SocketAddress;
  39 import java.net.SocketException;
  40 import java.net.SocketImpl;
  41 import java.net.SocketOption;
  42 import java.net.SocketTimeoutException;
  43 import java.net.StandardProtocolFamily;
  44 import java.net.StandardSocketOptions;
  45 import java.net.UnknownHostException;
  46 import java.nio.ByteBuffer;
  47 import java.util.Collections;
  48 import java.util.HashSet;
  49 import java.util.Objects;
  50 import java.util.Set;
  51 import java.util.concurrent.TimeUnit;
  52 import java.util.concurrent.locks.ReentrantLock;
  53 
  54 import jdk.internal.misc.VirtualThreads;
  55 import jdk.internal.ref.CleanerFactory;
  56 import sun.net.ConnectionResetException;
  57 import sun.net.NetHooks;
  58 import sun.net.PlatformSocketImpl;
  59 import sun.net.ResourceManager;
  60 import sun.net.ext.ExtendedSocketOptions;
  61 import sun.net.util.SocketExceptions;
  62 
  63 import static java.util.concurrent.TimeUnit.MILLISECONDS;
  64 import static java.util.concurrent.TimeUnit.NANOSECONDS;
  65 
/**
 * NIO based SocketImpl.
 *
 * The underlying socket used by this SocketImpl is initially configured blocking.
 * If a connect, accept or read is attempted with a timeout, or a virtual
 * thread invokes a blocking operation, then the socket is changed to non-blocking.
 * When in non-blocking mode, operations that don't complete immediately will
 * poll the socket (or park when invoked on a virtual thread) and preserve
 * the semantics of blocking operations.
 */
  76 
  77 public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl {
  78     private static final NativeDispatcher nd = new SocketDispatcher();
  79 
  80     // The maximum number of bytes to read/write per syscall to avoid needing
  81     // a huge buffer from the temporary buffer cache
  82     private static final int MAX_BUFFER_SIZE = 128 * 1024;
  83 
  84     // true if this is a SocketImpl for a ServerSocket
  85     private final boolean server;
  86 
  87     // Lock held when reading (also used when accepting or connecting)
  88     private final ReentrantLock readLock = new ReentrantLock();
  89 
  90     // Lock held when writing
  91     private final ReentrantLock writeLock = new ReentrantLock();
  92 
    // The stateLock for reading/changing state
  94     private final Object stateLock = new Object();
  95     private static final int ST_NEW = 0;
  96     private static final int ST_UNCONNECTED = 1;
  97     private static final int ST_CONNECTING = 2;
  98     private static final int ST_CONNECTED = 3;
  99     private static final int ST_CLOSING = 4;
 100     private static final int ST_CLOSED = 5;
 101     private volatile int state;  // need stateLock to change
 102 
 103     // The file descriptor value
 104     private int fdVal;
 105 
 106     // set by SocketImpl.create, protected by stateLock
 107     private boolean stream;
 108     private Cleanable cleaner;
 109 
 110     // set to true when the socket is in non-blocking mode
 111     private volatile boolean nonBlocking;
 112 
 113     // used by connect/read/write/accept, protected by stateLock
 114     private long readerThread;
 115     private long writerThread;
 116 
 117     // used when SO_REUSEADDR is emulated, protected by stateLock
 118     private boolean isReuseAddress;
 119 
 120     // read or accept timeout in millis
 121     private volatile int timeout;
 122 
 123     // flags to indicate if the connection is shutdown for input and output
 124     private volatile boolean isInputClosed;
 125     private volatile boolean isOutputClosed;
 126 
 127     // used by read to emulate legacy behavior, protected by readLock
 128     private boolean readEOF;
 129     private boolean connectionReset;
 130 
 131     /**
 132      * Creates an instance of this SocketImpl.
 133      * @param server true if this is a SocketImpl for a ServerSocket
 134      */
 135     public NioSocketImpl(boolean server) {
 136         this.server = server;
 137     }
 138 
 139     /**
 140      * Returns true if the socket is open.
 141      */
 142     private boolean isOpen() {
 143         return state < ST_CLOSING;
 144     }
 145 
 146     /**
 147      * Throws SocketException if the socket is not open.
 148      */
 149     private void ensureOpen() throws SocketException {
 150         int state = this.state;
 151         if (state == ST_NEW)
 152             throw new SocketException("Socket not created");
 153         if (state >= ST_CLOSING)
 154             throw new SocketException("Socket closed");
 155     }
 156 
 157     /**
 158      * Throws SocketException if the socket is not open and connected.
 159      */
 160     private void ensureOpenAndConnected() throws SocketException {
 161         int state = this.state;
 162         if (state < ST_CONNECTED)
 163             throw new SocketException("Not connected");
 164         if (state > ST_CONNECTED)
 165             throw new SocketException("Socket closed");
 166     }
 167 
 168     /**
 169      * Disables the current thread for scheduling purposes until the
 170      * socket is ready for I/O or is asynchronously closed, for up to the
 171      * specified waiting time.
 172      * @throws IOException if an I/O error occurs
 173      */
 174     private void park(FileDescriptor fd, int event, long nanos) throws IOException {
 175         Thread t = Thread.currentThread();
 176         if (t.isVirtual()) {
 177             Poller.register(fdVal, event);
 178             try {
 179                 if (isOpen()) {
 180                     if (nanos == 0) {
 181                         VirtualThreads.park();
 182                     } else {
 183                         VirtualThreads.park(nanos);
 184                     }
 185                     if (t.isInterrupted()) {
 186                         throw new InterruptedIOException();
 187                     }
 188                 }
 189             } finally {
 190                 Poller.deregister(fdVal, event);
 191             }
 192         } else {
 193             long millis;
 194             if (nanos == 0) {
 195                 millis = -1;
 196             } else {
 197                 millis = NANOSECONDS.toMillis(nanos);
 198             }
 199             Net.poll(fd, event, millis);
 200         }

 201     }
 202 
 203     /**
 204      * Disables the current thread for scheduling purposes until the socket is
 205      * ready for I/O or is asynchronously closed.
 206      * @throws IOException if an I/O error occurs
 207      */
 208     private void park(FileDescriptor fd, int event) throws IOException {
 209         park(fd, event, 0);
 210     }
 211 
 212     /**
 213      * Ensures that the socket is configured non-blocking invoked on a virtual
 214      * thread or the operation has a timeout
 215      * @throws IOException if there is an I/O error changing the blocking mode
















 216      */
 217     private void configureNonBlockingIfNeeded(FileDescriptor fd, boolean timed)
 218         throws IOException
 219     {
 220         if (!nonBlocking
 221             && (timed || Thread.currentThread().isVirtual())) {
 222             assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
 223             IOUtil.configureBlocking(fd, false);
 224             nonBlocking = true;
 225         }
 226     }
 227 
 228     /**
 229      * Marks the beginning of a read operation that might block.
 230      * @throws SocketException if the socket is closed or not connected
 231      */
 232     private FileDescriptor beginRead() throws SocketException {
 233         synchronized (stateLock) {
 234             ensureOpenAndConnected();
 235             readerThread = NativeThread.current();
 236             return fd;
 237         }
 238     }
 239 
 240     /**
 241      * Marks the end of a read operation that may have blocked.
 242      * @throws SocketException is the socket is closed
 243      */
 244     private void endRead(boolean completed) throws SocketException {
 245         synchronized (stateLock) {
 246             readerThread = 0;
 247             int state = this.state;
 248             if (state == ST_CLOSING)
 249                 tryFinishClose();
 250             if (!completed && state >= ST_CLOSING)
 251                 throw new SocketException("Socket closed");
 252         }
 253     }
 254 
 255     /**
 256      * Attempts to read bytes from the socket into the given byte array.
 257      */
 258     private int tryRead(FileDescriptor fd, byte[] b, int off, int len)
 259         throws IOException
 260     {
 261         ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
 262         assert dst.position() == 0;
 263         try {
 264             int n = nd.read(fd, ((DirectBuffer)dst).address(), len);
 265             if (n > 0) {
 266                 dst.get(b, off, n);
 267             }
 268             return n;
 269         } finally {
 270             Util.offerFirstTemporaryDirectBuffer(dst);
 271         }
 272     }
 273 
 274     /**
 275      * Reads bytes from the socket into the given byte array with a timeout.
 276      * @throws SocketTimeoutException if the read timeout elapses
 277      */
 278     private int timedRead(FileDescriptor fd, byte[] b, int off, int len, long nanos)
 279         throws IOException
 280     {
 281         long startNanos = System.nanoTime();
 282         int n = tryRead(fd, b, off, len);
 283         while (n == IOStatus.UNAVAILABLE && isOpen()) {
 284             long remainingNanos = nanos - (System.nanoTime() - startNanos);
 285             if (remainingNanos <= 0) {
 286                 throw new SocketTimeoutException("Read timed out");
 287             }
 288             park(fd, Net.POLLIN, remainingNanos);
 289             n = tryRead(fd, b, off, len);
 290         }
 291         return n;
 292     }
 293 
 294     /**
 295      * Reads bytes from the socket into the given byte array.
 296      * @return the number of bytes read or -1 at EOF
 297      * @throws SocketException if the socket is closed or a socket I/O error occurs
 298      * @throws SocketTimeoutException if the read timeout elapses
 299      */
 300     private int implRead(byte[] b, int off, int len) throws IOException {
 301         int n = 0;
 302         FileDescriptor fd = beginRead();
 303         try {
 304             if (connectionReset)
 305                 throw new SocketException("Connection reset");
 306             if (isInputClosed)
 307                 return -1;
 308             int timeout = this.timeout;
 309             configureNonBlockingIfNeeded(fd, timeout > 0);
 310             if (timeout > 0) {
 311                 // read with timeout

 312                 n = timedRead(fd, b, off, len, MILLISECONDS.toNanos(timeout));
 313             } else {
 314                 // read, no timeout
 315                 n = tryRead(fd, b, off, len);
 316                 while (IOStatus.okayToRetry(n) && isOpen()) {
 317                     park(fd, Net.POLLIN);
 318                     n = tryRead(fd, b, off, len);
 319                 }
 320             }
 321             return n;
 322         } catch (InterruptedIOException e) {
 323             throw e;
 324         } catch (ConnectionResetException e) {
 325             connectionReset = true;
 326             throw new SocketException("Connection reset");
 327         } catch (IOException ioe) {
 328             throw new SocketException(ioe.getMessage());
 329         } finally {
 330             endRead(n > 0);
 331         }
 332     }
 333 
 334     /**
 335      * Reads bytes from the socket into the given byte array.
 336      * @return the number of bytes read or -1 at EOF
 337      * @throws IndexOutOfBoundsException if the bound checks fail
 338      * @throws SocketException if the socket is closed or a socket I/O error occurs
 339      * @throws SocketTimeoutException if the read timeout elapses
 340      */
 341     private int read(byte[] b, int off, int len) throws IOException {
 342         Objects.checkFromIndexSize(off, len, b.length);
 343         if (len == 0) {
 344             return 0;
 345         } else {
 346             readLock.lock();
 347             try {
 348                 // emulate legacy behavior to return -1, even if socket is closed
 349                 if (readEOF)
 350                     return -1;
 351                 // read up to MAX_BUFFER_SIZE bytes
 352                 int size = Math.min(len, MAX_BUFFER_SIZE);
 353                 int n = implRead(b, off, size);
 354                 if (n == -1)
 355                     readEOF = true;
 356                 return n;
 357             } finally {
 358                 readLock.unlock();
 359             }
 360         }
 361     }
 362 
 363     /**
 364      * Marks the beginning of a write operation that might block.
 365      * @throws SocketException if the socket is closed or not connected
 366      */
 367     private FileDescriptor beginWrite() throws SocketException {
 368         synchronized (stateLock) {
 369             ensureOpenAndConnected();
 370             writerThread = NativeThread.current();
 371             return fd;
 372         }
 373     }
 374 
 375     /**
 376      * Marks the end of a write operation that may have blocked.
 377      * @throws SocketException is the socket is closed
 378      */
 379     private void endWrite(boolean completed) throws SocketException {
 380         synchronized (stateLock) {
 381             writerThread = 0;
 382             int state = this.state;
 383             if (state == ST_CLOSING)
 384                 tryFinishClose();
 385             if (!completed && state >= ST_CLOSING)
 386                 throw new SocketException("Socket closed");
 387         }
 388     }
 389 
 390     /**
 391      * Attempts to write a sequence of bytes to the socket from the given
 392      * byte array.
 393      */
 394     private int tryWrite(FileDescriptor fd, byte[] b, int off, int len)
 395         throws IOException
 396     {
 397         ByteBuffer src = Util.getTemporaryDirectBuffer(len);
 398         assert src.position() == 0;
 399         try {
 400             src.put(b, off, len);
 401             return nd.write(fd, ((DirectBuffer)src).address(), len);
 402         } finally {
 403             Util.offerFirstTemporaryDirectBuffer(src);
 404         }
 405     }
 406 
 407     /**
 408      * Writes a sequence of bytes to the socket from the given byte array.
 409      * @return the number of bytes written
 410      * @throws SocketException if the socket is closed or a socket I/O error occurs
 411      */
 412     private int implWrite(byte[] b, int off, int len) throws IOException {
 413         int n = 0;
 414         FileDescriptor fd = beginWrite();
 415         try {
 416             configureNonBlockingIfNeeded(fd, false);
 417             n = tryWrite(fd, b, off, len);
 418             while (IOStatus.okayToRetry(n) && isOpen()) {
 419                 park(fd, Net.POLLOUT);
 420                 n = tryWrite(fd, b, off, len);
 421             }
 422             return n;
 423         } catch (InterruptedIOException e) {
 424             throw e;
 425         } catch (IOException ioe) {
 426             throw new SocketException(ioe.getMessage());
 427         } finally {
 428             endWrite(n > 0);
 429         }
 430     }
 431 
 432     /**
 433      * Writes a sequence of bytes to the socket from the given byte array.
 434      * @throws SocketException if the socket is closed or a socket I/O error occurs
 435      */
 436     private void write(byte[] b, int off, int len) throws IOException {
 437         Objects.checkFromIndexSize(off, len, b.length);
 438         if (len > 0) {
 439             writeLock.lock();
 440             try {
 441                 int pos = off;
 442                 int end = off + len;
 443                 while (pos < end) {
 444                     // write up to MAX_BUFFER_SIZE bytes
 445                     int size = Math.min((end - pos), MAX_BUFFER_SIZE);
 446                     int n = implWrite(b, pos, size);
 447                     pos += n;
 448                 }
 449             } finally {
 450                 writeLock.unlock();
 451             }
 452         }
 453     }
 454 
 455     /**
 456      * Creates the socket.
 457      * @param stream {@code true} for a streams socket
 458      */
 459     @Override
 460     protected void create(boolean stream) throws IOException {
 461         synchronized (stateLock) {
 462             if (state != ST_NEW)
 463                 throw new IOException("Already created");
 464             if (!stream)
 465                 ResourceManager.beforeUdpCreate();
 466             FileDescriptor fd;
 467             try {
 468                 if (server) {
 469                     assert stream;
 470                     fd = Net.serverSocket(true);
 471                 } else {
 472                     fd = Net.socket(stream);
 473                 }
 474             } catch (IOException ioe) {
 475                 if (!stream)
 476                     ResourceManager.afterUdpClose();
 477                 throw ioe;
 478             }
 479             Runnable closer = closerFor(fd, stream);
 480             this.fd = fd;
 481             this.fdVal = IOUtil.fdVal(fd);
 482             this.stream = stream;
 483             this.cleaner = CleanerFactory.cleaner().register(this, closer);
 484             this.state = ST_UNCONNECTED;
 485         }
 486     }
 487 
 488     /**
 489      * Marks the beginning of a connect operation that might block.
 490      * @throws SocketException if the socket is closed or already connected
 491      */
 492     private FileDescriptor beginConnect(InetAddress address, int port)
 493         throws IOException
 494     {
 495         synchronized (stateLock) {
 496             int state = this.state;
 497             if (state != ST_UNCONNECTED) {
 498                 if (state == ST_NEW)
 499                     throw new SocketException("Not created");
 500                 if (state == ST_CONNECTING)
 501                     throw new SocketException("Connection in progress");
 502                 if (state == ST_CONNECTED)
 503                     throw new SocketException("Already connected");
 504                 if (state >= ST_CLOSING)
 505                     throw new SocketException("Socket closed");
 506                 assert false;
 507             }
 508             this.state = ST_CONNECTING;
 509 
 510             // invoke beforeTcpConnect hook if not already bound
 511             if (localport == 0) {
 512                 NetHooks.beforeTcpConnect(fd, address, port);
 513             }
 514 
 515             // save the remote address/port
 516             this.address = address;
 517             this.port = port;
 518 
 519             readerThread = NativeThread.current();
 520             return fd;
 521         }
 522     }
 523 
 524     /**
 525      * Marks the end of a connect operation that may have blocked.
 526      * @throws SocketException is the socket is closed
 527      */
 528     private void endConnect(FileDescriptor fd, boolean completed) throws IOException {
 529         synchronized (stateLock) {
 530             readerThread = 0;
 531             int state = this.state;
 532             if (state == ST_CLOSING)
 533                 tryFinishClose();
 534             if (completed && state == ST_CONNECTING) {
 535                 this.state = ST_CONNECTED;
 536                 localport = Net.localAddress(fd).getPort();
 537             } else if (!completed && state >= ST_CLOSING) {
 538                 throw new SocketException("Socket closed");
 539             }
 540         }
 541     }
 542 
 543     /**
 544      * Waits for a connection attempt to finish with a timeout
 545      * @throws SocketTimeoutException if the connect timeout elapses
 546      */
 547     private boolean timedFinishConnect(FileDescriptor fd, long nanos) throws IOException {
 548         long startNanos = System.nanoTime();
 549         boolean polled = Net.pollConnectNow(fd);
 550         while (!polled && isOpen()) {
 551             long remainingNanos = nanos - (System.nanoTime() - startNanos);
 552             if (remainingNanos <= 0) {
 553                 throw new SocketTimeoutException("Connect timed out");
 554             }
 555             park(fd, Net.POLLOUT, remainingNanos);
 556             polled = Net.pollConnectNow(fd);
 557         }
 558         return polled && isOpen();
 559     }
 560 
 561     /**
 562      * Attempts to establish a connection to the given socket address with a
 563      * timeout. Closes the socket if connection cannot be established.
 564      * @throws IOException if the address is not a resolved InetSocketAddress or
 565      *         the connection cannot be established
 566      */
 567     @Override
 568     protected void connect(SocketAddress remote, int millis) throws IOException {
 569         // SocketImpl connect only specifies IOException
 570         if (!(remote instanceof InetSocketAddress))
 571             throw new IOException("Unsupported address type");
 572         InetSocketAddress isa = (InetSocketAddress) remote;
 573         if (isa.isUnresolved()) {
 574             throw new UnknownHostException(isa.getHostName());
 575         }
 576 
 577         InetAddress address = isa.getAddress();
 578         if (address.isAnyLocalAddress())
 579             address = InetAddress.getLocalHost();
 580         int port = isa.getPort();
 581 
 582         ReentrantLock connectLock = readLock;
 583         try {
 584             connectLock.lock();
 585             try {
 586                 boolean connected = false;
 587                 FileDescriptor fd = beginConnect(address, port);
 588                 try {
 589                     configureNonBlockingIfNeeded(fd, millis > 0);





 590                     int n = Net.connect(fd, address, port);
 591                     if (n > 0) {
 592                         // connection established
 593                         connected = true;
 594                     } else {
 595                         assert IOStatus.okayToRetry(n);
 596                         if (millis > 0) {
 597                             // finish connect with timeout
 598                             long nanos = MILLISECONDS.toNanos(millis);
 599                             connected = timedFinishConnect(fd, nanos);
 600                         } else {
 601                             // finish connect, no timeout
 602                             boolean polled = false;
 603                             while (!polled && isOpen()) {
 604                                 park(fd, Net.POLLOUT);
 605                                 polled = Net.pollConnectNow(fd);
 606                             }
 607                             connected = polled && isOpen();
 608                         }
 609                     }






 610                 } finally {
 611                     endConnect(fd, connected);
 612                 }
 613             } finally {
 614                 connectLock.unlock();
 615             }
 616         } catch (IOException ioe) {
 617             close();
 618             if (ioe instanceof InterruptedIOException) {
 619                 throw ioe;
 620             } else {
 621                 throw SocketExceptions.of(ioe, isa);
 622             }
 623         }
 624     }
 625 
 626     @Override
 627     protected void connect(String host, int port) throws IOException {
 628         connect(new InetSocketAddress(host, port), 0);
 629     }
 630 
 631     @Override
 632     protected void connect(InetAddress address, int port) throws IOException {
 633         connect(new InetSocketAddress(address, port), 0);
 634     }
 635 
 636     @Override
 637     protected void bind(InetAddress host, int port) throws IOException {
 638         synchronized (stateLock) {
 639             ensureOpen();
 640             if (localport != 0)
 641                 throw new SocketException("Already bound");
 642             NetHooks.beforeTcpBind(fd, host, port);
 643             Net.bind(fd, host, port);
 644             // set the address field to the given host address to
 645             // maintain long standing behavior. When binding to 0.0.0.0
 646             // then the actual local address will be ::0 when IPv6 is enabled.
 647             address = host;
 648             localport = Net.localAddress(fd).getPort();
 649         }
 650     }
 651 
 652     @Override
 653     protected void listen(int backlog) throws IOException {
 654         synchronized (stateLock) {
 655             ensureOpen();
 656             if (localport == 0)
 657                 throw new SocketException("Not bound");
 658             Net.listen(fd, backlog < 1 ? 50 : backlog);
 659         }
 660     }
 661 
 662     /**
 663      * Marks the beginning of an accept operation that might block.
 664      * @throws SocketException if the socket is closed
 665      */
 666     private FileDescriptor beginAccept() throws SocketException {
 667         synchronized (stateLock) {
 668             ensureOpen();
 669             if (!stream)
 670                 throw new SocketException("Not a stream socket");
 671             if (localport == 0)
 672                 throw new SocketException("Not bound");
 673             readerThread = NativeThread.current();
 674             return fd;
 675         }
 676     }
 677 
 678     /**
 679      * Marks the end of an accept operation that may have blocked.
 680      * @throws SocketException is the socket is closed
 681      */
 682     private void endAccept(boolean completed) throws SocketException {
 683         synchronized (stateLock) {
 684             int state = this.state;
 685             readerThread = 0;
 686             if (state == ST_CLOSING)
 687                 tryFinishClose();
 688             if (!completed && state >= ST_CLOSING)
 689                 throw new SocketException("Socket closed");
 690         }
 691     }
 692 
 693     /**
 694      * Accepts a new connection with a timeout.
 695      * @throws SocketTimeoutException if the accept timeout elapses
 696      */
 697     private int timedAccept(FileDescriptor fd,
 698                             FileDescriptor newfd,
 699                             InetSocketAddress[] isaa,
 700                             long nanos)
 701         throws IOException
 702     {
 703         long startNanos = System.nanoTime();
 704         int n = Net.accept(fd, newfd, isaa);
 705         while (n == IOStatus.UNAVAILABLE && isOpen()) {
 706             long remainingNanos = nanos - (System.nanoTime() - startNanos);
 707             if (remainingNanos <= 0) {
 708                 throw new SocketTimeoutException("Accept timed out");
 709             }
 710             park(fd, Net.POLLIN, remainingNanos);
 711             n = Net.accept(fd, newfd, isaa);
 712         }
 713         return n;
 714     }
 715 
 716     /**
 717      * Accepts a new connection so that the given SocketImpl is connected to
 718      * the peer. The SocketImpl must be a newly created NioSocketImpl.
 719      */
 720     @Override
 721     protected void accept(SocketImpl si) throws IOException {
 722         NioSocketImpl nsi = (NioSocketImpl) si;
 723         if (nsi.state != ST_NEW)
 724             throw new SocketException("Not a newly created SocketImpl");
 725 
 726         FileDescriptor newfd = new FileDescriptor();
 727         InetSocketAddress[] isaa = new InetSocketAddress[1];
 728 
 729         // acquire the lock, adjusting the timeout for cases where several
 730         // threads are accepting connections and there is a timeout set
 731         ReentrantLock acceptLock = readLock;
 732         int timeout = this.timeout;
 733         long remainingNanos = 0;
 734         if (timeout > 0) {
 735             remainingNanos = tryLock(acceptLock, timeout, MILLISECONDS);
 736             if (remainingNanos <= 0) {
 737                 assert !acceptLock.isHeldByCurrentThread();
 738                 throw new SocketTimeoutException("Accept timed out");
 739             }
 740         } else {
 741             acceptLock.lock();
 742         }
 743 
 744         // accept a connection
 745         try {
 746             int n = 0;
 747             FileDescriptor fd = beginAccept();
 748             try {
 749                 configureNonBlockingIfNeeded(fd, remainingNanos > 0);
 750                 if (remainingNanos > 0) {
 751                     // accept with timeout

 752                     n = timedAccept(fd, newfd, isaa, remainingNanos);
 753                 } else {
 754                     // accept, no timeout
 755                     n = Net.accept(fd, newfd, isaa);
 756                     while (IOStatus.okayToRetry(n) && isOpen()) {
 757                         park(fd, Net.POLLIN);
 758                         n = Net.accept(fd, newfd, isaa);
 759                     }
 760                 }
 761             } finally {
 762                 endAccept(n > 0);
 763                 assert IOStatus.check(n);
 764             }
 765         } finally {
 766             acceptLock.unlock();
 767         }
 768 
 769         // get local address and configure accepted socket to blocking mode
 770         InetSocketAddress localAddress;
 771         try {
 772             localAddress = Net.localAddress(newfd);
 773             IOUtil.configureBlocking(newfd, true);
 774         } catch (IOException ioe) {
 775             nd.close(newfd);
 776             throw ioe;
 777         }
 778 
 779         // set the fields
 780         Runnable closer = closerFor(newfd, true);
 781         synchronized (nsi.stateLock) {
 782             nsi.fd = newfd;
 783             nsi.fdVal = IOUtil.fdVal(newfd);
 784             nsi.stream = true;
 785             nsi.cleaner = CleanerFactory.cleaner().register(nsi, closer);
 786             nsi.localport = localAddress.getPort();
 787             nsi.address = isaa[0].getAddress();
 788             nsi.port = isaa[0].getPort();
 789             nsi.state = ST_CONNECTED;
 790         }
 791     }
 792 
 793     @Override
 794     protected InputStream getInputStream() {
 795         return new InputStream() {
 796             @Override
 797             public int read() throws IOException {
 798                 byte[] a = new byte[1];
 799                 int n = read(a, 0, 1);
 800                 return (n > 0) ? (a[0] & 0xff) : -1;
 801             }
 802             @Override
 803             public int read(byte[] b, int off, int len) throws IOException {
 804                 return NioSocketImpl.this.read(b, off, len);
 805             }
 806             @Override
 807             public int available() throws IOException {
 808                 return NioSocketImpl.this.available();
 809             }
 810             @Override
 811             public void close() throws IOException {
 812                 NioSocketImpl.this.close();
 813             }
 814         };
 815     }
 816 
 817     @Override
 818     protected OutputStream getOutputStream() {
 819         return new OutputStream() {
 820             @Override
 821             public void write(int b) throws IOException {
 822                 byte[] a = new byte[]{(byte) b};
 823                 write(a, 0, 1);
 824             }
 825             @Override
 826             public void write(byte[] b, int off, int len) throws IOException {
 827                 NioSocketImpl.this.write(b, off, len);
 828             }
 829             @Override
 830             public void close() throws IOException {
 831                 NioSocketImpl.this.close();
 832             }
 833         };
 834     }
 835 
 836     @Override
 837     protected int available() throws IOException {
 838         synchronized (stateLock) {
 839             ensureOpenAndConnected();
 840             if (isInputClosed) {
 841                 return 0;
 842             } else {
 843                 return Net.available(fd);
 844             }
 845         }
 846     }
 847 
 848     /**
 849      * Closes the socket if there are no I/O operations in progress.
 850      */
 851     private boolean tryClose() throws IOException {
 852         assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
 853         if (readerThread == 0 && writerThread == 0) {
 854             try {
 855                 cleaner.clean();
 856             } catch (UncheckedIOException ioe) {
 857                 throw ioe.getCause();
 858             } finally {
 859                 state = ST_CLOSED;
 860             }
 861             return true;
 862         } else {
 863             return false;
 864         }
 865     }
 866 
 867     /**
 868      * Invokes tryClose to attempt to close the socket.
 869      *
 870      * This method is used for deferred closing by I/O operations.
 871      */
 872     private void tryFinishClose() {
 873         try {
 874             tryClose();
 875         } catch (IOException ignore) { }
 876     }
 877 
 878     /**
 879      * Closes the socket. If there are I/O operations in progress then the
 880      * socket is pre-closed and the threads are signalled. The socket will be
 881      * closed when the last I/O operation aborts.
 882      */
 883     @Override
 884     protected void close() throws IOException {
 885         synchronized (stateLock) {
 886             int state = this.state;
 887             if (state >= ST_CLOSING)
 888                 return;
 889             if (state == ST_NEW) {
 890                 // stillborn
 891                 this.state = ST_CLOSED;
 892                 return;
 893             }
 894             boolean connected = (state == ST_CONNECTED);
 895             this.state = ST_CLOSING;
 896 
 897             // shutdown output when linger interval not set to 0
 898             if (connected) {
 899                 try {
 900                     var SO_LINGER = StandardSocketOptions.SO_LINGER;
 901                     if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
 902                         Net.shutdown(fd, Net.SHUT_WR);
 903                     }
 904                 } catch (IOException ignore) { }
 905             }
 906 
 907             // attempt to close the socket. If there are I/O operations in progress
 908             // then the socket is pre-closed and the thread(s) signalled. The
 909             // last thread will close the file descriptor.
 910             if (!tryClose()) {

 911                 long reader = readerThread;


 912                 long writer = writerThread;
 913                 if (NativeThread.isVirtualThread(reader)
 914                         || NativeThread.isVirtualThread(writer)) {
 915                     Poller.stopPoll(fdVal);
 916                 }
 917                 if (NativeThread.isKernelThread(reader)
 918                         || NativeThread.isKernelThread(writer)) {
 919                     nd.preClose(fd);
 920                     if (NativeThread.isKernelThread(reader))
 921                         NativeThread.signal(reader);
 922                     if (NativeThread.isKernelThread(writer))
 923                         NativeThread.signal(writer);
 924                 }
 925             }
 926         }
 927     }
 928 
 929     // the socket options supported by client and server sockets
 930     private static volatile Set<SocketOption<?>> clientSocketOptions;
 931     private static volatile Set<SocketOption<?>> serverSocketOptions;
 932 
 933     @Override
 934     protected Set<SocketOption<?>> supportedOptions() {
 935         Set<SocketOption<?>> options = (server) ? serverSocketOptions : clientSocketOptions;
 936         if (options == null) {
 937             options = new HashSet<>();
 938             options.add(StandardSocketOptions.SO_RCVBUF);
 939             options.add(StandardSocketOptions.SO_REUSEADDR);
 940             if (server) {
 941                 // IP_TOS added for server socket to maintain compatibility
 942                 options.add(StandardSocketOptions.IP_TOS);
 943                 options.addAll(ExtendedSocketOptions.serverSocketOptions());
 944             } else {
 945                 options.add(StandardSocketOptions.IP_TOS);
 946                 options.add(StandardSocketOptions.SO_KEEPALIVE);
 947                 options.add(StandardSocketOptions.SO_SNDBUF);
 948                 options.add(StandardSocketOptions.SO_LINGER);
 949                 options.add(StandardSocketOptions.TCP_NODELAY);
 950                 options.addAll(ExtendedSocketOptions.clientSocketOptions());
 951             }
 952             if (Net.isReusePortAvailable())
 953                 options.add(StandardSocketOptions.SO_REUSEPORT);
 954             options = Collections.unmodifiableSet(options);
 955             if (server) {
 956                 serverSocketOptions = options;
 957             } else {
 958                 clientSocketOptions = options;
 959             }
 960         }
 961         return options;
 962     }
 963 
 964     @Override
 965     protected <T> void setOption(SocketOption<T> opt, T value) throws IOException {
 966         if (!supportedOptions().contains(opt))
 967             throw new UnsupportedOperationException("'" + opt + "' not supported");
 968         if (!opt.type().isInstance(value))
 969             throw new IllegalArgumentException("Invalid value '" + value + "'");
 970         synchronized (stateLock) {
 971             ensureOpen();
 972             if (opt == StandardSocketOptions.IP_TOS) {
 973                 // maps to IP_TOS or IPV6_TCLASS
 974                 Net.setSocketOption(fd, family(), opt, value);
 975             } else if (opt == StandardSocketOptions.SO_REUSEADDR) {
 976                 boolean b = (boolean) value;
 977                 if (Net.useExclusiveBind()) {
 978                     isReuseAddress = b;
 979                 } else {
 980                     Net.setSocketOption(fd, opt, b);
 981                 }
 982             } else {
 983                 // option does not need special handling
 984                 Net.setSocketOption(fd, opt, value);
 985             }
 986         }
 987     }
 988 
 989     @SuppressWarnings("unchecked")
 990     protected <T> T getOption(SocketOption<T> opt) throws IOException {
 991         if (!supportedOptions().contains(opt))
 992             throw new UnsupportedOperationException("'" + opt + "' not supported");
 993         synchronized (stateLock) {
 994             ensureOpen();
 995             if (opt == StandardSocketOptions.IP_TOS) {
 996                 return (T) Net.getSocketOption(fd, family(), opt);
 997             } else if (opt == StandardSocketOptions.SO_REUSEADDR) {
 998                 if (Net.useExclusiveBind()) {
 999                     return (T) Boolean.valueOf(isReuseAddress);
1000                 } else {
1001                     return (T) Net.getSocketOption(fd, opt);
1002                 }
1003             } else {
1004                 // option does not need special handling
1005                 return (T) Net.getSocketOption(fd, opt);
1006             }
1007         }
1008     }
1009 
1010     private boolean booleanValue(Object value, String desc) throws SocketException {
1011         if (!(value instanceof Boolean))
1012             throw new SocketException("Bad value for " + desc);
1013         return (boolean) value;
1014     }
1015 
1016     private int intValue(Object value, String desc) throws SocketException {
1017         if (!(value instanceof Integer))
1018             throw new SocketException("Bad value for " + desc);
1019         return (int) value;
1020     }
1021 
1022     @Override
1023     public void setOption(int opt, Object value) throws SocketException {
1024         synchronized (stateLock) {
1025             ensureOpen();
1026             try {
1027                 switch (opt) {
1028                 case SO_LINGER: {
1029                     // the value is "false" to disable, or linger interval to enable
1030                     int i;
1031                     if (value instanceof Boolean && ((boolean) value) == false) {
1032                         i = -1;
1033                     } else {
1034                         i = intValue(value, "SO_LINGER");
1035                     }
1036                     Net.setSocketOption(fd, StandardSocketOptions.SO_LINGER, i);
1037                     break;
1038                 }
1039                 case SO_TIMEOUT: {
1040                     int i = intValue(value, "SO_TIMEOUT");
1041                     if (i < 0)
1042                         throw new IllegalArgumentException("timeout < 0");
1043                     timeout = i;
1044                     break;
1045                 }
1046                 case IP_TOS: {
1047                     int i = intValue(value, "IP_TOS");
1048                     Net.setSocketOption(fd, family(), StandardSocketOptions.IP_TOS, i);
1049                     break;
1050                 }
1051                 case TCP_NODELAY: {
1052                     boolean b = booleanValue(value, "TCP_NODELAY");
1053                     Net.setSocketOption(fd, StandardSocketOptions.TCP_NODELAY, b);
1054                     break;
1055                 }
1056                 case SO_SNDBUF: {
1057                     int i = intValue(value, "SO_SNDBUF");
1058                     if (i <= 0)
1059                         throw new SocketException("SO_SNDBUF <= 0");
1060                     Net.setSocketOption(fd, StandardSocketOptions.SO_SNDBUF, i);
1061                     break;
1062                 }
1063                 case SO_RCVBUF: {
1064                     int i = intValue(value, "SO_RCVBUF");
1065                     if (i <= 0)
1066                         throw new SocketException("SO_RCVBUF <= 0");
1067                     Net.setSocketOption(fd, StandardSocketOptions.SO_RCVBUF, i);
1068                     break;
1069                 }
1070                 case SO_KEEPALIVE: {
1071                     boolean b = booleanValue(value, "SO_KEEPALIVE");
1072                     Net.setSocketOption(fd, StandardSocketOptions.SO_KEEPALIVE, b);
1073                     break;
1074                 }
1075                 case SO_OOBINLINE: {
1076                     boolean b = booleanValue(value, "SO_OOBINLINE");
1077                     Net.setSocketOption(fd, ExtendedSocketOption.SO_OOBINLINE, b);
1078                     break;
1079                 }
1080                 case SO_REUSEADDR: {
1081                     boolean b = booleanValue(value, "SO_REUSEADDR");
1082                     if (Net.useExclusiveBind()) {
1083                         isReuseAddress = b;
1084                     } else {
1085                         Net.setSocketOption(fd, StandardSocketOptions.SO_REUSEADDR, b);
1086                     }
1087                     break;
1088                 }
1089                 case SO_REUSEPORT: {
1090                     if (!Net.isReusePortAvailable())
1091                         throw new SocketException("SO_REUSEPORT not supported");
1092                     boolean b = booleanValue(value, "SO_REUSEPORT");
1093                     Net.setSocketOption(fd, StandardSocketOptions.SO_REUSEPORT, b);
1094                     break;
1095                 }
1096                 default:
1097                     throw new SocketException("Unknown option " + opt);
1098                 }
1099             } catch (SocketException e) {
1100                 throw e;
1101             } catch (IllegalArgumentException | IOException e) {
1102                 throw new SocketException(e.getMessage());
1103             }
1104         }
1105     }
1106 
1107     @Override
1108     public Object getOption(int opt) throws SocketException {
1109         synchronized (stateLock) {
1110             ensureOpen();
1111             try {
1112                 switch (opt) {
1113                 case SO_TIMEOUT:
1114                     return timeout;
1115                 case TCP_NODELAY:
1116                     return Net.getSocketOption(fd, StandardSocketOptions.TCP_NODELAY);
1117                 case SO_OOBINLINE:
1118                     return Net.getSocketOption(fd, ExtendedSocketOption.SO_OOBINLINE);
1119                 case SO_LINGER: {
1120                     // return "false" when disabled, linger interval when enabled
1121                     int i = (int) Net.getSocketOption(fd, StandardSocketOptions.SO_LINGER);
1122                     if (i == -1) {
1123                         return Boolean.FALSE;
1124                     } else {
1125                         return i;
1126                     }
1127                 }
1128                 case SO_REUSEADDR:
1129                     if (Net.useExclusiveBind()) {
1130                         return isReuseAddress;
1131                     } else {
1132                         return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEADDR);
1133                     }
1134                 case SO_BINDADDR:
1135                     return Net.localAddress(fd).getAddress();
1136                 case SO_SNDBUF:
1137                     return Net.getSocketOption(fd, StandardSocketOptions.SO_SNDBUF);
1138                 case SO_RCVBUF:
1139                     return Net.getSocketOption(fd, StandardSocketOptions.SO_RCVBUF);
1140                 case IP_TOS:
1141                     return Net.getSocketOption(fd, family(), StandardSocketOptions.IP_TOS);
1142                 case SO_KEEPALIVE:
1143                     return Net.getSocketOption(fd, StandardSocketOptions.SO_KEEPALIVE);
1144                 case SO_REUSEPORT:
1145                     if (!Net.isReusePortAvailable())
1146                         throw new SocketException("SO_REUSEPORT not supported");
1147                     return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEPORT);
1148                 default:
1149                     throw new SocketException("Unknown option " + opt);
1150                 }
1151             } catch (SocketException e) {
1152                 throw e;
1153             } catch (IllegalArgumentException | IOException e) {
1154                 throw new SocketException(e.getMessage());
1155             }
1156         }
1157     }
1158 
1159     @Override
1160     protected void shutdownInput() throws IOException {
1161         synchronized (stateLock) {
1162             ensureOpenAndConnected();
1163             if (!isInputClosed) {
1164                 Net.shutdown(fd, Net.SHUT_RD);
1165                 if (NativeThread.isVirtualThread(readerThread)) {
1166                     Poller.stopPoll(fdVal, Net.POLLIN);
1167                 }
1168                 isInputClosed = true;
1169             }
1170         }
1171     }
1172 
1173     @Override
1174     protected void shutdownOutput() throws IOException {
1175         synchronized (stateLock) {
1176             ensureOpenAndConnected();
1177             if (!isOutputClosed) {
1178                 Net.shutdown(fd, Net.SHUT_WR);
1179                 if (NativeThread.isVirtualThread(writerThread)) {
1180                     Poller.stopPoll(fdVal, Net.POLLOUT);
1181                 }
1182                 isOutputClosed = true;
1183             }
1184         }
1185     }
1186 
1187     @Override
1188     protected boolean supportsUrgentData() {
1189         return true;
1190     }
1191 
1192     @Override
1193     protected void sendUrgentData(int data) throws IOException {
1194         writeLock.lock();
1195         try {
1196             int n = 0;
1197             FileDescriptor fd = beginWrite();
1198             try {
1199                 configureNonBlockingIfNeeded(fd, false);
1200                 do {
1201                     n = Net.sendOOB(fd, (byte) data);
1202                 } while (n == IOStatus.INTERRUPTED && isOpen());
1203                 if (n == IOStatus.UNAVAILABLE) {
1204                     throw new SocketException("No buffer space available");
1205                 }
1206             } finally {
1207                 endWrite(n > 0);
1208             }
1209         } finally {
1210             writeLock.unlock();
1211         }
1212     }
1213 
1214     /**
1215      * Returns an action to close the given file descriptor.
1216      */
1217     private static Runnable closerFor(FileDescriptor fd, boolean stream) {
1218         if (stream) {
1219             return () -> {
1220                 try {
1221                     nd.close(fd);
1222                 } catch (IOException ioe) {
1223                     throw new UncheckedIOException(ioe);
1224                 }
1225             };
1226         } else {
1227             return () -> {
1228                 try {
1229                     nd.close(fd);
1230                 } catch (IOException ioe) {
1231                     throw new UncheckedIOException(ioe);
1232                 } finally {
1233                     // decrement
1234                     ResourceManager.afterUdpClose();
1235                 }
1236             };
1237         }
1238     }
1239 
1240     /**
1241      * Attempts to acquire the given lock within the given waiting time.
1242      * @return the remaining time in nanoseconds when the lock is acquired, zero
1243      *         or less if the lock was not acquired before the timeout expired
1244      */
1245     private static long tryLock(ReentrantLock lock, long timeout, TimeUnit unit) {
1246         assert timeout > 0;
1247         boolean interrupted = false;
1248         long nanos = NANOSECONDS.convert(timeout, unit);
1249         long remainingNanos = nanos;
1250         long startNanos = System.nanoTime();
1251         boolean acquired = false;
1252         while (!acquired && (remainingNanos > 0)) {
1253             try {
1254                 acquired = lock.tryLock(remainingNanos, NANOSECONDS);
1255             } catch (InterruptedException e) {
1256                 interrupted = true;
1257             }
1258             remainingNanos = nanos - (System.nanoTime() - startNanos);
1259         }
1260         if (acquired && remainingNanos <= 0L)
1261             lock.unlock();  // release lock if timeout has expired
1262         if (interrupted)
1263             Thread.currentThread().interrupt();
1264         return remainingNanos;
1265     }
1266 
1267     /**
1268      * Returns the socket protocol family.
1269      */
1270     private static ProtocolFamily family() {
1271         if (Net.isIPv6Available()) {
1272             return StandardProtocolFamily.INET6;
1273         } else {
1274             return StandardProtocolFamily.INET;
1275         }
1276     }
1277 }
--- EOF ---