< prev index next >

src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java

Print this page

        

@@ -50,10 +50,12 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 import jdk.internal.ref.CleanerFactory;
+import jdk.internal.access.SharedSecrets;
+import jdk.internal.misc.Strands;
 import sun.net.ConnectionResetException;
 import sun.net.NetHooks;
 import sun.net.PlatformSocketImpl;
 import sun.net.ResourceManager;
 import sun.net.ext.ExtendedSocketOptions;

@@ -67,17 +69,15 @@
  *
  * This implementation attempts to be compatible with legacy PlainSocketImpl,
  * including behavior and exceptions that are not specified by SocketImpl.
  *
  * The underlying socket used by this SocketImpl is initially configured
- * blocking. If the connect method is used to establish a connection with a
- * timeout then the socket is configured non-blocking for the connect attempt,
- * and then restored to blocking mode when the connection is established.
- * If the accept or read methods are used with a timeout then the socket is
- * configured non-blocking and is never restored. When in non-blocking mode,
- * operations that don't complete immediately will poll the socket and preserve
- * the semantics of blocking operations.
+ * blocking. If a connect, accept or read is attempted with a timeout, or a
+ * fiber invokes a blocking operation, then the socket is changed to non-blocking
+ * When in non-blocking mode, operations that don't complete immediately will
+ * poll the socket (or park the fiber when invoked on a fiber) and preserve the
+ * semantics of blocking operations.
  */
 
 public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl {
     private static final NativeDispatcher nd = new SocketDispatcher();
 

@@ -165,63 +165,68 @@
         if (state > ST_CONNECTED)
             throw new SocketException("Socket closed");
     }
 
     /**
-     * Disables the current thread for scheduling purposes until the socket is
-     * ready for I/O, or is asynchronously closed, for up to the specified
-     * waiting time.
-     * @throws IOException if an I/O error occurs
+     * Disables the current thread or fiber for scheduling purposes until the
+     * socket is ready for I/O or is asynchronously closed, for up to the
+     * specified waiting time.
+     * @throws IOException if an I/O error occurs or the fiber is interrupted
      */
     private void park(FileDescriptor fd, int event, long nanos) throws IOException {
-        long millis;
-        if (nanos == 0) {
-            millis = -1;
+        Object strand = Strands.currentStrand();
+        if (PollerProvider.available() && (strand instanceof Fiber)) {
+            int fdVal = fdVal(fd);
+            Poller.register(strand, fdVal, event);
+            if (isOpen()) {
+                try {
+                    if (nanos == 0) {
+                        Strands.parkFiber();
+                    } else {
+                        Strands.parkFiber(nanos);
+                    }
+                    // throw SocketException with interrupt status set for now
+                    if (Strands.isInterrupted()) {
+                        throw new SocketException("I/O operation interrupted");
+                    }
+                } finally {
+                    Poller.deregister(strand, fdVal, event);
+                }
+            }
         } else {
-            millis = NANOSECONDS.toMillis(nanos);
+            long millis;
+            if (nanos == 0) {
+                millis = -1;
+            } else {
+                millis = NANOSECONDS.toMillis(nanos);
+            }
+            Net.poll(fd, event, millis);
         }
-        Net.poll(fd, event, millis);
     }
 
     /**
      * Disables the current thread for scheduling purposes until the socket is
      * ready for I/O or is asynchronously closed.
-     * @throws IOException if an I/O error occurs
+     * @throws IOException if an I/O error occurs or the fiber is interrupted
      */
     private void park(FileDescriptor fd, int event) throws IOException {
         park(fd, event, 0);
     }
 
     /**
-     * Configures the socket to blocking mode. This method is a no-op if the
-     * socket is already in blocking mode.
-     * @throws IOException if closed or there is an I/O error changing the mode
+     * Ensures that the socket is configured non-blocking when the current
+     * strand is a fiber or the operation has a timeout
+     * @throws IOException if there is an I/O error changing the blocking mode
      */
-    private void configureBlocking(FileDescriptor fd) throws IOException {
-        assert readLock.isHeldByCurrentThread();
-        if (nonBlocking) {
-            synchronized (stateLock) {
-                ensureOpen();
-                IOUtil.configureBlocking(fd, true);
-                nonBlocking = false;
-            }
-        }
-    }
-
-    /**
-     * Configures the socket to non-blocking mode. This method is a no-op if the
-     * socket is already in non-blocking mode.
-     * @throws IOException if closed or there is an I/O error changing the mode
-     */
-    private void configureNonBlocking(FileDescriptor fd) throws IOException {
-        assert readLock.isHeldByCurrentThread();
-        if (!nonBlocking) {
-            synchronized (stateLock) {
-                ensureOpen();
-                IOUtil.configureBlocking(fd, false);
-                nonBlocking = true;
-            }
+    private void configureNonBlockingIfNeeded(FileDescriptor fd, boolean timed)
+        throws IOException
+    {
+        if (!nonBlocking
+            && (timed || Strands.currentStrand() instanceof Fiber)) {
+            assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
+            IOUtil.configureBlocking(fd, false);
+            nonBlocking = true;
         }
     }
 
     /**
      * Marks the beginning of a read operation that might block.

@@ -302,13 +307,13 @@
             if (connectionReset)
                 throw new SocketException("Connection reset");
             if (isInputClosed)
                 return -1;
             int timeout = this.timeout;
+            configureNonBlockingIfNeeded(fd, timeout > 0);
             if (timeout > 0) {
                 // read with timeout
-                configureNonBlocking(fd);
                 n = timedRead(fd, b, off, len, MILLISECONDS.toNanos(timeout));
             } else {
                 // read, no timeout
                 n = tryRead(fd, b, off, len);
                 while (IOStatus.okayToRetry(n) && isOpen()) {

@@ -409,10 +414,11 @@
      */
     private int implWrite(byte[] b, int off, int len) throws IOException {
         int n = 0;
         FileDescriptor fd = beginWrite();
         try {
+            configureNonBlockingIfNeeded(fd, false);
             n = tryWrite(fd, b, off, len);
             while (IOStatus.okayToRetry(n) && isOpen()) {
                 park(fd, Net.POLLOUT);
                 n = tryWrite(fd, b, off, len);
             }

@@ -577,16 +583,11 @@
             connectLock.lock();
             try {
                 boolean connected = false;
                 FileDescriptor fd = beginConnect(address, port);
                 try {
-
-                    // configure socket to non-blocking mode when there is a timeout
-                    if (millis > 0) {
-                        configureNonBlocking(fd);
-                    }
-
+                    configureNonBlockingIfNeeded(fd, millis > 0);
                     int n = Net.connect(fd, address, port);
                     if (n > 0) {
                         // connection established
                         connected = true;
                     } else {

@@ -603,16 +604,10 @@
                                 polled = Net.pollConnectNow(fd);
                             }
                             connected = polled && isOpen();
                         }
                     }
-
-                    // restore socket to blocking mode
-                    if (connected && millis > 0) {
-                        configureBlocking(fd);
-                    }
-
                 } finally {
                     endConnect(fd, connected);
                 }
             } finally {
                 connectLock.unlock();

@@ -744,13 +739,13 @@
         // accept a connection
         try {
             int n = 0;
             FileDescriptor fd = beginAccept();
             try {
+                configureNonBlockingIfNeeded(fd, remainingNanos > 0);
                 if (remainingNanos > 0) {
                     // accept with timeout
-                    configureNonBlocking(fd);
                     n = timedAccept(fd, newfd, isaa, remainingNanos);
                 } else {
                     // accept, no timeout
                     n = Net.accept(fd, newfd, isaa);
                     while (IOStatus.okayToRetry(n) && isOpen()) {

@@ -901,16 +896,18 @@
 
             // attempt to close the socket. If there are I/O operations in progress
             // then the socket is pre-closed and the thread(s) signalled. The
             // last thread will close the file descriptor.
             if (!tryClose()) {
-                nd.preClose(fd);
                 long reader = readerThread;
-                if (reader != 0)
-                    NativeThread.signal(reader);
                 long writer = writerThread;
-                if (writer != 0)
+                if (NativeThread.isFiber(reader) || NativeThread.isFiber(writer))
+                    Poller.stopPoll(fdVal(fd));
+                nd.preClose(fd);
+                if (NativeThread.isKernelThread(reader))
+                    NativeThread.signal(reader);
+                if (NativeThread.isKernelThread(writer))
                     NativeThread.signal(writer);
             }
         }
     }
 

@@ -1148,10 +1145,13 @@
     protected void shutdownInput() throws IOException {
         synchronized (stateLock) {
             ensureOpenAndConnected();
             if (!isInputClosed) {
                 Net.shutdown(fd, Net.SHUT_RD);
+                if (NativeThread.isFiber(readerThread)) {
+                    Poller.stopPoll(fdVal(fd), Net.POLLIN);
+                }
                 isInputClosed = true;
             }
         }
     }
 

@@ -1159,10 +1159,13 @@
     protected void shutdownOutput() throws IOException {
         synchronized (stateLock) {
             ensureOpenAndConnected();
             if (!isOutputClosed) {
                 Net.shutdown(fd, Net.SHUT_WR);
+                if (NativeThread.isFiber(writerThread)) {
+                    Poller.stopPoll(fdVal(fd), Net.POLLOUT);
+                }
                 isOutputClosed = true;
             }
         }
     }
 

@@ -1176,10 +1179,11 @@
         writeLock.lock();
         try {
             int n = 0;
             FileDescriptor fd = beginWrite();
             try {
+                configureNonBlockingIfNeeded(fd, false);
                 do {
                     n = Net.sendOOB(fd, (byte) data);
                 } while (n == IOStatus.INTERRUPTED && isOpen());
                 if (n == IOStatus.UNAVAILABLE) {
                     throw new SocketException("No buffer space available");

@@ -1278,6 +1282,15 @@
             return StandardProtocolFamily.INET6;
         } else {
             return StandardProtocolFamily.INET;
         }
     }
-}
+
+    /**
+     * Returns the native file descriptor
+     */
+    private static int fdVal(FileDescriptor fd) {
+        int fdVal = SharedSecrets.getJavaIOFileDescriptorAccess().get(fd);
+        assert fdVal == IOUtil.fdVal(fd);
+        return fdVal;
+    }
+}
\ No newline at end of file
< prev index next >