1 /*
  2  * Copyright (c) 2011, 2024, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.IOException;
 29 import java.nio.channels.ClosedSelectorException;
 30 import java.nio.channels.SelectionKey;
 31 import java.nio.channels.Selector;
 32 import java.nio.channels.spi.SelectorProvider;
 33 import java.util.ArrayDeque;
 34 import java.util.Deque;
 35 import java.util.HashMap;
 36 import java.util.Map;
 37 import java.util.concurrent.TimeUnit;
 38 import java.util.function.Consumer;
 39 import jdk.internal.misc.Blocker;
 40 
 41 import static sun.nio.ch.KQueue.EVFILT_READ;
 42 import static sun.nio.ch.KQueue.EVFILT_WRITE;
 43 import static sun.nio.ch.KQueue.EV_ADD;
 44 import static sun.nio.ch.KQueue.EV_DELETE;
 45 
 46 /**
 47  * KQueue based Selector implementation for macOS
 48  */
 49 
 50 class KQueueSelectorImpl extends SelectorImpl {
 51 
 52     // maximum number of events to poll in one call to kqueue
 53     private static final int MAX_KEVENTS = 256;
 54 
 55     // kqueue file descriptor
 56     private final int kqfd;
 57 
 58     // address of poll array (event list) when polling for pending events
 59     private final long pollArrayAddress;
 60 
 61     // file descriptors used for interrupt
 62     private final int fd0;
 63     private final int fd1;
 64 
 65     // maps file descriptor to selection key, synchronize on selector
 66     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
 67 
 68     // pending new registrations/updates, queued by setEventOps
 69     private final Object updateLock = new Object();
 70     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
 71 
 72     // interrupt triggering and clearing
 73     private final Object interruptLock = new Object();
 74     private boolean interruptTriggered;
 75 
 76     // used by updateSelectedKeys to handle cases where the same file
 77     // descriptor is polled by more than one filter
 78     private int pollCount;
 79 
 80     KQueueSelectorImpl(SelectorProvider sp) throws IOException {
 81         super(sp);
 82 
 83         this.kqfd = KQueue.create();
 84         this.pollArrayAddress = KQueue.allocatePollArray(MAX_KEVENTS);
 85 
 86         try {
 87             long fds = IOUtil.makePipe(false);
 88             this.fd0 = (int) (fds >>> 32);
 89             this.fd1 = (int) fds;
 90         } catch (IOException ioe) {
 91             KQueue.freePollArray(pollArrayAddress);
 92             FileDispatcherImpl.closeIntFD(kqfd);
 93             throw ioe;
 94         }
 95 
 96         // register one end of the socket pair for wakeups
 97         KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
 98     }
 99 
100     private void ensureOpen() {
101         if (!isOpen())
102             throw new ClosedSelectorException();
103     }
104 
105     @Override
106     protected int doSelect(Consumer<SelectionKey> action, long timeout)
107         throws IOException
108     {
109         assert Thread.holdsLock(this);
110 
111         long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
112         boolean blocking = (to != 0);
113         boolean timedPoll = (to > 0);
114 
115         int numEntries;
116         processUpdateQueue();
117         processDeregisterQueue();
118         try {
119             begin(blocking);
120 
121             do {
122                 long startTime = timedPoll ? System.nanoTime() : 0;
123                 boolean attempted = Blocker.begin(blocking);
124                 try {
125                     numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
126                 } finally {
127                     Blocker.end(attempted);
128                 }
129                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
130                     // timed poll interrupted so need to adjust timeout
131                     long adjust = System.nanoTime() - startTime;
132                     to -= TimeUnit.NANOSECONDS.toMillis(adjust);
133                     if (to <= 0) {
134                         // timeout expired so no retry
135                         numEntries = 0;
136                     }
137                 }
138             } while (numEntries == IOStatus.INTERRUPTED);
139             assert IOStatus.check(numEntries);
140 
141         } finally {
142             end(blocking);
143         }
144         processDeregisterQueue();
145         return processEvents(numEntries, action);
146     }
147 
148     /**
149      * Process changes to the interest ops.
150      */
151     private void processUpdateQueue() {
152         assert Thread.holdsLock(this);
153 
154         synchronized (updateLock) {
155             SelectionKeyImpl ski;
156             while ((ski = updateKeys.pollFirst()) != null) {
157                 if (ski.isValid()) {
158                     int fd = ski.getFDVal();
159                     // add to fdToKey if needed
160                     SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
161                     assert (previous == null) || (previous == ski);
162 
163                     int newEvents = ski.translateInterestOps();
164                     int registeredEvents = ski.registeredEvents();
165 
166                     // DatagramChannelImpl::disconnect has reset socket
167                     if (ski.getAndClearReset() && registeredEvents != 0) {
168                         KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
169                         registeredEvents = 0;
170                     }
171 
172                     if (newEvents != registeredEvents) {
173 
174                         // add or delete interest in read events
175                         if ((registeredEvents & Net.POLLIN) != 0) {
176                             if ((newEvents & Net.POLLIN) == 0) {
177                                 KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
178                             }
179                         } else if ((newEvents & Net.POLLIN) != 0) {
180                             KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
181                         }
182 
183                         // add or delete interest in write events
184                         if ((registeredEvents & Net.POLLOUT) != 0) {
185                             if ((newEvents & Net.POLLOUT) == 0) {
186                                 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
187                             }
188                         } else if ((newEvents & Net.POLLOUT) != 0) {
189                             KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
190                         }
191 
192                         ski.registeredEvents(newEvents);
193                     }
194                 }
195             }
196         }
197     }
198 
199     /**
200      * Process the polled events.
201      * If the interrupt fd has been selected, drain it and clear the interrupt.
202      */
203     private int processEvents(int numEntries, Consumer<SelectionKey> action)
204         throws IOException
205     {
206         assert Thread.holdsLock(this);
207 
208         int numKeysUpdated = 0;
209         boolean interrupted = false;
210 
211         // A file descriptor may be registered with kqueue with more than one
212         // filter and so there may be more than one event for a fd. The poll
213         // count is incremented here and compared against the SelectionKey's
214         // "lastPolled" field. This ensures that the ready ops is updated rather
215         // than replaced when a file descriptor is polled by both the read and
216         // write filter.
217         pollCount++;
218 
219         for (int i = 0; i < numEntries; i++) {
220             long kevent = KQueue.getEvent(pollArrayAddress, i);
221             int fd = KQueue.getDescriptor(kevent);
222             if (fd == fd0) {
223                 interrupted = true;
224             } else {
225                 SelectionKeyImpl ski = fdToKey.get(fd);
226                 if (ski != null) {
227                     int rOps = 0;
228                     short filter = KQueue.getFilter(kevent);
229                     if (filter == EVFILT_READ) {
230                         rOps |= Net.POLLIN;
231                     } else if (filter == EVFILT_WRITE) {
232                         rOps |= Net.POLLOUT;
233                     }
234                     int updated = processReadyEvents(rOps, ski, action);
235                     if (updated > 0 && ski.lastPolled != pollCount) {
236                         numKeysUpdated++;
237                         ski.lastPolled = pollCount;
238                     }
239                 }
240             }
241         }
242 
243         if (interrupted) {
244             clearInterrupt();
245         }
246         return numKeysUpdated;
247     }
248 
249     @Override
250     protected void implClose() throws IOException {
251         assert !isOpen();
252         assert Thread.holdsLock(this);
253 
254         // prevent further wakeup
255         synchronized (interruptLock) {
256             interruptTriggered = true;
257         }
258 
259         FileDispatcherImpl.closeIntFD(kqfd);
260         KQueue.freePollArray(pollArrayAddress);
261 
262         FileDispatcherImpl.closeIntFD(fd0);
263         FileDispatcherImpl.closeIntFD(fd1);
264     }
265 
266     @Override
267     protected void implDereg(SelectionKeyImpl ski) throws IOException {
268         assert !ski.isValid();
269         assert Thread.holdsLock(this);
270 
271         int fd = ski.getFDVal();
272         int registeredEvents = ski.registeredEvents();
273         if (fdToKey.remove(fd) != null) {
274             if (registeredEvents != 0) {
275                 if ((registeredEvents & Net.POLLIN) != 0)
276                     KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
277                 if ((registeredEvents & Net.POLLOUT) != 0)
278                     KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
279                 ski.registeredEvents(0);
280             }
281         } else {
282             assert registeredEvents == 0;
283         }
284     }
285 
286     @Override
287     public void setEventOps(SelectionKeyImpl ski) {
288         ensureOpen();
289         synchronized (updateLock) {
290             updateKeys.addLast(ski);
291         }
292     }
293 
294     @Override
295     public Selector wakeup() {
296         synchronized (interruptLock) {
297             if (!interruptTriggered) {
298                 try {
299                     IOUtil.write1(fd1, (byte)0);
300                 } catch (IOException ioe) {
301                     throw new InternalError(ioe);
302                 }
303                 interruptTriggered = true;
304             }
305         }
306         return this;
307     }
308 
309     private void clearInterrupt() throws IOException {
310         synchronized (interruptLock) {
311             IOUtil.drain(fd0);
312             interruptTriggered = false;
313         }
314     }
315 }