< prev index next >

src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java

Print this page

 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.IOException;
 29 import java.nio.channels.ClosedSelectorException;
 30 import java.nio.channels.SelectionKey;
 31 import java.nio.channels.Selector;
 32 import java.nio.channels.spi.SelectorProvider;
 33 import java.util.ArrayDeque;
 34 import java.util.Deque;
 35 import java.util.HashMap;
 36 import java.util.Map;
 37 import java.util.concurrent.TimeUnit;
 38 import java.util.function.Consumer;
 39 import jdk.internal.misc.Blocker;
 40 
 41 import static sun.nio.ch.KQueue.EVFILT_READ;
 42 import static sun.nio.ch.KQueue.EVFILT_WRITE;
 43 import static sun.nio.ch.KQueue.EV_ADD;
 44 import static sun.nio.ch.KQueue.EV_DELETE;
 45 
 46 /**
 47  * KQueue based Selector implementation for macOS
 48  */
 49 
 50 class KQueueSelectorImpl extends SelectorImpl {
 51 
 52     // maximum number of events to poll in one call to kqueue
 53     private static final int MAX_KEVENTS = 256;
 54 
 55     // kqueue file descriptor
 56     private final int kqfd;
 57 
 58     // address of poll array (event list) when polling for pending events
 59     private final long pollArrayAddress;

 98     }
 99 
100     private void ensureOpen() {
101         if (!isOpen())
102             throw new ClosedSelectorException();
103     }
104 
105     @Override
106     protected int doSelect(Consumer<SelectionKey> action, long timeout)
107         throws IOException
108     {
109         assert Thread.holdsLock(this);
110 
111         long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
112         boolean blocking = (to != 0);
113         boolean timedPoll = (to > 0);
114 
115         int numEntries;
116         processUpdateQueue();
117         processDeregisterQueue();
118         try {
119             begin(blocking);
120 
121             do {
122                 long startTime = timedPoll ? System.nanoTime() : 0;
123                 long comp = Blocker.begin(blocking);
124                 try {





125                     numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
126                 } finally {
127                     Blocker.end(comp);
128                 }
129                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
130                     // timed poll interrupted so need to adjust timeout
131                     long adjust = System.nanoTime() - startTime;
132                     to -= TimeUnit.NANOSECONDS.toMillis(adjust);
133                     if (to <= 0) {
134                         // timeout expired so no retry
135                         numEntries = 0;
136                     }
137                 }
138             } while (numEntries == IOStatus.INTERRUPTED);
139             assert IOStatus.check(numEntries);
140 
141         } finally {
142             end(blocking);
143         }


144         processDeregisterQueue();
145         return processEvents(numEntries, action);
146     }
147 


































