< prev index next >

src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java

Print this page

        

@@ -53,10 +53,11 @@
 import java.util.HashSet;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
 
+import jdk.internal.misc.Strands;
 import sun.net.ResourceManager;
 import sun.net.ext.ExtendedSocketOptions;
 import sun.net.util.IPAddressUtil;
 
 /**

@@ -119,10 +120,13 @@
     private boolean reuseAddressEmulated;
 
     // set true/false when socket is already bound and SO_REUSEADDR is emulated
     private boolean isReuseAddress;
 
+    // lazily set to true when a fiber forces the socket to be non-blocking
+    private volatile boolean nonBlocking;
+
     // -- End of fields protected by stateLock
 
     public DatagramChannelImpl(SelectorProvider sp)
         throws IOException
     {

@@ -407,10 +411,11 @@
             boolean blocking = isBlocking();
             int n = 0;
             ByteBuffer bb = null;
             try {
                 SocketAddress remote = beginRead(blocking, false);
+                lockedConfigureNonBlockingIfFiber();
                 boolean connected = (remote != null);
                 SecurityManager sm = System.getSecurityManager();
                 if (connected || (sm == null)) {
                     // connected or no security manager
                     n = receive(fd, dst, connected);

@@ -511,10 +516,11 @@
         try {
             boolean blocking = isBlocking();
             int n = 0;
             try {
                 SocketAddress remote = beginWrite(blocking, false);
+                lockedConfigureNonBlockingIfFiber();
                 if (remote != null) {
                     // connected
                     if (!target.equals(remote)) {
                         throw new AlreadyConnectedException();
                     }

@@ -618,10 +624,11 @@
         try {
             boolean blocking = isBlocking();
             int n = 0;
             try {
                 beginRead(blocking, true);
+                lockedConfigureNonBlockingIfFiber();
                 n = IOUtil.read(fd, buf, -1, nd);
                 if (blocking) {
                     while (IOStatus.okayToRetry(n) && isOpen()) {
                         park(Net.POLLIN);
                         n = IOUtil.read(fd, buf, -1, nd);

@@ -647,10 +654,11 @@
         try {
             boolean blocking = isBlocking();
             long n = 0;
             try {
                 beginRead(blocking, true);
+                lockedConfigureNonBlockingIfFiber();
                 n = IOUtil.read(fd, dsts, offset, length, nd);
                 if (blocking) {
                     while (IOStatus.okayToRetry(n) && isOpen()) {
                         park(Net.POLLIN);
                         n = IOUtil.read(fd, dsts, offset, length, nd);

@@ -724,10 +732,11 @@
         try {
             boolean blocking = isBlocking();
             int n = 0;
             try {
                 beginWrite(blocking, true);
+                lockedConfigureNonBlockingIfFiber();
                 n = IOUtil.write(fd, buf, -1, nd);
                 if (blocking) {
                     while (IOStatus.okayToRetry(n) && isOpen()) {
                         park(Net.POLLOUT);
                         n = IOUtil.write(fd, buf, -1, nd);

@@ -753,10 +762,11 @@
         try {
             boolean blocking = isBlocking();
             long n = 0;
             try {
                 beginWrite(blocking, true);
+                lockedConfigureNonBlockingIfFiber();
                 n = IOUtil.write(fd, srcs, offset, length, nd);
                 if (blocking) {
                     while (IOStatus.okayToRetry(n) && isOpen()) {
                         park(Net.POLLOUT);
                         n = IOUtil.write(fd, srcs, offset, length, nd);

@@ -776,22 +786,49 @@
     protected void implConfigureBlocking(boolean block) throws IOException {
         readLock.lock();
         try {
             writeLock.lock();
             try {
-                synchronized (stateLock) {
-                    ensureOpen();
-                    IOUtil.configureBlocking(fd, block);
-                }
+                lockedConfigureBlocking(block);
             } finally {
                 writeLock.unlock();
             }
         } finally {
             readLock.unlock();
         }
     }
 
+    /**
+     * Adjusts the blocking mode; the caller must hold readLock or writeLock.
+     */
+    private void lockedConfigureBlocking(boolean block) throws IOException {
+        assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
+        synchronized (stateLock) {
+            ensureOpen();
+            // leave the mode untouched once a fiber has forced the socket non-blocking
+            if (!nonBlocking) {
+                IOUtil.configureBlocking(fd, block);
+            }
+        }
+    }
+
+    /**
+     * Ensures that the socket is configured non-blocking when the current
+     * strand is a fiber. The caller must hold readLock or writeLock.
+     * @throws IOException if there is an I/O error changing the blocking mode
+     */
+    private void lockedConfigureNonBlockingIfFiber() throws IOException {
+        assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
+        if (!nonBlocking && (Strands.currentStrand() instanceof Fiber)) {
+            synchronized (stateLock) {
+                ensureOpen();
+                IOUtil.configureBlocking(fd, false);
+                nonBlocking = true;
+            }
+        }
+    }
+
     InetSocketAddress localAddress() {
         synchronized (stateLock) {
             return localAddress;
         }
     }

@@ -890,20 +927,20 @@
                     localAddress = Net.localAddress(fd);
 
                     // flush any packets already received.
                     boolean blocking = isBlocking();
                     if (blocking) {
-                        IOUtil.configureBlocking(fd, false);
+                        lockedConfigureBlocking(false);
                     }
                     try {
                         ByteBuffer buf = ByteBuffer.allocate(100);
                         while (receive(fd, buf, false) > 0) {
                             buf.clear();
                         }
                     } finally {
                         if (blocking) {
-                            IOUtil.configureBlocking(fd, true);
+                            lockedConfigureBlocking(true);
                         }
                     }
                 }
             } finally {
                 writeLock.unlock();

@@ -1203,14 +1240,17 @@
 
             if (!tryClose()) {
                 long reader = readerThread;
                 long writer = writerThread;
                 if (reader != 0 || writer != 0) {
+                    if (NativeThread.isFiber(reader) || NativeThread.isFiber(writer)) {
+                        Poller.stopPoll(fdVal);
+                    }
                     nd.preClose(fd);
-                    if (reader != 0)
+                    if (NativeThread.isKernelThread(reader))
                         NativeThread.signal(reader);
-                    if (writer != 0)
+                    if (NativeThread.isKernelThread(writer))
                         NativeThread.signal(writer);
                 }
             }
         }
     }
< prev index next >