< prev index next >

src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java

Print this page

        

@@ -53,10 +53,11 @@
 import java.util.HashSet;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
 
+import jdk.internal.misc.Strands;
 import sun.net.ConnectionResetException;
 import sun.net.NetHooks;
 import sun.net.ext.ExtendedSocketOptions;
 import sun.net.util.SocketExceptions;
 

@@ -114,10 +115,13 @@
     private InetSocketAddress remoteAddress;
 
     // Socket adaptor, created on demand
     private Socket socket;
 
+    // lazily set to true when the socket is configured non-blocking
+    private volatile boolean nonBlocking;
+
     // -- End of fields protected by stateLock
 
 
     // Constructor for normal connecting sockets
     //

@@ -362,10 +366,11 @@
 
                 // check if input is shutdown
                 if (isInputClosed)
                     return IOStatus.EOF;
 
+                lockedConfigureNonBlockingIfFiber();
                 n = IOUtil.read(fd, buf, -1, nd);
                 if (blocking) {
                     while (IOStatus.okayToRetry(n) && isOpen()) {
                         park(Net.POLLIN);
                         n = IOUtil.read(fd, buf, -1, nd);

@@ -404,10 +409,11 @@
 
                 // check if input is shutdown
                 if (isInputClosed)
                     return IOStatus.EOF;
 
+                lockedConfigureNonBlockingIfFiber();
                 n = IOUtil.read(fd, dsts, offset, length, nd);
                 if (blocking) {
                     while (IOStatus.okayToRetry(n) && isOpen()) {
                         park(Net.POLLIN);
                         n = IOUtil.read(fd, dsts, offset, length, nd);

@@ -479,10 +485,11 @@
         try {
             boolean blocking = isBlocking();
             int n = 0;
             try {
                 beginWrite(blocking);
+                lockedConfigureNonBlockingIfFiber();
                 n = IOUtil.write(fd, buf, -1, nd);
                 if (blocking) {
                     while (IOStatus.okayToRetry(n) && isOpen()) {
                         park(Net.POLLOUT);
                         n = IOUtil.write(fd, buf, -1, nd);

@@ -509,10 +516,11 @@
         try {
             boolean blocking = isBlocking();
             long n = 0;
             try {
                 beginWrite(blocking);
+                lockedConfigureNonBlockingIfFiber();
                 n = IOUtil.write(fd, srcs, offset, length, nd);
                 if (blocking) {
                     while (IOStatus.okayToRetry(n) && isOpen()) {
                         park(Net.POLLOUT);
                         n = IOUtil.write(fd, srcs, offset, length, nd);

@@ -537,16 +545,16 @@
         try {
             boolean blocking = isBlocking();
             int n = 0;
             try {
                 beginWrite(blocking);
-                if (blocking) {
-                    do {
-                        n = Net.sendOOB(fd, b);
-                    } while (n == IOStatus.INTERRUPTED && isOpen());
-                } else {
+                lockedConfigureNonBlockingIfFiber();
+                do {
                     n = Net.sendOOB(fd, b);
+                } while (n == IOStatus.INTERRUPTED && isOpen());
+                if (blocking && n == IOStatus.UNAVAILABLE) {
+                    throw new SocketException("No buffer space available");
                 }
             } finally {
                 endWrite(blocking, n > 0);
                 if (n <= 0 && isOutputClosed)
                     throw new AsynchronousCloseException();

@@ -571,17 +579,35 @@
             readLock.unlock();
         }
     }
 
     /**
-     * Adjust the blocking mode while holding the readLock or writeLock.
+     * Adjust the blocking mode while holding readLock or writeLock.
      */
     private void lockedConfigureBlocking(boolean block) throws IOException {
         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
         synchronized (stateLock) {
             ensureOpen();
-            IOUtil.configureBlocking(fd, block);
+            // do nothing if fiber has forced the socket to be non-blocking
+            if (!nonBlocking) {
+                IOUtil.configureBlocking(fd, block);
+            }
+        }
+    }
+
+    /**
+     * Ensures that the socket is configured non-blocking when the current
+     * strand is a fiber.
+     */
+    private void lockedConfigureNonBlockingIfFiber() throws IOException {
+        assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
+        if (!nonBlocking && (Strands.currentStrand() instanceof Fiber)) {
+            synchronized (stateLock) {
+                ensureOpen();
+                IOUtil.configureBlocking(fd, false);
+                nonBlocking = true;
+            }
         }
     }
 
     /**
      * Returns the local address, or null if not bound

@@ -727,10 +753,11 @@
                 try {
                     boolean blocking = isBlocking();
                     boolean connected = false;
                     try {
                         beginConnect(blocking, isa);
+                        lockedConfigureNonBlockingIfFiber();
                         int n = Net.connect(fd, isa.getAddress(), isa.getPort());
                         if (n > 0) {
                             connected = true;
                         } else if (blocking) {
                             assert IOStatus.okayToRetry(n);

@@ -886,14 +913,17 @@
             state = ST_CLOSING;
             if (!tryClose()) {
                 long reader = readerThread;
                 long writer = writerThread;
                 if (reader != 0 || writer != 0) {
+                    if (NativeThread.isFiber(reader) || NativeThread.isFiber(writer)) {
+                        Poller.stopPoll(fdVal);
+                    }
                     nd.preClose(fd);
-                    if (reader != 0)
+                    if (NativeThread.isKernelThread(reader))
                         NativeThread.signal(reader);
-                    if (writer != 0)
+                    if (NativeThread.isKernelThread(writer))
                         NativeThread.signal(writer);
                 }
             }
         }
     }

@@ -970,13 +1000,16 @@
             ensureOpen();
             if (!isConnected())
                 throw new NotYetConnectedException();
             if (!isInputClosed) {
                 Net.shutdown(fd, Net.SHUT_RD);
-                long thread = readerThread;
-                if (thread != 0)
-                    NativeThread.signal(thread);
+                long reader = readerThread;
+                if (NativeThread.isFiber(reader)) {
+                    Poller.stopPoll(fdVal, Net.POLLIN);
+                } else if (NativeThread.isKernelThread(reader)) {
+                    NativeThread.signal(reader);
+                }
                 isInputClosed = true;
             }
             return this;
         }
     }

@@ -987,13 +1020,16 @@
             ensureOpen();
             if (!isConnected())
                 throw new NotYetConnectedException();
             if (!isOutputClosed) {
                 Net.shutdown(fd, Net.SHUT_WR);
-                long thread = writerThread;
-                if (thread != 0)
-                    NativeThread.signal(thread);
+                long writer = writerThread;
+                if (NativeThread.isFiber(writer)) {
+                    Poller.stopPoll(fdVal, Net.POLLOUT);
+                } else if (NativeThread.isKernelThread(writer)) {
+                    NativeThread.signal(writer);
+                }
                 isOutputClosed = true;
             }
             return this;
         }
     }

@@ -1147,10 +1183,11 @@
                         // restore socket to blocking mode
                         lockedConfigureBlocking(true);
                     }
                 } else {
                     // read, no timeout
+                    lockedConfigureNonBlockingIfFiber();
                     n = tryRead(b, off, len);
                     while (IOStatus.okayToRetry(n) && isOpen()) {
                         park(Net.POLLIN);
                         n = tryRead(b, off, len);
                     }

@@ -1206,10 +1243,11 @@
             // loop until all bytes have been written
             int pos = off;
             int end = off + len;
             beginWrite(true);
             try {
+                lockedConfigureNonBlockingIfFiber();
                 while (pos < end && isOpen()) {
                     int size = end - pos;
                     int n = tryWrite(b, pos, size);
                     while (IOStatus.okayToRetry(n) && isOpen()) {
                         park(Net.POLLOUT);
< prev index next >