< prev index next >

src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java

Print this page
*** 25,10 ***
--- 25,11 ---
  
  package sun.nio.ch;
  
  import java.io.FileDescriptor;
  import java.io.IOException;
+ import java.io.InterruptedIOException;
  import java.io.UncheckedIOException;
  import java.lang.invoke.MethodHandles;
  import java.lang.invoke.VarHandle;
  import java.lang.ref.Cleaner.Cleanable;
  import java.lang.reflect.Method;

*** 64,13 ***
--- 65,15 ---
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Map;
  import java.util.Objects;
  import java.util.Set;
+ import java.util.concurrent.TimeUnit;
  import java.util.concurrent.locks.ReentrantLock;
  import java.util.function.Consumer;
  
+ import jdk.internal.misc.VirtualThreads;
  import jdk.internal.ref.CleanerFactory;
  import sun.net.ResourceManager;
  import sun.net.ext.ExtendedSocketOptions;
  import sun.net.util.IPAddressUtil;
  

*** 157,10 ***
--- 160,13 ---
      private boolean reuseAddressEmulated;
  
      // set true/false when socket is already bound and SO_REUSEADDR is emulated
      private boolean isReuseAddress;
  
+     // lazily set to true when the socket is configured non-blocking
+     private volatile boolean nonBlocking;
+ 
      // -- End of fields protected by stateLock
  
  
      DatagramChannelImpl(SelectorProvider sp, boolean interruptible) throws IOException {
          this(sp, (Net.isIPv6Available()

*** 468,10 ***
--- 474,40 ---
      @Override
      public final Set<SocketOption<?>> supportedOptions() {
          return DefaultOptionsHolder.defaultOptions;
      }
  
+     @Override
+     public void park(int event, long nanos) throws IOException {
+         Thread thread = Thread.currentThread();
+         if (thread.isVirtual()) {
+             Poller.register(getFDVal(), event);
+             try {
+                 if (isOpen()) {
+                     if (nanos == 0) {
+                         VirtualThreads.park();
+                     } else {
+                         VirtualThreads.park(nanos);
+                     }
+                     if (!interruptible && thread.isInterrupted()) {
+                         throw new InterruptedIOException();
+                     }
+                 }
+             } finally {
+                 Poller.deregister(getFDVal(), event);
+             }
+         } else {
+             long millis;
+             if (nanos == 0) {
+                 millis = -1;
+             } else {
+                 millis = TimeUnit.NANOSECONDS.toMillis(nanos);
+             }
+             Net.poll(getFD(), event, millis);
+         }
+     }
+ 
      /**
       * Marks the beginning of a read operation that might block.
       *
       * @param blocking true if configured blocking
       * @param mustBeConnected true if the socket must be connected

*** 533,10 ***
--- 569,11 ---
          try {
              boolean blocking = isBlocking();
              SocketAddress sender = null;
              try {
                  SocketAddress remote = beginRead(blocking, false);
+                 lockedConfigureNonBlockingIfNeeded();
                  boolean connected = (remote != null);
                  @SuppressWarnings("removal")
                  SecurityManager sm = System.getSecurityManager();
                  if (connected || (sm == null)) {
                      // connected or no security manager

*** 661,10 ***
--- 698,11 ---
          assert readLock.isHeldByCurrentThread() && isBlocking();
          SocketAddress sender = null;
          try {
              SocketAddress remote = beginRead(true, false);
              boolean connected = (remote != null);
+             lockedConfigureNonBlockingIfNeeded();
              int n = receive(dst, connected);
              while (IOStatus.okayToRetry(n) && isOpen()) {
                  park(Net.POLLIN);
                  n = receive(dst, connected);
              }

*** 787,10 ***
--- 825,11 ---
              boolean blocking = isBlocking();
              int n;
              boolean completed = false;
              try {
                  SocketAddress remote = beginWrite(blocking, false);
+                 lockedConfigureNonBlockingIfNeeded();
                  if (remote != null) {
                      // connected
                      if (!target.equals(remote)) {
                          throw new AlreadyConnectedException();
                      }

*** 935,10 ***
--- 974,11 ---
          try {
              boolean blocking = isBlocking();
              int n = 0;
              try {
                  beginRead(blocking, true);
+                 lockedConfigureNonBlockingIfNeeded();
                  n = IOUtil.read(fd, buf, -1, nd);
                  if (blocking) {
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLIN);
                          n = IOUtil.read(fd, buf, -1, nd);

*** 964,10 ***
--- 1004,11 ---
          try {
              boolean blocking = isBlocking();
              long n = 0;
              try {
                  beginRead(blocking, true);
+                 lockedConfigureNonBlockingIfNeeded();
                  n = IOUtil.read(fd, dsts, offset, length, nd);
                  if (blocking) {
                      while (IOStatus.okayToRetry(n)  && isOpen()) {
                          park(Net.POLLIN);
                          n = IOUtil.read(fd, dsts, offset, length, nd);

*** 1046,10 ***
--- 1087,11 ---
          try {
              boolean blocking = isBlocking();
              int n = 0;
              try {
                  beginWrite(blocking, true);
+                 lockedConfigureNonBlockingIfNeeded();
                  n = IOUtil.write(fd, buf, -1, nd);
                  if (blocking) {
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLOUT);
                          n = IOUtil.write(fd, buf, -1, nd);

*** 1075,10 ***
--- 1117,11 ---
          try {
              boolean blocking = isBlocking();
              long n = 0;
              try {
                  beginWrite(blocking, true);
+                 lockedConfigureNonBlockingIfNeeded();
                  n = IOUtil.write(fd, srcs, offset, length, nd);
                  if (blocking) {
                      while (IOStatus.okayToRetry(n) && isOpen()) {
                          park(Net.POLLOUT);
                          n = IOUtil.write(fd, srcs, offset, length, nd);

*** 1114,33 ***
       */
      private void lockedConfigureBlocking(boolean block) throws IOException {
          assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
          synchronized (stateLock) {
              ensureOpen();
!             IOUtil.configureBlocking(fd, block);
          }
      }
  
      /**
!      * Adjusts the blocking mode if the channel is open. readLock or writeLock
!      * must already be held.
-      *
-      * @return {@code true} if the blocking mode was adjusted, {@code false} if
-      *         the blocking mode was not adjusted because the channel is closed
       */
      private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
          assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
          synchronized (stateLock) {
!             if (isOpen()) {
                  IOUtil.configureBlocking(fd, block);
                  return true;
              } else {
                  return false;
              }
          }
      }
  
      InetSocketAddress localAddress() {
          synchronized (stateLock) {
              return localAddress;
          }
      }
--- 1157,49 ---
       */
      private void lockedConfigureBlocking(boolean block) throws IOException {
          assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
          synchronized (stateLock) {
              ensureOpen();
!             // do nothing if virtual thread has forced the socket to be non-blocking
+             if (!nonBlocking) {
+                 IOUtil.configureBlocking(fd, block);
+             }
          }
      }
  
      /**
!      * Attempts to adjusts the blocking mode if the channel is open.
!      * @return {@code true} if the blocking mode was adjusted
       */
      private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
          assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
          synchronized (stateLock) {
!             if (!nonBlocking && isOpen()) {
                  IOUtil.configureBlocking(fd, block);
                  return true;
              } else {
                  return false;
              }
          }
      }
  
+     /**
+      * Ensures that the socket is configured non-blocking when on a virtual
+      * thread or a timeout is specified.
+      * @throws IOException if there is an I/O error changing the blocking mode
+      */
+     private void lockedConfigureNonBlockingIfNeeded() throws IOException {
+         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
+         if (!nonBlocking && Thread.currentThread().isVirtual()) {
+             synchronized (stateLock) {
+                 ensureOpen();
+                 IOUtil.configureBlocking(fd, false);
+                 nonBlocking = true;
+             }
+         }
+     }
+ 
      InetSocketAddress localAddress() {
          synchronized (stateLock) {
              return localAddress;
          }
      }

*** 1261,20 ***
                      localAddress = Net.localAddress(fd);
  
                      // flush any packets already received.
                      boolean blocking = isBlocking();
                      if (blocking) {
!                         IOUtil.configureBlocking(fd, false);
                      }
                      try {
                          ByteBuffer buf = ByteBuffer.allocate(100);
                          while (receive(buf, false) >= 0) {
                              buf.clear();
                          }
                      } finally {
                          if (blocking) {
!                             IOUtil.configureBlocking(fd, true);
                          }
                      }
                  }
              } finally {
                  writeLock.unlock();
--- 1320,20 ---
                      localAddress = Net.localAddress(fd);
  
                      // flush any packets already received.
                      boolean blocking = isBlocking();
                      if (blocking) {
!                         lockedConfigureBlocking(false);
                      }
                      try {
                          ByteBuffer buf = ByteBuffer.allocate(100);
                          while (receive(buf, false) >= 0) {
                              buf.clear();
                          }
                      } finally {
                          if (blocking) {
!                             tryLockedConfigureBlocking(true);
                          }
                      }
                  }
              } finally {
                  writeLock.unlock();

*** 1730,15 ***
  
              if (!tryClose()) {
                  long reader = readerThread;
                  long writer = writerThread;
                  if (reader != 0 || writer != 0) {
!                     nd.preClose(fd);
!                     if (reader != 0)
!                         NativeThread.signal(reader);
!                     if (writer != 0)
!                         NativeThread.signal(writer);
                  }
              }
          }
      }
  
--- 1789,22 ---
  
              if (!tryClose()) {
                  long reader = readerThread;
                  long writer = writerThread;
                  if (reader != 0 || writer != 0) {
!                     if (NativeThread.isVirtualThread(reader)
!                             || NativeThread.isVirtualThread(writer)) {
!                         Poller.stopPoll(fdVal);
!                     }
!                     if (NativeThread.isKernelThread(reader)
+                             || NativeThread.isKernelThread(writer)) {
+                         nd.preClose(fd);
+                         if (NativeThread.isKernelThread(reader))
+                             NativeThread.signal(reader);
+                         if (NativeThread.isKernelThread(writer))
+                             NativeThread.signal(writer);
+                     }
                  }
              }
          }
      }
  
< prev index next >