< prev index next >

src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java

Print this page
*** 121,10 ***
--- 121,13 ---
      private SocketAddress remoteAddress;
  
      // Socket adaptor, created on demand
      private Socket socket;
  
+     // lazily set to true when the socket is configured non-blocking
+     private volatile boolean nonBlocking;
+ 
      // -- End of fields protected by stateLock
  
      SocketChannelImpl(SelectorProvider sp) throws IOException {
          this(sp, Net.isIPv6Available() ? INET6 : INET);
      }

*** 412,10 ***
--- 415,11 ---
  
                  // check if input is shutdown
                  if (isInputClosed)
                      return IOStatus.EOF;
  
+                 configureNonBlockingIfNeeded();
                  n = IOUtil.read(fd, buf, -1, nd);
                  if (blocking) {
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLIN);
                          n = IOUtil.read(fd, buf, -1, nd);

*** 455,10 ***
--- 459,11 ---
  
                  // check if input is shutdown
                  if (isInputClosed)
                      return IOStatus.EOF;
  
+                 configureNonBlockingIfNeeded();
                  n = IOUtil.read(fd, dsts, offset, length, nd);
                  if (blocking) {
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLIN);
                          n = IOUtil.read(fd, dsts, offset, length, nd);

*** 527,10 ***
--- 532,11 ---
              ensureOpenAndConnected();
              boolean blocking = isBlocking();
              int n = 0;
              try {
                  beginWrite(blocking);
+                 configureNonBlockingIfNeeded();
                  n = IOUtil.write(fd, buf, -1, nd);
                  if (blocking) {
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLOUT);
                          n = IOUtil.write(fd, buf, -1, nd);

*** 558,10 ***
--- 564,11 ---
              ensureOpenAndConnected();
              boolean blocking = isBlocking();
              long n = 0;
              try {
                  beginWrite(blocking);
+                 configureNonBlockingIfNeeded();
                  n = IOUtil.write(fd, srcs, offset, length, nd);
                  if (blocking) {
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLOUT);
                          n = IOUtil.write(fd, srcs, offset, length, nd);

*** 587,16 ***
              ensureOpenAndConnected();
              boolean blocking = isBlocking();
              int n = 0;
              try {
                  beginWrite(blocking);
!                 if (blocking) {
!                     do {
-                         n = Net.sendOOB(fd, b);
-                     } while (n == IOStatus.INTERRUPTED && isOpen());
-                 } else {
                      n = Net.sendOOB(fd, b);
                  }
              } finally {
                  endWrite(blocking, n > 0);
                  if (n <= 0 && isOutputClosed)
                      throw new AsynchronousCloseException();
--- 594,16 ---
              ensureOpenAndConnected();
              boolean blocking = isBlocking();
              int n = 0;
              try {
                  beginWrite(blocking);
!                 configureNonBlockingIfNeeded();
!                 do {
                      n = Net.sendOOB(fd, b);
+                 } while (n == IOStatus.INTERRUPTED && isOpen());
+                 if (blocking && n == IOStatus.UNAVAILABLE) {
+                     throw new SocketException("No buffer space available");
                  }
              } finally {
                  endWrite(blocking, n > 0);
                  if (n <= 0 && isOutputClosed)
                      throw new AsynchronousCloseException();

*** 621,39 ***
              readLock.unlock();
          }
      }
  
      /**
!      * Adjusts the blocking mode. readLock or writeLock must already be held.
       */
      private void lockedConfigureBlocking(boolean block) throws IOException {
          assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
          synchronized (stateLock) {
              ensureOpen();
!             IOUtil.configureBlocking(fd, block);
          }
      }
  
      /**
!      * Adjusts the blocking mode if the channel is open. readLock or writeLock
!      * must already be held.
-      *
-      * @return {@code true} if the blocking mode was adjusted, {@code false} if
-      *         the blocking mode was not adjusted because the channel is closed
       */
      private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
          assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
          synchronized (stateLock) {
!             if (isOpen()) {
                  IOUtil.configureBlocking(fd, block);
                  return true;
              } else {
                  return false;
              }
          }
      }
  
      /**
       * Returns the local address, or null if not bound
       */
      SocketAddress localAddress() {
          synchronized (stateLock) {
--- 628,55 ---
              readLock.unlock();
          }
      }
  
      /**
!      * Adjust the blocking mode while holding readLock or writeLock.
       */
      private void lockedConfigureBlocking(boolean block) throws IOException {
          assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
          synchronized (stateLock) {
              ensureOpen();
!             // do nothing if virtual thread has forced the socket to be non-blocking
+             if (!nonBlocking) {
+                 IOUtil.configureBlocking(fd, block);
+             }
          }
      }
  
      /**
!      * Attempts to adjusts the blocking mode if the channel is open.
!      * @return {@code true} if the blocking mode was adjusted
       */
      private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
          assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
          synchronized (stateLock) {
!             // do nothing if virtual thread has forced the socket to be non-blocking
+             if (!nonBlocking && isOpen()) {
                  IOUtil.configureBlocking(fd, block);
                  return true;
              } else {
                  return false;
              }
          }
      }
  