148     /**
149      * Process changes to the interest ops.
150      */
151     private void processUpdateQueue() {
152         assert Thread.holdsLock(this);
153 
154         synchronized (updateLock) {
155             SelectionKeyImpl ski;
156             while ((ski = updateKeys.pollFirst()) != null) {
157                 if (ski.isValid()) {
158                     int fd = ski.getFDVal();
159                     // add to fdToKey if needed
160                     SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
161                     assert (previous == null) || (previous == ski);
162 
163                     int newEvents = ski.translateInterestOps();
164                     int registeredEvents = ski.registeredEvents();
165 
166                     // DatagramChannelImpl::disconnect has reset socket
167                     if (ski.getAndClearReset() && registeredEvents != 0) {

 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.IOException;
 29 import java.nio.channels.ClosedSelectorException;
 30 import java.nio.channels.SelectionKey;
 31 import java.nio.channels.Selector;
 32 import java.nio.channels.spi.SelectorProvider;
 33 import java.util.ArrayDeque;
 34 import java.util.Deque;
 35 import java.util.HashMap;
 36 import java.util.Map;
 37 import java.util.concurrent.TimeUnit;
 38 import java.util.function.Consumer;

 39 
 40 import static sun.nio.ch.KQueue.EVFILT_READ;
 41 import static sun.nio.ch.KQueue.EVFILT_WRITE;
 42 import static sun.nio.ch.KQueue.EV_ADD;
 43 import static sun.nio.ch.KQueue.EV_DELETE;
 44 
 45 /**
 46  * KQueue based Selector implementation for macOS
 47  */
 48 
 49 class KQueueSelectorImpl extends SelectorImpl {
 50 
 51     // maximum number of events to poll in one call to kqueue
 52     private static final int MAX_KEVENTS = 256;
 53 
 54     // kqueue file descriptor
 55     private final int kqfd;
 56 
 57     // address of poll array (event list) when polling for pending events
 58     private final long pollArrayAddress;

 97     }
 98 
 99     private void ensureOpen() {
100         if (!isOpen())
101             throw new ClosedSelectorException();
102     }
103 
104     @Override
105     protected int doSelect(Consumer<SelectionKey> action, long timeout)
106         throws IOException
107     {
108         assert Thread.holdsLock(this);
109 
110         long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
111         boolean blocking = (to != 0);
112         boolean timedPoll = (to > 0);
113 
114         int numEntries;
115         processUpdateQueue();
116         processDeregisterQueue();


117 
118         if (Thread.currentThread().isVirtual()) {
119             numEntries = (timedPoll)
120                     ? timedPoll(TimeUnit.MILLISECONDS.toNanos(to))
121                     : untimedPoll(blocking);
122         } else {
123             try {
124                 begin(blocking);
125                 do {
126                     long startTime = timedPoll ? System.nanoTime() : 0;
127                     numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
128                     if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
129                         // timed poll interrupted so need to adjust timeout
130                         long adjust = System.nanoTime() - startTime;
131                         to -= TimeUnit.NANOSECONDS.toMillis(adjust);
132                         if (to <= 0) {
133                             // timeout expired so no retry
134                             numEntries = 0;
135                         }


136                     }
137                 } while (numEntries == IOStatus.INTERRUPTED);
138             } finally {
139                 end(blocking);
140             }


141         }
142         assert IOStatus.check(numEntries);
143 
144         processDeregisterQueue();
145         return processEvents(numEntries, action);
146     }
147 
148     /**
149      * If blocking, parks the current virtual thread until a file descriptor is polled
150      * or the thread is interrupted.
151      */
152     private int untimedPoll(boolean block) throws IOException {
153         int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
154         if (block) {
155             while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
156                 Poller.pollSelector(kqfd, 0);
157                 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
158             }
159         }
160         return numEntries;
161     }
162 
163     /**
164      * Parks the current virtual thread until a file descriptor is polled, or the thread
165      * is interrupted, for up to the specified waiting time.
166      */
167     private int timedPoll(long nanos) throws IOException {
168         long startNanos = System.nanoTime();
169         int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
170         while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
171             long remainingNanos = nanos - (System.nanoTime() - startNanos);
172             if (remainingNanos <= 0) {
173                 // timeout
174                 break;
175             }
176             Poller.pollSelector(kqfd, remainingNanos);
177             numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
178         }
179         return numEntries;
180     }
181 
182     /**
183      * Process changes to the interest ops.
184      */
185     private void processUpdateQueue() {
186         assert Thread.holdsLock(this);
187 
188         synchronized (updateLock) {
189             SelectionKeyImpl ski;
190             while ((ski = updateKeys.pollFirst()) != null) {
191                 if (ski.isValid()) {
192                     int fd = ski.getFDVal();
193                     // add to fdToKey if needed
194                     SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
195                     assert (previous == null) || (previous == ski);
196 
197                     int newEvents = ski.translateInterestOps();
198                     int registeredEvents = ski.registeredEvents();
199 
200                     // DatagramChannelImpl::disconnect has reset socket
201                     if (ski.getAndClearReset() && registeredEvents != 0) {
< prev index next >