< prev index next >

src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java

Print this page
@@ -26,10 +26,11 @@
  package sun.nio.ch;
  
  import java.io.FileDescriptor;
  import java.io.IOException;
  import java.io.InputStream;
+ import java.io.InterruptedIOException;
  import java.io.OutputStream;
  import java.io.UncheckedIOException;
  import java.lang.ref.Cleaner.Cleanable;
  import java.net.InetAddress;
  import java.net.InetSocketAddress;

@@ -48,10 +49,11 @@
  import java.util.Objects;
  import java.util.Set;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.locks.ReentrantLock;
  
+ import jdk.internal.misc.VirtualThreads;
  import jdk.internal.ref.CleanerFactory;
  import sun.net.ConnectionResetException;
  import sun.net.NetHooks;
  import sun.net.PlatformSocketImpl;
  import sun.net.ResourceManager;

@@ -62,17 +64,15 @@
  import static java.util.concurrent.TimeUnit.NANOSECONDS;
  
  /**
   * NIO based SocketImpl.
   *
-  * The underlying socket used by this SocketImpl is initially configured
-  * blocking. If the connect method is used to establish a connection with a
-  * timeout then the socket is configured non-blocking for the connect attempt,
-  * and then restored to blocking mode when the connection is established.
-  * If the accept or read methods are used with a timeout then the socket is
-  * configured non-blocking and is never restored. When in non-blocking mode,
-  * operations that don't complete immediately will poll the socket and preserve
+  * The underlying socket used by this SocketImpl is initially configured blocking.
+  * If a connect, accept or read is attempted with a timeout, or a virtual
+  * thread invokes a blocking operation, then the socket is changed to non-blocking
+  * When in non-blocking mode, operations that don't complete immediately will
+  * poll the socket (or park when invoked on a virtual thread) and preserve
   * the semantics of blocking operations.
   */
  
  public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl {
      private static final NativeDispatcher nd = new SocketDispatcher();

@@ -98,10 +98,13 @@
      private static final int ST_CONNECTED = 3;
      private static final int ST_CLOSING = 4;
      private static final int ST_CLOSED = 5;
      private volatile int state;  // need stateLock to change
  
+     // The file descriptor value
+     private int fdVal;
+ 
      // set by SocketImpl.create, protected by stateLock
      private boolean stream;
      private Cleanable cleaner;
  
      // set to true when the socket is in non-blocking mode

@@ -161,23 +164,42 @@
          if (state > ST_CONNECTED)
              throw new SocketException("Socket closed");
      }
  
      /**
-      * Disables the current thread for scheduling purposes until the socket is
-      * ready for I/O, or is asynchronously closed, for up to the specified
-      * waiting time.
+      * Disables the current thread for scheduling purposes until the
+      * socket is ready for I/O or is asynchronously closed, for up to the
+      * specified waiting time.
       * @throws IOException if an I/O error occurs
       */
      private void park(FileDescriptor fd, int event, long nanos) throws IOException {
-         long millis;
-         if (nanos == 0) {
-             millis = -1;
+         Thread t = Thread.currentThread();
+         if (t.isVirtual()) {
+             Poller.register(fdVal, event);
+             try {
+                 if (isOpen()) {
+                     if (nanos == 0) {
+                         VirtualThreads.park();
+                     } else {
+                         VirtualThreads.park(nanos);
+                     }
+                     if (t.isInterrupted()) {
+                         throw new InterruptedIOException();
+                     }
+                 }
+             } finally {
+                 Poller.deregister(fdVal, event);
+             }
          } else {
-             millis = NANOSECONDS.toMillis(nanos);
+             long millis;
+             if (nanos == 0) {
+                 millis = -1;
+             } else {
+                 millis = NANOSECONDS.toMillis(nanos);
+             }
+             Net.poll(fd, event, millis);
          }
-         Net.poll(fd, event, millis);
      }
  
      /**
       * Disables the current thread for scheduling purposes until the socket is
       * ready for I/O or is asynchronously closed.

@@ -186,38 +208,22 @@
      private void park(FileDescriptor fd, int event) throws IOException {
          park(fd, event, 0);
      }
  
      /**
-      * Configures the socket to blocking mode. This method is a no-op if the
-      * socket is already in blocking mode.
-      * @throws IOException if closed or there is an I/O error changing the mode
-      */
-     private void configureBlocking(FileDescriptor fd) throws IOException {
-         assert readLock.isHeldByCurrentThread();
-         if (nonBlocking) {
-             synchronized (stateLock) {
-                 ensureOpen();
-                 IOUtil.configureBlocking(fd, true);
-                 nonBlocking = false;
-             }
-         }
-     }
- 
-     /**
-      * Configures the socket to non-blocking mode. This method is a no-op if the
-      * socket is already in non-blocking mode.
-      * @throws IOException if closed or there is an I/O error changing the mode
+      * Ensures that the socket is configured non-blocking invoked on a virtual
+      * thread or the operation has a timeout
+      * @throws IOException if there is an I/O error changing the blocking mode
       */
-     private void configureNonBlocking(FileDescriptor fd) throws IOException {
-         assert readLock.isHeldByCurrentThread();
-         if (!nonBlocking) {
-             synchronized (stateLock) {
-                 ensureOpen();
-                 IOUtil.configureBlocking(fd, false);
-                 nonBlocking = true;
-             }
+     private void configureNonBlockingIfNeeded(FileDescriptor fd, boolean timed)
+         throws IOException
+     {
+         if (!nonBlocking
+             && (timed || Thread.currentThread().isVirtual())) {
+             assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
+             IOUtil.configureBlocking(fd, false);
+             nonBlocking = true;
          }
      }
  
      /**
       * Marks the beginning of a read operation that might block.

@@ -298,24 +304,24 @@
              if (connectionReset)
                  throw new SocketException("Connection reset");
              if (isInputClosed)
                  return -1;
              int timeout = this.timeout;
+             configureNonBlockingIfNeeded(fd, timeout > 0);
              if (timeout > 0) {
                  // read with timeout
-                 configureNonBlocking(fd);
                  n = timedRead(fd, b, off, len, MILLISECONDS.toNanos(timeout));
              } else {
                  // read, no timeout
                  n = tryRead(fd, b, off, len);
                  while (IOStatus.okayToRetry(n) && isOpen()) {
                      park(fd, Net.POLLIN);
                      n = tryRead(fd, b, off, len);
                  }
              }
              return n;
-         } catch (SocketTimeoutException e) {
+         } catch (InterruptedIOException e) {
              throw e;
          } catch (ConnectionResetException e) {
              connectionReset = true;
              throw new SocketException("Connection reset");
          } catch (IOException ioe) {

@@ -405,16 +411,19 @@
       */
      private int implWrite(byte[] b, int off, int len) throws IOException {
          int n = 0;
          FileDescriptor fd = beginWrite();
          try {
+             configureNonBlockingIfNeeded(fd, false);
              n = tryWrite(fd, b, off, len);
              while (IOStatus.okayToRetry(n) && isOpen()) {
                  park(fd, Net.POLLOUT);
                  n = tryWrite(fd, b, off, len);
              }
              return n;
+         } catch (InterruptedIOException e) {
+             throw e;
          } catch (IOException ioe) {
              throw new SocketException(ioe.getMessage());
          } finally {
              endWrite(n > 0);
          }

@@ -467,10 +476,11 @@
                      ResourceManager.afterUdpClose();
                  throw ioe;
              }
              Runnable closer = closerFor(fd, stream);
              this.fd = fd;
+             this.fdVal = IOUtil.fdVal(fd);
              this.stream = stream;
              this.cleaner = CleanerFactory.cleaner().register(this, closer);
              this.state = ST_UNCONNECTED;
          }
      }

@@ -574,16 +584,11 @@
              connectLock.lock();
              try {
                  boolean connected = false;
                  FileDescriptor fd = beginConnect(address, port);
                  try {
- 
-                     // configure socket to non-blocking mode when there is a timeout
-                     if (millis > 0) {
-                         configureNonBlocking(fd);
-                     }
- 
+                     configureNonBlockingIfNeeded(fd, millis > 0);
                      int n = Net.connect(fd, address, port);
                      if (n > 0) {
                          // connection established
                          connected = true;
                      } else {

@@ -600,25 +605,23 @@
                                  polled = Net.pollConnectNow(fd);
                              }
                              connected = polled && isOpen();
                          }
                      }
- 
-                     // restore socket to blocking mode
-                     if (connected && millis > 0) {
-                         configureBlocking(fd);
-                     }
- 
                  } finally {
                      endConnect(fd, connected);
                  }
              } finally {
                  connectLock.unlock();
              }
          } catch (IOException ioe) {
              close();
-             throw SocketExceptions.of(ioe, isa);
+             if (ioe instanceof InterruptedIOException) {
+                 throw ioe;
+             } else {
+                 throw SocketExceptions.of(ioe, isa);
+             }
          }
      }
  
      @Override
      protected void connect(String host, int port) throws IOException {

@@ -741,13 +744,13 @@
          // accept a connection
          try {
              int n = 0;
              FileDescriptor fd = beginAccept();
              try {
+                 configureNonBlockingIfNeeded(fd, remainingNanos > 0);
                  if (remainingNanos > 0) {
                      // accept with timeout
-                     configureNonBlocking(fd);
                      n = timedAccept(fd, newfd, isaa, remainingNanos);
                  } else {
                      // accept, no timeout
                      n = Net.accept(fd, newfd, isaa);
                      while (IOStatus.okayToRetry(n) && isOpen()) {

@@ -775,10 +778,11 @@
  
          // set the fields
          Runnable closer = closerFor(newfd, true);
          synchronized (nsi.stateLock) {
              nsi.fd = newfd;
+             nsi.fdVal = IOUtil.fdVal(newfd);
              nsi.stream = true;
              nsi.cleaner = CleanerFactory.cleaner().register(nsi, closer);
              nsi.localport = localAddress.getPort();
              nsi.address = isaa[0].getAddress();
              nsi.port = isaa[0].getPort();

@@ -885,31 +889,41 @@
              if (state == ST_NEW) {
                  // stillborn
                  this.state = ST_CLOSED;
                  return;
              }
+             boolean connected = (state == ST_CONNECTED);
              this.state = ST_CLOSING;
  
              // shutdown output when linger interval not set to 0
-             try {
-                 var SO_LINGER = StandardSocketOptions.SO_LINGER;
-                 if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
-                     Net.shutdown(fd, Net.SHUT_WR);
-                 }
-             } catch (IOException ignore) { }
+             if (connected) {
+                 try {
+                     var SO_LINGER = StandardSocketOptions.SO_LINGER;
+                     if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
+                         Net.shutdown(fd, Net.SHUT_WR);
+                     }
+                 } catch (IOException ignore) { }
+             }
  
              // attempt to close the socket. If there are I/O operations in progress
              // then the socket is pre-closed and the thread(s) signalled. The
              // last thread will close the file descriptor.
              if (!tryClose()) {
-                 nd.preClose(fd);
                  long reader = readerThread;
-                 if (reader != 0)
-                     NativeThread.signal(reader);
                  long writer = writerThread;
-                 if (writer != 0)
-                     NativeThread.signal(writer);
+                 if (NativeThread.isVirtualThread(reader)
+                         || NativeThread.isVirtualThread(writer)) {
+                     Poller.stopPoll(fdVal);
+                 }
+                 if (NativeThread.isKernelThread(reader)
+                         || NativeThread.isKernelThread(writer)) {
+                     nd.preClose(fd);
+                     if (NativeThread.isKernelThread(reader))
+                         NativeThread.signal(reader);
+                     if (NativeThread.isKernelThread(writer))
+                         NativeThread.signal(writer);
+                 }
              }
          }
      }
  
      // the socket options supported by client and server sockets

@@ -1146,10 +1160,13 @@
      protected void shutdownInput() throws IOException {
          synchronized (stateLock) {
              ensureOpenAndConnected();
              if (!isInputClosed) {
                  Net.shutdown(fd, Net.SHUT_RD);
+                 if (NativeThread.isVirtualThread(readerThread)) {
+                     Poller.stopPoll(fdVal, Net.POLLIN);
+                 }
                  isInputClosed = true;
              }
          }
      }
  

@@ -1157,10 +1174,13 @@
      protected void shutdownOutput() throws IOException {
          synchronized (stateLock) {
              ensureOpenAndConnected();
              if (!isOutputClosed) {
                  Net.shutdown(fd, Net.SHUT_WR);
+                 if (NativeThread.isVirtualThread(writerThread)) {
+                     Poller.stopPoll(fdVal, Net.POLLOUT);
+                 }
                  isOutputClosed = true;
              }
          }
      }
  

@@ -1174,10 +1194,11 @@
          writeLock.lock();
          try {
              int n = 0;
              FileDescriptor fd = beginWrite();
              try {
+                 configureNonBlockingIfNeeded(fd, false);
                  do {
                      n = Net.sendOOB(fd, (byte) data);
                  } while (n == IOStatus.INTERRUPTED && isOpen());
                  if (n == IOStatus.UNAVAILABLE) {
                      throw new SocketException("No buffer space available");
< prev index next >