< prev index next >

src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java

Print this page
@@ -121,10 +121,13 @@
      private SocketAddress remoteAddress;
  
      // Socket adaptor, created on demand
      private Socket socket;
  
+     // lazily set to true when the socket is configured non-blocking
+     private volatile boolean nonBlocking;
+ 
      // -- End of fields protected by stateLock
  
      SocketChannelImpl(SelectorProvider sp) throws IOException {
          this(sp, Net.isIPv6Available() ? INET6 : INET);
      }

@@ -412,10 +415,11 @@
  
                  // check if input is shutdown
                  if (isInputClosed)
                      return IOStatus.EOF;
  
+                 configureNonBlockingIfNeeded();
                  n = IOUtil.read(fd, buf, -1, nd);
                  if (blocking) {
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLIN);
                          n = IOUtil.read(fd, buf, -1, nd);

@@ -455,10 +459,11 @@
  
                  // check if input is shutdown
                  if (isInputClosed)
                      return IOStatus.EOF;
  
+                 configureNonBlockingIfNeeded();
                  n = IOUtil.read(fd, dsts, offset, length, nd);
                  if (blocking) {
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLIN);
                          n = IOUtil.read(fd, dsts, offset, length, nd);

@@ -527,10 +532,11 @@
              ensureOpenAndConnected();
              boolean blocking = isBlocking();
              int n = 0;
              try {
                  beginWrite(blocking);
+                 configureNonBlockingIfNeeded();
                  n = IOUtil.write(fd, buf, -1, nd);
                  if (blocking) {
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLOUT);
                          n = IOUtil.write(fd, buf, -1, nd);

@@ -558,10 +564,11 @@
              ensureOpenAndConnected();
              boolean blocking = isBlocking();
              long n = 0;
              try {
                  beginWrite(blocking);
+                 configureNonBlockingIfNeeded();
                  n = IOUtil.write(fd, srcs, offset, length, nd);
                  if (blocking) {
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLOUT);
                          n = IOUtil.write(fd, srcs, offset, length, nd);

@@ -587,16 +594,16 @@
              ensureOpenAndConnected();
              boolean blocking = isBlocking();
              int n = 0;
              try {
                  beginWrite(blocking);
-                 if (blocking) {
-                     do {
-                         n = Net.sendOOB(fd, b);
-                     } while (n == IOStatus.INTERRUPTED && isOpen());
-                 } else {
+                 configureNonBlockingIfNeeded();
+                 do {
                      n = Net.sendOOB(fd, b);
+                 } while (n == IOStatus.INTERRUPTED && isOpen());
+                 if (blocking && n == IOStatus.UNAVAILABLE) {
+                     throw new SocketException("No buffer space available");
                  }
              } finally {
                  endWrite(blocking, n > 0);
                  if (n <= 0 && isOutputClosed)
                      throw new AsynchronousCloseException();

@@ -621,39 +628,55 @@
              readLock.unlock();
          }
      }
  
      /**
-      * Adjusts the blocking mode. readLock or writeLock must already be held.
+      * Adjust the blocking mode while holding readLock or writeLock.
       */
      private void lockedConfigureBlocking(boolean block) throws IOException {
          assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
          synchronized (stateLock) {
              ensureOpen();
-             IOUtil.configureBlocking(fd, block);
+             // do nothing if virtual thread has forced the socket to be non-blocking
+             if (!nonBlocking) {
+                 IOUtil.configureBlocking(fd, block);
+             }
          }
      }
  
      /**
-      * Adjusts the blocking mode if the channel is open. readLock or writeLock
-      * must already be held.
-      *
-      * @return {@code true} if the blocking mode was adjusted, {@code false} if
-      *         the blocking mode was not adjusted because the channel is closed
+      * Attempts to adjusts the blocking mode if the channel is open.
+      * @return {@code true} if the blocking mode was adjusted
       */
      private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
          assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
          synchronized (stateLock) {
-             if (isOpen()) {
+             // do nothing if virtual thread has forced the socket to be non-blocking
+             if (!nonBlocking && isOpen()) {
                  IOUtil.configureBlocking(fd, block);
                  return true;
              } else {
                  return false;
              }
          }
      }
  
+     /**
+      * Ensures that the socket is configured non-blocking when on a virtual
+      * thread.
+      */
+     private void configureNonBlockingIfNeeded() throws IOException {
+         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
+         if (!nonBlocking && Thread.currentThread().isVirtual()) {
+             synchronized (stateLock) {
+                 ensureOpen();
+                 IOUtil.configureBlocking(fd, false);
+                 nonBlocking = true;
+             }
+         }
+     }
+ 
      /**
       * Returns the local address, or null if not bound
       */
      SocketAddress localAddress() {
          synchronized (stateLock) {

@@ -844,10 +867,11 @@
                  try {
                      boolean blocking = isBlocking();
                      boolean connected = false;
                      try {
                          beginConnect(blocking, sa);
+                         configureNonBlockingIfNeeded();
                          int n;
                          if (isUnixSocket()) {
                              n = UnixDomainSockets.connect(fd, sa);
                          } else {
                              n = Net.connect(family, fd, sa);

@@ -1007,19 +1031,36 @@
       * mode before the key is flushed from the Selector.
       */
      private void implCloseBlockingMode() throws IOException {
          synchronized (stateLock) {
              assert state < ST_CLOSING;
+             boolean connected = (state == ST_CONNECTED);
              state = ST_CLOSING;
+ 
              if (!tryClose()) {
+                 // shutdown output when linger interval not set to 0
+                 if (connected) {
+                     try {
+                         var SO_LINGER = StandardSocketOptions.SO_LINGER;
+                         if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
+                             Net.shutdown(fd, Net.SHUT_WR);
+                         }
+                     } catch (IOException ignore) { }
+                 }
+ 
                  long reader = readerThread;
                  long writer = writerThread;
-                 if (reader != 0 || writer != 0) {
+                 if (NativeThread.isVirtualThread(reader)
+                         || NativeThread.isVirtualThread(writer)) {
+                     Poller.stopPoll(fdVal);
+                 }
+                 if (NativeThread.isKernelThread(reader)
+                         || NativeThread.isKernelThread(writer)) {
                      nd.preClose(fd);
-                     if (reader != 0)
+                     if (NativeThread.isKernelThread(reader))
                          NativeThread.signal(reader);
-                     if (writer != 0)
+                     if (NativeThread.isKernelThread(writer))
                          NativeThread.signal(writer);
                  }
              }
          }
      }

@@ -1096,13 +1137,16 @@
              ensureOpen();
              if (!isConnected())
                  throw new NotYetConnectedException();
              if (!isInputClosed) {
                  Net.shutdown(fd, Net.SHUT_RD);
-                 long thread = readerThread;
-                 if (thread != 0)
-                     NativeThread.signal(thread);
+                 long reader = readerThread;
+                 if (NativeThread.isVirtualThread(reader)) {
+                     Poller.stopPoll(fdVal, Net.POLLIN);
+                 } else if (NativeThread.isKernelThread(reader)) {
+                     NativeThread.signal(reader);
+                 }
                  isInputClosed = true;
              }
              return this;
          }
      }

@@ -1113,13 +1157,16 @@
              ensureOpen();
              if (!isConnected())
                  throw new NotYetConnectedException();
              if (!isOutputClosed) {
                  Net.shutdown(fd, Net.SHUT_WR);
-                 long thread = writerThread;
-                 if (thread != 0)
-                     NativeThread.signal(thread);
+                 long writer = writerThread;
+                 if (NativeThread.isVirtualThread(writer)) {
+                     Poller.stopPoll(fdVal, Net.POLLOUT);
+                 } else if (NativeThread.isKernelThread(writer)) {
+                     NativeThread.signal(writer);
+                 }
                  isOutputClosed = true;
              }
              return this;
          }
      }

@@ -1280,10 +1327,11 @@
                          // restore socket to blocking mode (if channel is open)
                          tryLockedConfigureBlocking(true);
                      }
                  } else {
                      // read, no timeout
+                     configureNonBlockingIfNeeded();
                      n = tryRead(b, off, len);
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLIN);
                          n = tryRead(b, off, len);
                      }

@@ -1341,10 +1389,11 @@
              // loop until all bytes have been written
              int pos = off;
              int end = off + len;
              try {
                  beginWrite(true);
+                 configureNonBlockingIfNeeded();
                  while (pos < end && isOpen()) {
                      int size = end - pos;
                      int n = tryWrite(b, pos, size);
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLOUT);
< prev index next >