1 /*
  2  * Copyright (c) 2011, 2022, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.IOException;
 29 import java.nio.channels.ClosedSelectorException;
 30 import java.nio.channels.SelectionKey;
 31 import java.nio.channels.Selector;
 32 import java.nio.channels.spi.SelectorProvider;
 33 import java.util.ArrayDeque;
 34 import java.util.Deque;
 35 import java.util.HashMap;
 36 import java.util.Map;
 37 import java.util.concurrent.TimeUnit;
 38 import java.util.function.Consumer;
 39 
 40 import static sun.nio.ch.KQueue.EVFILT_READ;
 41 import static sun.nio.ch.KQueue.EVFILT_WRITE;
 42 import static sun.nio.ch.KQueue.EV_ADD;
 43 import static sun.nio.ch.KQueue.EV_DELETE;
 44 
 45 /**
 46  * KQueue based Selector implementation for macOS
 47  */
 48 
 49 class KQueueSelectorImpl extends SelectorImpl {
 50 
 51     // maximum number of events to poll in one call to kqueue
 52     private static final int MAX_KEVENTS = 256;
 53 
 54     // kqueue file descriptor
 55     private final int kqfd;
 56 
 57     // address of poll array (event list) when polling for pending events
 58     private final long pollArrayAddress;
 59 
 60     // file descriptors used for interrupt
 61     private final int fd0;
 62     private final int fd1;
 63 
 64     // maps file descriptor to selection key, synchronize on selector
 65     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
 66 
 67     // pending new registrations/updates, queued by setEventOps
 68     private final Object updateLock = new Object();
 69     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
 70 
 71     // interrupt triggering and clearing
 72     private final Object interruptLock = new Object();
 73     private boolean interruptTriggered;
 74 
 75     // used by updateSelectedKeys to handle cases where the same file
 76     // descriptor is polled by more than one filter
 77     private int pollCount;
 78 
 79     KQueueSelectorImpl(SelectorProvider sp) throws IOException {
 80         super(sp);
 81 
 82         this.kqfd = KQueue.create();
 83         this.pollArrayAddress = KQueue.allocatePollArray(MAX_KEVENTS);
 84 
 85         try {
 86             long fds = IOUtil.makePipe(false);
 87             this.fd0 = (int) (fds >>> 32);
 88             this.fd1 = (int) fds;
 89         } catch (IOException ioe) {
 90             KQueue.freePollArray(pollArrayAddress);
 91             FileDispatcherImpl.closeIntFD(kqfd);
 92             throw ioe;
 93         }
 94 
 95         // register one end of the socket pair for wakeups
 96         KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
 97     }
 98 
 99     private void ensureOpen() {
100         if (!isOpen())
101             throw new ClosedSelectorException();
102     }
103 
104     @Override
105     protected int doSelect(Consumer<SelectionKey> action, long timeout)
106         throws IOException
107     {
108         assert Thread.holdsLock(this);
109 
110         long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
111         boolean blocking = (to != 0);
112         boolean timedPoll = (to > 0);
113 
114         int numEntries;
115         processUpdateQueue();
116         processDeregisterQueue();
117 
118         if (Thread.currentThread().isVirtual()) {
119             numEntries = (timedPoll)
120                     ? timedPoll(TimeUnit.MILLISECONDS.toNanos(to))
121                     : untimedPoll(blocking);
122         } else {
123             try {
124                 begin(blocking);
125                 do {
126                     long startTime = timedPoll ? System.nanoTime() : 0;
127                     numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
128                     if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
129                         // timed poll interrupted so need to adjust timeout
130                         long adjust = System.nanoTime() - startTime;
131                         to -= TimeUnit.NANOSECONDS.toMillis(adjust);
132                         if (to <= 0) {
133                             // timeout expired so no retry
134                             numEntries = 0;
135                         }
136                     }
137                 } while (numEntries == IOStatus.INTERRUPTED);
138             } finally {
139                 end(blocking);
140             }
141         }
142         assert IOStatus.check(numEntries);
143 
144         processDeregisterQueue();
145         return processEvents(numEntries, action);
146     }
147 
148     /**
149      * If blocking, parks the current virtual thread until a file descriptor is polled
150      * or the thread is interrupted.
151      */
152     private int untimedPoll(boolean block) throws IOException {
153         int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
154         if (block) {
155             while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
156                 Poller.pollSelector(kqfd, 0);
157                 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
158             }
159         }
160         return numEntries;
161     }
162 
163     /**
164      * Parks the current virtual thread until a file descriptor is polled, or the thread
165      * is interrupted, for up to the specified waiting time.
166      */
167     private int timedPoll(long nanos) throws IOException {
168         long startNanos = System.nanoTime();
169         int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
170         while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
171             long remainingNanos = nanos - (System.nanoTime() - startNanos);
172             if (remainingNanos <= 0) {
173                 // timeout
174                 break;
175             }
176             Poller.pollSelector(kqfd, remainingNanos);
177             numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
178         }
179         return numEntries;
180     }
181 
182     /**
183      * Process changes to the interest ops.
184      */
185     private void processUpdateQueue() {
186         assert Thread.holdsLock(this);
187 
188         synchronized (updateLock) {
189             SelectionKeyImpl ski;
190             while ((ski = updateKeys.pollFirst()) != null) {
191                 if (ski.isValid()) {
192                     int fd = ski.getFDVal();
193                     // add to fdToKey if needed
194                     SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
195                     assert (previous == null) || (previous == ski);
196 
197                     int newEvents = ski.translateInterestOps();
198                     int registeredEvents = ski.registeredEvents();
199 
200                     // DatagramChannelImpl::disconnect has reset socket
201                     if (ski.getAndClearReset() && registeredEvents != 0) {
202                         KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
203                         registeredEvents = 0;
204                     }
205 
206                     if (newEvents != registeredEvents) {
207 
208                         // add or delete interest in read events
209                         if ((registeredEvents & Net.POLLIN) != 0) {
210                             if ((newEvents & Net.POLLIN) == 0) {
211                                 KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
212                             }
213                         } else if ((newEvents & Net.POLLIN) != 0) {
214                             KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
215                         }
216 
217                         // add or delete interest in write events
218                         if ((registeredEvents & Net.POLLOUT) != 0) {
219                             if ((newEvents & Net.POLLOUT) == 0) {
220                                 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
221                             }
222                         } else if ((newEvents & Net.POLLOUT) != 0) {
223                             KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
224                         }
225 
226                         ski.registeredEvents(newEvents);
227                     }
228                 }
229             }
230         }
231     }
232 
233     /**
234      * Process the polled events.
235      * If the interrupt fd has been selected, drain it and clear the interrupt.
236      */
237     private int processEvents(int numEntries, Consumer<SelectionKey> action)
238         throws IOException
239     {
240         assert Thread.holdsLock(this);
241 
242         int numKeysUpdated = 0;
243         boolean interrupted = false;
244 
245         // A file descriptor may be registered with kqueue with more than one
246         // filter and so there may be more than one event for a fd. The poll
247         // count is incremented here and compared against the SelectionKey's
248         // "lastPolled" field. This ensures that the ready ops is updated rather
249         // than replaced when a file descriptor is polled by both the read and
250         // write filter.
251         pollCount++;
252 
253         for (int i = 0; i < numEntries; i++) {
254             long kevent = KQueue.getEvent(pollArrayAddress, i);
255             int fd = KQueue.getDescriptor(kevent);
256             if (fd == fd0) {
257                 interrupted = true;
258             } else {
259                 SelectionKeyImpl ski = fdToKey.get(fd);
260                 if (ski != null) {
261                     int rOps = 0;
262                     short filter = KQueue.getFilter(kevent);
263                     if (filter == EVFILT_READ) {
264                         rOps |= Net.POLLIN;
265                     } else if (filter == EVFILT_WRITE) {
266                         rOps |= Net.POLLOUT;
267                     }
268                     int updated = processReadyEvents(rOps, ski, action);
269                     if (updated > 0 && ski.lastPolled != pollCount) {
270                         numKeysUpdated++;
271                         ski.lastPolled = pollCount;
272                     }
273                 }
274             }
275         }
276 
277         if (interrupted) {
278             clearInterrupt();
279         }
280         return numKeysUpdated;
281     }
282 
283     @Override
284     protected void implClose() throws IOException {
285         assert !isOpen();
286         assert Thread.holdsLock(this);
287 
288         // prevent further wakeup
289         synchronized (interruptLock) {
290             interruptTriggered = true;
291         }
292 
293         FileDispatcherImpl.closeIntFD(kqfd);
294         KQueue.freePollArray(pollArrayAddress);
295 
296         FileDispatcherImpl.closeIntFD(fd0);
297         FileDispatcherImpl.closeIntFD(fd1);
298     }
299 
300     @Override
301     protected void implDereg(SelectionKeyImpl ski) throws IOException {
302         assert !ski.isValid();
303         assert Thread.holdsLock(this);
304 
305         int fd = ski.getFDVal();
306         int registeredEvents = ski.registeredEvents();
307         if (fdToKey.remove(fd) != null) {
308             if (registeredEvents != 0) {
309                 if ((registeredEvents & Net.POLLIN) != 0)
310                     KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
311                 if ((registeredEvents & Net.POLLOUT) != 0)
312                     KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
313                 ski.registeredEvents(0);
314             }
315         } else {
316             assert registeredEvents == 0;
317         }
318     }
319 
320     @Override
321     public void setEventOps(SelectionKeyImpl ski) {
322         ensureOpen();
323         synchronized (updateLock) {
324             updateKeys.addLast(ski);
325         }
326     }
327 
328     @Override
329     public Selector wakeup() {
330         synchronized (interruptLock) {
331             if (!interruptTriggered) {
332                 try {
333                     IOUtil.write1(fd1, (byte)0);
334                 } catch (IOException ioe) {
335                     throw new InternalError(ioe);
336                 }
337                 interruptTriggered = true;
338             }
339         }
340         return this;
341     }
342 
343     private void clearInterrupt() throws IOException {
344         synchronized (interruptLock) {
345             IOUtil.drain(fd0);
346             interruptTriggered = false;
347         }
348     }
349 }