< prev index next >

src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java

Print this page
*** 26,10 ***
--- 26,11 ---
  package sun.nio.ch;
  
  import java.io.FileDescriptor;
  import java.io.IOException;
  import java.io.InputStream;
+ import java.io.InterruptedIOException;
  import java.io.OutputStream;
  import java.io.UncheckedIOException;
  import java.lang.ref.Cleaner.Cleanable;
  import java.net.InetAddress;
  import java.net.InetSocketAddress;

*** 48,10 ***
--- 49,11 ---
  import java.util.Objects;
  import java.util.Set;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.locks.ReentrantLock;
  
+ import jdk.internal.misc.VirtualThreads;
  import jdk.internal.ref.CleanerFactory;
  import sun.net.ConnectionResetException;
  import sun.net.NetHooks;
  import sun.net.PlatformSocketImpl;
  import sun.net.ResourceManager;

*** 62,17 ***
  import static java.util.concurrent.TimeUnit.NANOSECONDS;
  
  /**
   * NIO based SocketImpl.
   *
!  * The underlying socket used by this SocketImpl is initially configured
!  * blocking. If the connect method is used to establish a connection with a
!  * timeout then the socket is configured non-blocking for the connect attempt,
!  * and then restored to blocking mode when the connection is established.
!  * If the accept or read methods are used with a timeout then the socket is
-  * configured non-blocking and is never restored. When in non-blocking mode,
-  * operations that don't complete immediately will poll the socket and preserve
   * the semantics of blocking operations.
   */
  
  public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl {
      private static final NativeDispatcher nd = new SocketDispatcher();
--- 64,15 ---
  import static java.util.concurrent.TimeUnit.NANOSECONDS;
  
  /**
   * NIO based SocketImpl.
   *
!  * The underlying socket used by this SocketImpl is initially configured blocking.
!  * If a connect, accept or read is attempted with a timeout, or a virtual
!  * thread invokes a blocking operation, then the socket is changed to non-blocking
!  * When in non-blocking mode, operations that don't complete immediately will
!  * poll the socket (or park when invoked on a virtual thread) and preserve
   * the semantics of blocking operations.
   */
  
  public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl {
      private static final NativeDispatcher nd = new SocketDispatcher();

*** 98,10 ***
--- 98,13 ---
      private static final int ST_CONNECTED = 3;
      private static final int ST_CLOSING = 4;
      private static final int ST_CLOSED = 5;
      private volatile int state;  // need stateLock to change
  
+     // The file descriptor value
+     private int fdVal;
+ 
      // set by SocketImpl.create, protected by stateLock
      private boolean stream;
      private Cleanable cleaner;
  
      // set to true when the socket is in non-blocking mode

*** 161,23 ***
          if (state > ST_CONNECTED)
              throw new SocketException("Socket closed");
      }
  
      /**
!      * Disables the current thread for scheduling purposes until the socket is
!      * ready for I/O, or is asynchronously closed, for up to the specified
!      * waiting time.
       * @throws IOException if an I/O error occurs
       */
      private void park(FileDescriptor fd, int event, long nanos) throws IOException {
!         long millis;
!         if (nanos == 0) {
!             millis = -1;
          } else {
!             millis = NANOSECONDS.toMillis(nanos);
          }
-         Net.poll(fd, event, millis);
      }
  
      /**
       * Disables the current thread for scheduling purposes until the socket is
       * ready for I/O or is asynchronously closed.
--- 164,42 ---
          if (state > ST_CONNECTED)
              throw new SocketException("Socket closed");
      }
  
      /**
!      * Disables the current thread for scheduling purposes until the
!      * socket is ready for I/O or is asynchronously closed, for up to the
!      * specified waiting time.
       * @throws IOException if an I/O error occurs
       */
      private void park(FileDescriptor fd, int event, long nanos) throws IOException {
!         Thread t = Thread.currentThread();
!         if (t.isVirtual()) {
!             Poller.register(fdVal, event);
+             try {
+                 if (isOpen()) {
+                     if (nanos == 0) {
+                         VirtualThreads.park();
+                     } else {
+                         VirtualThreads.park(nanos);
+                     }
+                     if (t.isInterrupted()) {
+                         throw new InterruptedIOException();
+                     }
+                 }
+             } finally {
+                 Poller.deregister(fdVal, event);
+             }
          } else {
!             long millis;
+             if (nanos == 0) {
+                 millis = -1;
+             } else {
+                 millis = NANOSECONDS.toMillis(nanos);
+             }
+             Net.poll(fd, event, millis);
          }
      }
  
      /**
       * Disables the current thread for scheduling purposes until the socket is
       * ready for I/O or is asynchronously closed.

*** 186,38 ***
      private void park(FileDescriptor fd, int event) throws IOException {
          park(fd, event, 0);
      }
  
      /**
!      * Configures the socket to blocking mode. This method is a no-op if the
!      * socket is already in blocking mode.
!      * @throws IOException if closed or there is an I/O error changing the mode
-      */
-     private void configureBlocking(FileDescriptor fd) throws IOException {
-         assert readLock.isHeldByCurrentThread();
-         if (nonBlocking) {
-             synchronized (stateLock) {
-                 ensureOpen();
-                 IOUtil.configureBlocking(fd, true);
-                 nonBlocking = false;
-             }
-         }
-     }
- 
-     /**
-      * Configures the socket to non-blocking mode. This method is a no-op if the
-      * socket is already in non-blocking mode.
-      * @throws IOException if closed or there is an I/O error changing the mode
       */
!     private void configureNonBlocking(FileDescriptor fd) throws IOException {
!         assert readLock.isHeldByCurrentThread();
!         if (!nonBlocking) {
!             synchronized (stateLock) {
!                 ensureOpen();
!                 IOUtil.configureBlocking(fd, false);
!                 nonBlocking = true;
!             }
          }
      }
  
      /**
       * Marks the beginning of a read operation that might block.
--- 208,22 ---
      private void park(FileDescriptor fd, int event) throws IOException {
          park(fd, event, 0);
      }
  
      /**
!      * Ensures that the socket is configured non-blocking invoked on a virtual
!      * thread or the operation has a timeout
!      * @throws IOException if there is an I/O error changing the blocking mode
       */
!     private void configureNonBlockingIfNeeded(FileDescriptor fd, boolean timed)
!         throws IOException
!     {
!         if (!nonBlocking
!             && (timed || Thread.currentThread().isVirtual())) {
!             assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
!             IOUtil.configureBlocking(fd, false);
!             nonBlocking = true;
          }
      }
  
      /**
       * Marks the beginning of a read operation that might block.

*** 298,24 ***
              if (connectionReset)
                  throw new SocketException("Connection reset");
              if (isInputClosed)
                  return -1;
              int timeout = this.timeout;
              if (timeout > 0) {
                  // read with timeout
-                 configureNonBlocking(fd);
                  n = timedRead(fd, b, off, len, MILLISECONDS.toNanos(timeout));
              } else {
                  // read, no timeout
                  n = tryRead(fd, b, off, len);
                  while (IOStatus.okayToRetry(n) && isOpen()) {
                      park(fd, Net.POLLIN);
                      n = tryRead(fd, b, off, len);
                  }
              }
              return n;
!         } catch (SocketTimeoutException e) {
              throw e;
          } catch (ConnectionResetException e) {
              connectionReset = true;
              throw new SocketException("Connection reset");
          } catch (IOException ioe) {
--- 304,24 ---
              if (connectionReset)
                  throw new SocketException("Connection reset");
              if (isInputClosed)
                  return -1;
              int timeout = this.timeout;
+             configureNonBlockingIfNeeded(fd, timeout > 0);
              if (timeout > 0) {
                  // read with timeout
                  n = timedRead(fd, b, off, len, MILLISECONDS.toNanos(timeout));
              } else {
                  // read, no timeout
                  n = tryRead(fd, b, off, len);
                  while (IOStatus.okayToRetry(n) && isOpen()) {
                      park(fd, Net.POLLIN);
                      n = tryRead(fd, b, off, len);
                  }
              }
              return n;
!         } catch (InterruptedIOException e) {
              throw e;
          } catch (ConnectionResetException e) {
              connectionReset = true;
              throw new SocketException("Connection reset");
          } catch (IOException ioe) {

*** 405,16 ***
--- 411,19 ---
       */
      private int implWrite(byte[] b, int off, int len) throws IOException {
          int n = 0;
          FileDescriptor fd = beginWrite();
          try {
+             configureNonBlockingIfNeeded(fd, false);
              n = tryWrite(fd, b, off, len);
              while (IOStatus.okayToRetry(n) && isOpen()) {
                  park(fd, Net.POLLOUT);
                  n = tryWrite(fd, b, off, len);
              }
              return n;
+         } catch (InterruptedIOException e) {
+             throw e;
          } catch (IOException ioe) {
              throw new SocketException(ioe.getMessage());
          } finally {
              endWrite(n > 0);
          }

*** 467,10 ***
--- 476,11 ---
                      ResourceManager.afterUdpClose();
                  throw ioe;
              }
              Runnable closer = closerFor(fd, stream);
              this.fd = fd;
+             this.fdVal = IOUtil.fdVal(fd);
              this.stream = stream;
              this.cleaner = CleanerFactory.cleaner().register(this, closer);
              this.state = ST_UNCONNECTED;
          }
      }

