1 /* 2 * Copyright (c) 2011, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.io.IOException; 29 import java.nio.channels.ClosedSelectorException; 30 import java.nio.channels.SelectionKey; 31 import java.nio.channels.Selector; 32 import java.nio.channels.spi.SelectorProvider; 33 import java.util.ArrayDeque; 34 import java.util.Deque; 35 import java.util.HashMap; 36 import java.util.Map; 37 import java.util.concurrent.TimeUnit; 38 import java.util.function.Consumer; 39 import jdk.internal.misc.Blocker; 40 41 import static sun.nio.ch.KQueue.EVFILT_READ; 42 import static sun.nio.ch.KQueue.EVFILT_WRITE; 43 import static sun.nio.ch.KQueue.EV_ADD; 44 import static sun.nio.ch.KQueue.EV_DELETE; 45 46 /** 47 * KQueue based Selector implementation for macOS 48 */ 49 50 class KQueueSelectorImpl extends SelectorImpl { 51 52 // maximum number of events to poll in one call to kqueue 53 private static final int MAX_KEVENTS = 256; 54 55 // kqueue file descriptor 56 private final int kqfd; 57 58 // address of poll array (event list) when polling for pending events 59 private final long pollArrayAddress; 60 61 // file descriptors used for interrupt 62 private final int fd0; 63 private final int fd1; 64 65 // maps file descriptor to selection key, synchronize on selector 66 private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>(); 67 68 // pending new registrations/updates, queued by setEventOps 69 private final Object updateLock = new Object(); 70 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); 71 72 // interrupt triggering and clearing 73 private final Object interruptLock = new Object(); 74 private boolean interruptTriggered; 75 76 // used by updateSelectedKeys to handle cases where the same file 77 // descriptor is polled by more than one filter 78 private int pollCount; 79 80 KQueueSelectorImpl(SelectorProvider sp) throws IOException { 81 super(sp); 82 83 this.kqfd = KQueue.create(); 84 this.pollArrayAddress = KQueue.allocatePollArray(MAX_KEVENTS); 85 86 try { 87 long fds = IOUtil.makePipe(false); 88 this.fd0 = (int) (fds >>> 32); 89 this.fd1 = (int) fds; 90 } catch (IOException ioe) { 91 KQueue.freePollArray(pollArrayAddress); 92 FileDispatcherImpl.closeIntFD(kqfd); 93 throw ioe; 94 } 95 96 // register one end of the socket pair for wakeups 97 KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD); 98 } 99 100 private void ensureOpen() { 101 if (!isOpen()) 102 throw new ClosedSelectorException(); 103 } 104 105 @Override 106 protected int doSelect(Consumer<SelectionKey> action, long timeout) 107 throws IOException 108 { 109 assert Thread.holdsLock(this); 110 111 long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout 112 boolean blocking = (to != 0); 113 boolean timedPoll = (to > 0); 114 115 int numEntries; 116 processUpdateQueue(); 117 processDeregisterQueue(); 118 try { 119 begin(blocking); 120 121 do { 122 long startTime = timedPoll ? System.nanoTime() : 0; 123 boolean attempted = Blocker.begin(blocking); 124 try { 125 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to); 126 } finally { 127 Blocker.end(attempted); 128 } 129 if (numEntries == IOStatus.INTERRUPTED && timedPoll) { 130 // timed poll interrupted so need to adjust timeout 131 long adjust = System.nanoTime() - startTime; 132 to -= TimeUnit.NANOSECONDS.toMillis(adjust); 133 if (to <= 0) { 134 // timeout expired so no retry 135 numEntries = 0; 136 } 137 } 138 } while (numEntries == IOStatus.INTERRUPTED); 139 assert IOStatus.check(numEntries); 140 141 } finally { 142 end(blocking); 143 } 144 processDeregisterQueue(); 145 return processEvents(numEntries, action); 146 } 147 148 /** 149 * Process changes to the interest ops. 150 */ 151 private void processUpdateQueue() { 152 assert Thread.holdsLock(this); 153 154 synchronized (updateLock) { 155 SelectionKeyImpl ski; 156 while ((ski = updateKeys.pollFirst()) != null) { 157 if (ski.isValid()) { 158 int fd = ski.getFDVal(); 159 // add to fdToKey if needed 160 SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski); 161 assert (previous == null) || (previous == ski); 162 163 int newEvents = ski.translateInterestOps(); 164 int registeredEvents = ski.registeredEvents(); 165 166 // DatagramChannelImpl::disconnect has reset socket 167 if (ski.getAndClearReset() && registeredEvents != 0) { 168 KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE); 169 registeredEvents = 0; 170 } 171 172 if (newEvents != registeredEvents) { 173 174 // add or delete interest in read events 175 if ((registeredEvents & Net.POLLIN) != 0) { 176 if ((newEvents & Net.POLLIN) == 0) { 177 KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE); 178 } 179 } else if ((newEvents & Net.POLLIN) != 0) { 180 KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD); 181 } 182 183 // add or delete interest in write events 184 if ((registeredEvents & Net.POLLOUT) != 0) { 185 if ((newEvents & Net.POLLOUT) == 0) { 186 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE); 187 } 188 } else if ((newEvents & Net.POLLOUT) != 0) { 189 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD); 190 } 191 192 ski.registeredEvents(newEvents); 193 } 194 } 195 } 196 } 197 } 198 199 /** 200 * Process the polled events. 201 * If the interrupt fd has been selected, drain it and clear the interrupt. 202 */ 203 private int processEvents(int numEntries, Consumer<SelectionKey> action) 204 throws IOException 205 { 206 assert Thread.holdsLock(this); 207 208 int numKeysUpdated = 0; 209 boolean interrupted = false; 210 211 // A file descriptor may be registered with kqueue with more than one 212 // filter and so there may be more than one event for a fd. The poll 213 // count is incremented here and compared against the SelectionKey's 214 // "lastPolled" field. This ensures that the ready ops is updated rather 215 // than replaced when a file descriptor is polled by both the read and 216 // write filter. 217 pollCount++; 218 219 for (int i = 0; i < numEntries; i++) { 220 long kevent = KQueue.getEvent(pollArrayAddress, i); 221 int fd = KQueue.getDescriptor(kevent); 222 if (fd == fd0) { 223 interrupted = true; 224 } else { 225 SelectionKeyImpl ski = fdToKey.get(fd); 226 if (ski != null) { 227 int rOps = 0; 228 short filter = KQueue.getFilter(kevent); 229 if (filter == EVFILT_READ) { 230 rOps |= Net.POLLIN; 231 } else if (filter == EVFILT_WRITE) { 232 rOps |= Net.POLLOUT; 233 } 234 int updated = processReadyEvents(rOps, ski, action); 235 if (updated > 0 && ski.lastPolled != pollCount) { 236 numKeysUpdated++; 237 ski.lastPolled = pollCount; 238 } 239 } 240 } 241 } 242 243 if (interrupted) { 244 clearInterrupt(); 245 } 246 return numKeysUpdated; 247 } 248 249 @Override 250 protected void implClose() throws IOException { 251 assert !isOpen(); 252 assert Thread.holdsLock(this); 253 254 // prevent further wakeup 255 synchronized (interruptLock) { 256 interruptTriggered = true; 257 } 258 259 FileDispatcherImpl.closeIntFD(kqfd); 260 KQueue.freePollArray(pollArrayAddress); 261 262 FileDispatcherImpl.closeIntFD(fd0); 263 FileDispatcherImpl.closeIntFD(fd1); 264 } 265 266 @Override 267 protected void implDereg(SelectionKeyImpl ski) throws IOException { 268 assert !ski.isValid(); 269 assert Thread.holdsLock(this); 270 271 int fd = ski.getFDVal(); 272 int registeredEvents = ski.registeredEvents(); 273 if (fdToKey.remove(fd) != null) { 274 if (registeredEvents != 0) { 275 if ((registeredEvents & Net.POLLIN) != 0) 276 KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE); 277 if ((registeredEvents & Net.POLLOUT) != 0) 278 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE); 279 ski.registeredEvents(0); 280 } 281 } else { 282 assert registeredEvents == 0; 283 } 284 } 285 286 @Override 287 public void setEventOps(SelectionKeyImpl ski) { 288 ensureOpen(); 289 synchronized (updateLock) { 290 updateKeys.addLast(ski); 291 } 292 } 293 294 @Override 295 public Selector wakeup() { 296 synchronized (interruptLock) { 297 if (!interruptTriggered) { 298 try { 299 IOUtil.write1(fd1, (byte)0); 300 } catch (IOException ioe) { 301 throw new InternalError(ioe); 302 } 303 interruptTriggered = true; 304 } 305 } 306 return this; 307 } 308 309 private void clearInterrupt() throws IOException { 310 synchronized (interruptLock) { 311 IOUtil.drain(fd0); 312 interruptTriggered = false; 313 } 314 } 315 }