+     /**
+      * Ensures that the socket is configured non-blocking when on a virtual
+      * thread.
+      */
+     private void configureNonBlockingIfNeeded() throws IOException {
+         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
+         if (!nonBlocking && Thread.currentThread().isVirtual()) {
+             synchronized (stateLock) {
+                 ensureOpen();
+                 IOUtil.configureBlocking(fd, false);
+                 nonBlocking = true;
+             }
+         }
+     }
+ 
      /**
       * Returns the local address, or null if not bound
       */
      SocketAddress localAddress() {
          synchronized (stateLock) {

*** 844,10 ***
--- 867,11 ---
                  try {
                      boolean blocking = isBlocking();
                      boolean connected = false;
                      try {
                          beginConnect(blocking, sa);
+                         configureNonBlockingIfNeeded();
                          int n;
                          if (isUnixSocket()) {
                              n = UnixDomainSockets.connect(fd, sa);
                          } else {
                              n = Net.connect(family, fd, sa);

*** 1007,19 ***
       * mode before the key is flushed from the Selector.
       */
      private void implCloseBlockingMode() throws IOException {
          synchronized (stateLock) {
              assert state < ST_CLOSING;
              state = ST_CLOSING;
              if (!tryClose()) {
                  long reader = readerThread;
                  long writer = writerThread;
!                 if (reader != 0 || writer != 0) {
                      nd.preClose(fd);
!                     if (reader != 0)
                          NativeThread.signal(reader);
!                     if (writer != 0)
                          NativeThread.signal(writer);
                  }
              }
          }
      }
--- 1031,36 ---
       * mode before the key is flushed from the Selector.
       */
      private void implCloseBlockingMode() throws IOException {
          synchronized (stateLock) {
              assert state < ST_CLOSING;
+             boolean connected = (state == ST_CONNECTED);
              state = ST_CLOSING;
+ 
              if (!tryClose()) {
+                 // shutdown output when linger interval not set to 0
+                 if (connected) {
+                     try {
+                         var SO_LINGER = StandardSocketOptions.SO_LINGER;
+                         if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
+                             Net.shutdown(fd, Net.SHUT_WR);
+                         }
+                     } catch (IOException ignore) { }
+                 }
+ 
                  long reader = readerThread;
                  long writer = writerThread;
!                 if (NativeThread.isVirtualThread(reader)
+                         || NativeThread.isVirtualThread(writer)) {
+                     Poller.stopPoll(fdVal);
+                 }
+                 if (NativeThread.isKernelThread(reader)
+                         || NativeThread.isKernelThread(writer)) {
                      nd.preClose(fd);
!                     if (NativeThread.isKernelThread(reader))
                          NativeThread.signal(reader);
!                     if (NativeThread.isKernelThread(writer))
                          NativeThread.signal(writer);
                  }
              }
          }
      }

*** 1096,13 ***
              ensureOpen();
              if (!isConnected())
                  throw new NotYetConnectedException();
              if (!isInputClosed) {
                  Net.shutdown(fd, Net.SHUT_RD);
!                 long thread = readerThread;
!                 if (thread != 0)
!                     NativeThread.signal(thread);
                  isInputClosed = true;
              }
              return this;
          }
      }
--- 1137,16 ---
              ensureOpen();
              if (!isConnected())
                  throw new NotYetConnectedException();
              if (!isInputClosed) {
                  Net.shutdown(fd, Net.SHUT_RD);
!                 long reader = readerThread;
!                 if (NativeThread.isVirtualThread(reader)) {
!                     Poller.stopPoll(fdVal, Net.POLLIN);
+                 } else if (NativeThread.isKernelThread(reader)) {
+                     NativeThread.signal(reader);
+                 }
                  isInputClosed = true;
              }
              return this;
          }
      }

*** 1113,13 ***
              ensureOpen();
              if (!isConnected())
                  throw new NotYetConnectedException();
              if (!isOutputClosed) {
                  Net.shutdown(fd, Net.SHUT_WR);
!                 long thread = writerThread;
!                 if (thread != 0)
!                     NativeThread.signal(thread);
                  isOutputClosed = true;
              }
              return this;
          }
      }
--- 1157,16 ---
              ensureOpen();
              if (!isConnected())
                  throw new NotYetConnectedException();
              if (!isOutputClosed) {
                  Net.shutdown(fd, Net.SHUT_WR);
!                 long writer = writerThread;
!                 if (NativeThread.isVirtualThread(writer)) {
!                     Poller.stopPoll(fdVal, Net.POLLOUT);
+                 } else if (NativeThread.isKernelThread(writer)) {
+                     NativeThread.signal(writer);
+                 }
                  isOutputClosed = true;
              }
              return this;
          }
      }

*** 1280,10 ***
--- 1327,11 ---
                          // restore socket to blocking mode (if channel is open)
                          tryLockedConfigureBlocking(true);
                      }
                  } else {
                      // read, no timeout
+                     configureNonBlockingIfNeeded();
                      n = tryRead(b, off, len);
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLIN);
                          n = tryRead(b, off, len);
                      }

*** 1341,10 ***
--- 1389,11 ---
              // loop until all bytes have been written
              int pos = off;
              int end = off + len;
              try {
                  beginWrite(true);
+                 configureNonBlockingIfNeeded();
                  while (pos < end && isOpen()) {
                      int size = end - pos;
                      int n = tryWrite(b, pos, size);
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLOUT);
< prev index next >