< prev index next >

src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java

Print this page

 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.IOException;
 29 import java.nio.channels.SelectionKey;
 30 import java.nio.channels.Selector;
 31 import java.nio.channels.spi.SelectorProvider;
 32 import java.util.ArrayDeque;
 33 import java.util.Deque;
 34 import java.util.HashMap;
 35 import java.util.Map;
 36 import java.util.concurrent.TimeUnit;
 37 import java.util.function.Consumer;
 38 import jdk.internal.misc.Blocker;
 39 
 40 import static sun.nio.ch.KQueue.EVFILT_READ;
 41 import static sun.nio.ch.KQueue.EVFILT_WRITE;
 42 import static sun.nio.ch.KQueue.EV_ADD;
 43 import static sun.nio.ch.KQueue.EV_DELETE;
 44 
 45 /**
 46  * KQueue based Selector implementation for macOS
 47  */
 48 
 49 class KQueueSelectorImpl extends SelectorImpl {
 50 
 51     // maximum number of events to poll in one call to kqueue
 52     private static final int MAX_KEVENTS = 256;
 53 
 54     // kqueue file descriptor
 55     private final int kqfd;
 56 
 57     // address of poll array (event list) when polling for pending events
 58     private final long pollArrayAddress;

 92             throw ioe;
 93         }
 94 
 95         // register one end of the socket pair for wakeups
 96         KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
 97     }
 98 
 99     @Override
100     protected int doSelect(Consumer<SelectionKey> action, long timeout)
101         throws IOException
102     {
            // Performs one selection operation: drains the update and deregister
            // queues, polls the kqueue for up to MAX_KEVENTS events, and returns
            // whatever processEvents reports for the polled entries.
            //
            // action  - passed unchanged to processEvents with the entry count
            // timeout - in milliseconds, clamped to Integer.MAX_VALUE; 0 gives a
            //           non-blocking poll, a positive value a timed poll, and a
            //           negative value a blocking poll with no timeout bookkeeping
            // IOException is propagated from the calls below.
            //
            // The caller must already hold this selector's monitor.
103         assert Thread.holdsLock(this);
104 
105         long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
106         boolean blocking = (to != 0);
107         boolean timedPoll = (to > 0);
108 
109         int numEntries;
            // apply pending interest-op changes and deregistrations before polling
110         processUpdateQueue();
111         processDeregisterQueue();
112         try {
                // NOTE(review): begin/end are defined outside this view; presumably
                // they bracket an interruptible blocking operation - confirm
113             begin(blocking);
114 
                // retry the poll for as long as it reports IOStatus.INTERRUPTED
115             do {
                    // only a timed poll needs a start time, to shrink `to` on retry
116                 long startTime = timedPoll ? System.nanoTime() : 0;
                    // NOTE(review): presumably tells the scheduler that this thread
                    // may block in the native poll - confirm against Blocker
117                 boolean attempted = Blocker.begin(blocking);
118                 try {





119                     numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
120                 } finally {
                        // always paired with Blocker.begin, even if poll throws
121                     Blocker.end(attempted);
122                 }
123                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
124                     // timed poll interrupted so need to adjust timeout
125                     long adjust = System.nanoTime() - startTime;
126                     to -= TimeUnit.NANOSECONDS.toMillis(adjust);
127                     if (to <= 0) {
128                         // timeout expired so no retry
                            // (0 entries also makes the loop condition false)
129                         numEntries = 0;
130                     }
131                 }
                    // an interrupted untimed poll is retried with `to` unchanged
132             } while (numEntries == IOStatus.INTERRUPTED);
133             assert IOStatus.check(numEntries);
134 
135         } finally {
136             end(blocking);
137         }


            // run the deregister queue again after the poll, then hand the polled
            // entries to processEvents, whose result is this method's return value
138         processDeregisterQueue();
139         return processEvents(numEntries, action);
140     }
141 


































