< prev index next >

src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java

Print this page
*** 34,11 ***
  import java.util.Deque;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.concurrent.TimeUnit;
  import java.util.function.Consumer;
- import jdk.internal.misc.Blocker;
  
  import static sun.nio.ch.EPoll.EPOLLIN;
  import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
  import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
  import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;
--- 34,10 ---

*** 109,40 ***
          boolean timedPoll = (to > 0);
  
          int numEntries;
          processUpdateQueue();
          processDeregisterQueue();
-         try {
-             begin(blocking);
  
!             do {
!                 long startTime = timedPoll ? System.nanoTime() : 0;
!                 long comp = Blocker.begin(blocking);
!                 try {
                      numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
!                 } finally {
!                     Blocker.end(comp);
!                 }
!                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
!                     // timed poll interrupted so need to adjust timeout
!                     long adjust = System.nanoTime() - startTime;
!                     to -= (int) TimeUnit.NANOSECONDS.toMillis(adjust);
!                     if (to <= 0) {
-                         // timeout expired so no retry
-                         numEntries = 0;
                      }
!                 }
!             } while (numEntries == IOStatus.INTERRUPTED);
!             assert IOStatus.check(numEntries);
! 
-         } finally {
-             end(blocking);
          }
          processDeregisterQueue();
          return processEvents(numEntries, action);
      }
  
      /**
       * Process changes to the interest ops.
       */
      private void processUpdateQueue() {
          assert Thread.holdsLock(this);
--- 108,75 ---
          boolean timedPoll = (to > 0);
  
          int numEntries;
          processUpdateQueue();
          processDeregisterQueue();
  
!         if (Thread.currentThread().isVirtual()) {
!             numEntries = (timedPoll)
!                     ? timedPoll(TimeUnit.MILLISECONDS.toNanos(to))
!                     : untimedPoll(blocking);
+         } else {
+             try {
+                 begin(blocking);
+                 do {
+                     long startTime = timedPoll ? System.nanoTime() : 0;
                      numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
!                     if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
!                         // timed poll interrupted so need to adjust timeout
!                         long adjust = System.nanoTime() - startTime;
!                         to -= (int) TimeUnit.NANOSECONDS.toMillis(adjust);
!                         if (to <= 0) {
!                             // timeout expired so no retry
!                             numEntries = 0;
!                         }
                      }
!                 } while (numEntries == IOStatus.INTERRUPTED);
!             } finally {
!                 end(blocking);
!             }
          }
+         assert IOStatus.check(numEntries);
+ 
          processDeregisterQueue();
          return processEvents(numEntries, action);
      }
  
+     /**
+      * If blocking, parks the current virtual thread until a file descriptor is polled
+      * or the thread is interrupted.
+      *
+      * <p> The epoll instance is always checked once with a zero timeout. When
+      * {@code block} is true and that check reports nothing, the method repeatedly
+      * calls {@code Poller.pollSelector} and re-checks the epoll instance until a
+      * check returns a non-zero value or the current thread's interrupt status is set.
+      * The interrupt status is only tested, not cleared.
+      *
+      * @param block if false, only the initial zero-timeout check is performed
+      * @return the value returned by the most recent {@code EPoll.wait} call
+      *         (presumably the number of ready entries; 0 when none were found)
+      * @throws IOException propagated from {@code EPoll.wait} or
+      *         {@code Poller.pollSelector}
+      */
+     private int untimedPoll(boolean block) throws IOException {
+         int numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0); // zero timeout
+         if (block) {
+             while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
+                 Poller.pollSelector(epfd, 0); // NOTE(review): 0 presumably means no time limit - confirm
+                 numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0); // re-check, zero timeout
+             }
+         }
+         return numEntries;
+     }
+ 
+     /**
+      * Parks the current virtual thread until a file descriptor is polled, or the thread
+      * is interrupted, for up to the specified waiting time.
+      *
+      * <p> The epoll instance is first checked with a zero timeout. While that check
+      * reports nothing and the current thread's interrupt status is not set, the
+      * remaining time is recomputed from {@code System.nanoTime()}, the method calls
+      * {@code Poller.pollSelector} with that remainder, and the epoll instance is
+      * checked again. The loop ends without another check once the remaining time is
+      * zero or negative. The interrupt status is only tested, not cleared.
+      *
+      * @param nanos the maximum time to wait, in nanoseconds
+      * @return the value returned by the most recent {@code EPoll.wait} call
+      *         (presumably the number of ready entries; 0 on timeout or interrupt)
+      * @throws IOException propagated from {@code EPoll.wait} or
+      *         {@code Poller.pollSelector}
+      */
+     private int timedPoll(long nanos) throws IOException {
+         long startNanos = System.nanoTime();
+         int numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0); // zero timeout
+         while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
+             long remainingNanos = nanos - (System.nanoTime() - startNanos); // budget minus elapsed
+             if (remainingNanos <= 0) {
+                 // timeout
+                 break;
+             }
+             Poller.pollSelector(epfd, remainingNanos);
+             numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0); // re-check, zero timeout
+         }
+         return numEntries;
+     }
+ 
      /**
       * Process changes to the interest ops.
       */
      private void processUpdateQueue() {
          assert Thread.holdsLock(this);
< prev index next >