*** 574,16 ***
              connectLock.lock();
              try {
                  boolean connected = false;
                  FileDescriptor fd = beginConnect(address, port);
                  try {
! 
-                     // configure socket to non-blocking mode when there is a timeout
-                     if (millis > 0) {
-                         configureNonBlocking(fd);
-                     }
- 
                      int n = Net.connect(fd, address, port);
                      if (n > 0) {
                          // connection established
                          connected = true;
                      } else {
--- 584,11 ---
              connectLock.lock();
              try {
                  boolean connected = false;
                  FileDescriptor fd = beginConnect(address, port);
                  try {
!                     configureNonBlockingIfNeeded(fd, millis > 0);
                      int n = Net.connect(fd, address, port);
                      if (n > 0) {
                          // connection established
                          connected = true;
                      } else {

*** 600,25 ***
                                  polled = Net.pollConnectNow(fd);
                              }
                              connected = polled && isOpen();
                          }
                      }
- 
-                     // restore socket to blocking mode
-                     if (connected && millis > 0) {
-                         configureBlocking(fd);
-                     }
- 
                  } finally {
                      endConnect(fd, connected);
                  }
              } finally {
                  connectLock.unlock();
              }
          } catch (IOException ioe) {
              close();
!             throw SocketExceptions.of(ioe, isa);
          }
      }
  
      @Override
      protected void connect(String host, int port) throws IOException {
--- 605,23 ---
                                  polled = Net.pollConnectNow(fd);
                              }
                              connected = polled && isOpen();
                          }
                      }
                  } finally {
                      endConnect(fd, connected);
                  }
              } finally {
                  connectLock.unlock();
              }
          } catch (IOException ioe) {
              close();
!             if (ioe instanceof InterruptedIOException) {
+                 throw ioe;
+             } else {
+                 throw SocketExceptions.of(ioe, isa);
+             }
          }
      }
  
      @Override
      protected void connect(String host, int port) throws IOException {

*** 741,13 ***
          // accept a connection
          try {
              int n = 0;
              FileDescriptor fd = beginAccept();
              try {
                  if (remainingNanos > 0) {
                      // accept with timeout
-                     configureNonBlocking(fd);
                      n = timedAccept(fd, newfd, isaa, remainingNanos);
                  } else {
                      // accept, no timeout
                      n = Net.accept(fd, newfd, isaa);
                      while (IOStatus.okayToRetry(n) && isOpen()) {
--- 744,13 ---
          // accept a connection
          try {
              int n = 0;
              FileDescriptor fd = beginAccept();
              try {
+                 configureNonBlockingIfNeeded(fd, remainingNanos > 0);
                  if (remainingNanos > 0) {
                      // accept with timeout
                      n = timedAccept(fd, newfd, isaa, remainingNanos);
                  } else {
                      // accept, no timeout
                      n = Net.accept(fd, newfd, isaa);
                      while (IOStatus.okayToRetry(n) && isOpen()) {

*** 775,10 ***
--- 778,11 ---
  
          // set the fields
          Runnable closer = closerFor(newfd, true);
          synchronized (nsi.stateLock) {
              nsi.fd = newfd;
+             nsi.fdVal = IOUtil.fdVal(newfd);
              nsi.stream = true;
              nsi.cleaner = CleanerFactory.cleaner().register(nsi, closer);
              nsi.localport = localAddress.getPort();
              nsi.address = isaa[0].getAddress();
              nsi.port = isaa[0].getPort();

*** 885,31 ***
              if (state == ST_NEW) {
                  // stillborn
                  this.state = ST_CLOSED;
                  return;
              }
              this.state = ST_CLOSING;
  
              // shutdown output when linger interval not set to 0
!             try {
!                 var SO_LINGER = StandardSocketOptions.SO_LINGER;
!                 if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
!                     Net.shutdown(fd, Net.SHUT_WR);
!                 }
!             } catch (IOException ignore) { }
  
              // attempt to close the socket. If there are I/O operations in progress
              // then the socket is pre-closed and the thread(s) signalled. The
              // last thread will close the file descriptor.
              if (!tryClose()) {
-                 nd.preClose(fd);
                  long reader = readerThread;
-                 if (reader != 0)
-                     NativeThread.signal(reader);
                  long writer = writerThread;
!                 if (writer != 0)
!                     NativeThread.signal(writer);
              }
          }
      }
  
      // the socket options supported by client and server sockets
--- 889,41 ---
              if (state == ST_NEW) {
                  // stillborn
                  this.state = ST_CLOSED;
                  return;
              }
+             boolean connected = (state == ST_CONNECTED);
              this.state = ST_CLOSING;
  
              // shutdown output when linger interval not set to 0
!             if (connected) {
!                 try {
!                     var SO_LINGER = StandardSocketOptions.SO_LINGER;
!                     if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
!                         Net.shutdown(fd, Net.SHUT_WR);
!                     }
+                 } catch (IOException ignore) { }
+             }
  
              // attempt to close the socket. If there are I/O operations in progress
              // then the socket is pre-closed and the thread(s) signalled. The
              // last thread will close the file descriptor.
              if (!tryClose()) {
                  long reader = readerThread;
                  long writer = writerThread;
!                 if (NativeThread.isVirtualThread(reader)
!                         || NativeThread.isVirtualThread(writer)) {
+                     Poller.stopPoll(fdVal);
+                 }
+                 if (NativeThread.isKernelThread(reader)
+                         || NativeThread.isKernelThread(writer)) {
+                     nd.preClose(fd);
+                     if (NativeThread.isKernelThread(reader))
+                         NativeThread.signal(reader);
+                     if (NativeThread.isKernelThread(writer))
+                         NativeThread.signal(writer);
+                 }
              }
          }
      }
  
      // the socket options supported by client and server sockets

*** 1146,10 ***
--- 1160,13 ---
      protected void shutdownInput() throws IOException {
          synchronized (stateLock) {
              ensureOpenAndConnected();
              if (!isInputClosed) {
                  Net.shutdown(fd, Net.SHUT_RD);
+                 if (NativeThread.isVirtualThread(readerThread)) {
+                     Poller.stopPoll(fdVal, Net.POLLIN);
+                 }
                  isInputClosed = true;
              }
          }
      }
  

*** 1157,10 ***
--- 1174,13 ---
      protected void shutdownOutput() throws IOException {
          synchronized (stateLock) {
              ensureOpenAndConnected();
              if (!isOutputClosed) {
                  Net.shutdown(fd, Net.SHUT_WR);
+                 if (NativeThread.isVirtualThread(writerThread)) {
+                     Poller.stopPoll(fdVal, Net.POLLOUT);
+                 }
                  isOutputClosed = true;
              }
          }
      }
  

*** 1174,10 ***
--- 1194,11 ---
          writeLock.lock();
          try {
              int n = 0;
              FileDescriptor fd = beginWrite();
              try {
+                 configureNonBlockingIfNeeded(fd, false);
                  do {
                      n = Net.sendOOB(fd, (byte) data);
                  } while (n == IOStatus.INTERRUPTED && isOpen());
                  if (n == IOStatus.UNAVAILABLE) {
                      throw new SocketException("No buffer space available");
< prev index next >