142     /**
143      * Process changes to the interest ops.
144      */
145     private void processUpdateQueue() {
146         assert Thread.holdsLock(this);
147 
148         synchronized (updateLock) {
149             SelectionKeyImpl ski;
150             while ((ski = updateKeys.pollFirst()) != null) {
151                 if (ski.isValid()) {
152                     int fd = ski.getFDVal();
153                     // add to fdToKey if needed
154                     SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
155                     assert (previous == null) || (previous == ski);
156 
157                     int newEvents = ski.translateInterestOps();
158                     int registeredEvents = ski.registeredEvents();
159 
160                     // DatagramChannelImpl::disconnect has reset socket
161                     if (ski.getAndClearReset() && registeredEvents != 0) {

 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.IOException;
 29 import java.nio.channels.SelectionKey;
 30 import java.nio.channels.Selector;
 31 import java.nio.channels.spi.SelectorProvider;
 32 import java.util.ArrayDeque;
 33 import java.util.Deque;
 34 import java.util.HashMap;
 35 import java.util.Map;
 36 import java.util.concurrent.TimeUnit;
 37 import java.util.function.Consumer;

 38 
 39 import static sun.nio.ch.KQueue.EVFILT_READ;
 40 import static sun.nio.ch.KQueue.EVFILT_WRITE;
 41 import static sun.nio.ch.KQueue.EV_ADD;
 42 import static sun.nio.ch.KQueue.EV_DELETE;
 43 
 44 /**
 45  * KQueue based Selector implementation for macOS
 46  */
 47 
 48 class KQueueSelectorImpl extends SelectorImpl {
 49 
 50     // maximum number of events to poll in one call to kqueue
 51     private static final int MAX_KEVENTS = 256;
 52 
 53     // kqueue file descriptor
 54     private final int kqfd;
 55 
 56     // address of poll array (event list) when polling for pending events
 57     private final long pollArrayAddress;

 91             throw ioe;
 92         }
 93 
 94         // register one end of the socket pair for wakeups
 95         KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
 96     }
 97 
 98     @Override
 99     protected int doSelect(Consumer<SelectionKey> action, long timeout)
100         throws IOException
101     {
            // Performs one selection operation: drains the update and deregister
            // queues, polls the kqueue for up to MAX_KEVENTS events, and returns
            // whatever processEvents reports for the polled entries. Virtual threads
            // poll through timedPoll/untimedPoll; other threads call KQueue.poll
            // with the timeout directly.
            //
            // action  - passed unchanged to processEvents with the entry count
            // timeout - in milliseconds, clamped to Integer.MAX_VALUE; 0 gives a
            //           non-blocking poll, a positive value a timed poll, and a
            //           negative value a blocking poll with no timeout bookkeeping
            // IOException is propagated from the calls below.
            //
            // The caller must already hold this selector's monitor.
102         assert Thread.holdsLock(this);
103 
104         long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
105         boolean blocking = (to != 0);
106         boolean timedPoll = (to > 0);
107 
108         int numEntries;
            // apply pending interest-op changes and deregistrations before polling
109         processUpdateQueue();
110         processDeregisterQueue();


111 
            // NOTE(review): begin/end are only called on the else branch; on the
            // virtual-thread branch interruption is observed via the isInterrupted()
            // checks inside timedPoll/untimedPoll - confirm this is intended
112         if (Thread.currentThread().isVirtual()) {
                // timedPoll takes nanoseconds, so convert the millisecond timeout
113             numEntries = (timedPoll)
114                     ? timedPoll(TimeUnit.MILLISECONDS.toNanos(to))
115                     : untimedPoll(blocking);
116         } else {
117             try {
                    // NOTE(review): begin/end are defined outside this view; presumably
                    // they bracket an interruptible blocking operation - confirm
118                 begin(blocking);
                    // retry the poll for as long as it reports IOStatus.INTERRUPTED
119                 do {
                        // only a timed poll needs a start time, to shrink `to` on retry
120                     long startTime = timedPoll ? System.nanoTime() : 0;
121                     numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
122                     if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
123                         // timed poll interrupted so need to adjust timeout
124                         long adjust = System.nanoTime() - startTime;
125                         to -= TimeUnit.NANOSECONDS.toMillis(adjust);
126                         if (to <= 0) {
127                             // timeout expired so no retry
                                // (0 entries also makes the loop condition false)
128                             numEntries = 0;
129                         }


130                     }
                        // an interrupted untimed poll is retried with `to` unchanged
131                 } while (numEntries == IOStatus.INTERRUPTED);
132             } finally {
133                 end(blocking);
134             }


135         }
            // checked for both branches
136         assert IOStatus.check(numEntries);
137 
            // run the deregister queue again after the poll, then hand the polled
            // entries to processEvents, whose result is this method's return value
138         processDeregisterQueue();
139         return processEvents(numEntries, action);
140     }
141 
142     /**
143      * If blocking, parks the current virtual thread until a file descriptor is polled
144      * or the thread is interrupted.
         *
         * <p>The kqueue is always polled once with a zero timeout first; when
         * {@code block} is false that single poll's result is returned as is.
         *
         * @param block whether to keep waiting while the poll reports no entries
         * @return the value of the last {@code KQueue.poll} call; this is 0 if the
         *         loop ended because the thread's interrupt status was set
         * @throws IOException propagated from {@code KQueue.poll} or
         *         {@code Poller.pollSelector}
145      */
146     private int untimedPoll(boolean block) throws IOException {
            // zero timeout: never wait inside KQueue.poll itself
147         int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
148         if (block) {
                // isInterrupted() only reads the interrupt status, so it stays set
                // for the caller; any non-zero poll result also ends the loop
149             while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
                    // NOTE(review): presumably parks until kqfd has events, with 0
                    // meaning no timeout - confirm against Poller.pollSelector
150                 Poller.pollSelector(kqfd, 0);
                    // re-poll: events may or may not be pending after the wait
151                 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
152             }
153         }
154         return numEntries;
155     }
156 
157     /**
158      * Parks the current virtual thread until a file descriptor is polled, or the thread
159      * is interrupted, for up to the specified waiting time.
         *
         * <p>The kqueue is always polled once with a zero timeout before any waiting.
         *
         * @param nanos the maximum time to wait, in nanoseconds, measured from entry
         *        to this method
         * @return the value of the last {@code KQueue.poll} call; this is 0 if the
         *         loop ended on timeout or because the interrupt status was set
         * @throws IOException propagated from {@code KQueue.poll} or
         *         {@code Poller.pollSelector}
160      */
161     private int timedPoll(long nanos) throws IOException {
162         long startNanos = System.nanoTime();
            // zero timeout: never wait inside KQueue.poll itself
163         int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
            // isInterrupted() only reads the interrupt status, so it stays set for
            // the caller; any non-zero poll result also ends the loop
164         while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
                // recomputed from the fixed start time on every pass, so repeated
                // wake-ups cannot extend the total wait beyond `nanos`
165             long remainingNanos = nanos - (System.nanoTime() - startNanos);
166             if (remainingNanos <= 0) {
167                 // timeout
168                 break;
169             }
                // NOTE(review): presumably parks until kqfd has events or the
                // remaining time elapses - confirm against Poller.pollSelector
170             Poller.pollSelector(kqfd, remainingNanos);
                // re-poll: events may or may not be pending after the wait
171             numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
172         }
173         return numEntries;
174     }
175 
176     /**
177      * Process changes to the interest ops.
178      */
179     private void processUpdateQueue() {
180         assert Thread.holdsLock(this);
181 
182         synchronized (updateLock) {
183             SelectionKeyImpl ski;
184             while ((ski = updateKeys.pollFirst()) != null) {
185                 if (ski.isValid()) {
186                     int fd = ski.getFDVal();
187                     // add to fdToKey if needed
188                     SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
189                     assert (previous == null) || (previous == ski);
190 
191                     int newEvents = ski.translateInterestOps();
192                     int registeredEvents = ski.registeredEvents();
193 
194                     // DatagramChannelImpl::disconnect has reset socket
195                     if (ski.getAndClearReset() && registeredEvents != 0) {
< prev index next >