< prev index next >

src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java

Print this page
*** 1,7 ***
  /*
!  * Copyright (c) 2011, 2024, Oracle and/or its affiliates. All rights reserved.
   * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   *
   * This code is free software; you can redistribute it and/or modify it
   * under the terms of the GNU General Public License version 2 only, as
   * published by the Free Software Foundation.  Oracle designates this
--- 1,7 ---
  /*
!  * Copyright (c) 2011, 2022, Oracle and/or its affiliates. All rights reserved.
   * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   *
   * This code is free software; you can redistribute it and/or modify it
   * under the terms of the GNU General Public License version 2 only, as
   * published by the Free Software Foundation.  Oracle designates this

*** 34,11 ***
  import java.util.Deque;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.concurrent.TimeUnit;
  import java.util.function.Consumer;
- import jdk.internal.misc.Blocker;
  
  import static sun.nio.ch.KQueue.EVFILT_READ;
  import static sun.nio.ch.KQueue.EVFILT_WRITE;
  import static sun.nio.ch.KQueue.EV_ADD;
  import static sun.nio.ch.KQueue.EV_DELETE;
--- 34,10 ---

*** 113,40 ***
          boolean timedPoll = (to > 0);
  
          int numEntries;
          processUpdateQueue();
          processDeregisterQueue();
-         try {
-             begin(blocking);
  
!             do {
!                 long startTime = timedPoll ? System.nanoTime() : 0;
!                 boolean attempted = Blocker.begin(blocking);
!                 try {
                      numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
!                 } finally {
!                     Blocker.end(attempted);
!                 }
!                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
!                     // timed poll interrupted so need to adjust timeout
!                     long adjust = System.nanoTime() - startTime;
!                     to -= TimeUnit.NANOSECONDS.toMillis(adjust);
!                     if (to <= 0) {
-                         // timeout expired so no retry
-                         numEntries = 0;
                      }
!                 }
!             } while (numEntries == IOStatus.INTERRUPTED);
!             assert IOStatus.check(numEntries);
! 
-         } finally {
-             end(blocking);
          }
          processDeregisterQueue();
          return processEvents(numEntries, action);
      }
  
      /**
       * Process changes to the interest ops.
       */
      private void processUpdateQueue() {
          assert Thread.holdsLock(this);
--- 112,75 ---
          boolean timedPoll = (to > 0);
  
          int numEntries;
          processUpdateQueue();
          processDeregisterQueue();
  
!         if (Thread.currentThread().isVirtual()) {
!             numEntries = (timedPoll)
!                     ? timedPoll(TimeUnit.MILLISECONDS.toNanos(to))
!                     : untimedPoll(blocking);
+         } else {
+             try {
+                 begin(blocking);
+                 do {
+                     long startTime = timedPoll ? System.nanoTime() : 0;
                      numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
!                     if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
!                         // timed poll interrupted so need to adjust timeout
!                         long adjust = System.nanoTime() - startTime;
!                         to -= TimeUnit.NANOSECONDS.toMillis(adjust);
!                         if (to <= 0) {
!                             // timeout expired so no retry
!                             numEntries = 0;
!                         }
                      }
!                 } while (numEntries == IOStatus.INTERRUPTED);
!             } finally {
!                 end(blocking);
!             }
          }
+         assert IOStatus.check(numEntries);
+ 
          processDeregisterQueue();
          return processEvents(numEntries, action);
      }
  
+     /**
+      * If blocking, parks the current virtual thread until a file descriptor is polled
+      * or the thread is interrupted.
+      */
+     private int untimedPoll(boolean block) throws IOException {
+         int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
+         if (block) {
+             while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
+                 Poller.pollSelector(kqfd, 0);
+                 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
+             }
+         }
+         return numEntries;
+     }
+ 
+     /**
+      * Parks the current virtual thread until a file descriptor is polled, or the thread
+      * is interrupted, for up to the specified waiting time.
+      */
+     private int timedPoll(long nanos) throws IOException {
+         long startNanos = System.nanoTime();
+         int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
+         while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
+             long remainingNanos = nanos - (System.nanoTime() - startNanos);
+             if (remainingNanos <= 0) {
+                 // timeout
+                 break;
+             }
+             Poller.pollSelector(kqfd, remainingNanos);
+             numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
+         }
+         return numEntries;
+     }
+ 
      /**
       * Process changes to the interest ops.
       */
      private void processUpdateQueue() {
          assert Thread.holdsLock(this);
< prev index next >