1 /* 2 * Copyright (c) 2005, 2022, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.io.IOException; 29 import java.nio.channels.ClosedSelectorException; 30 import java.nio.channels.SelectionKey; 31 import java.nio.channels.Selector; 32 import java.nio.channels.spi.SelectorProvider; 33 import java.util.ArrayDeque; 34 import java.util.Deque; 35 import java.util.HashMap; 36 import java.util.Map; 37 import java.util.concurrent.TimeUnit; 38 import java.util.function.Consumer; 39 40 import static sun.nio.ch.EPoll.EPOLLIN; 41 import static sun.nio.ch.EPoll.EPOLL_CTL_ADD; 42 import static sun.nio.ch.EPoll.EPOLL_CTL_DEL; 43 import static sun.nio.ch.EPoll.EPOLL_CTL_MOD; 44 45 46 /** 47 * Linux epoll based Selector implementation 48 */ 49 50 class EPollSelectorImpl extends SelectorImpl { 51 52 // maximum number of events to poll in one call to epoll_wait 53 private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024); 54 55 // epoll file descriptor 56 private final int epfd; 57 58 // address of poll array when polling with epoll_wait 59 private final long pollArrayAddress; 60 61 // eventfd object used for interrupt 62 private final EventFD eventfd; 63 64 // maps file descriptor to selection key, synchronize on selector 65 private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>(); 66 67 // pending new registrations/updates, queued by setEventOps 68 private final Object updateLock = new Object(); 69 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); 70 71 // interrupt triggering and clearing 72 private final Object interruptLock = new Object(); 73 private boolean interruptTriggered; 74 75 EPollSelectorImpl(SelectorProvider sp) throws IOException { 76 super(sp); 77 78 this.epfd = EPoll.create(); 79 this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS); 80 81 try { 82 this.eventfd = new EventFD(); 83 IOUtil.configureBlocking(IOUtil.newFD(eventfd.efd()), false); 84 } catch (IOException ioe) { 85 EPoll.freePollArray(pollArrayAddress); 86 FileDispatcherImpl.closeIntFD(epfd); 87 throw ioe; 88 } 89 90 // register the eventfd object for wakeups 91 EPoll.ctl(epfd, EPOLL_CTL_ADD, eventfd.efd(), EPOLLIN); 92 } 93 94 private void ensureOpen() { 95 if (!isOpen()) 96 throw new ClosedSelectorException(); 97 } 98 99 @Override 100 protected int doSelect(Consumer<SelectionKey> action, long timeout) 101 throws IOException 102 { 103 assert Thread.holdsLock(this); 104 105 // epoll_wait timeout is int 106 int to = (int) Math.min(timeout, Integer.MAX_VALUE); 107 boolean blocking = (to != 0); 108 boolean timedPoll = (to > 0); 109 110 int numEntries; 111 processUpdateQueue(); 112 processDeregisterQueue(); 113 114 if (Thread.currentThread().isVirtual()) { 115 numEntries = (timedPoll) 116 ? timedPoll(TimeUnit.MILLISECONDS.toNanos(to)) 117 : untimedPoll(blocking); 118 } else { 119 try { 120 begin(blocking); 121 do { 122 long startTime = timedPoll ? System.nanoTime() : 0; 123 numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to); 124 if (numEntries == IOStatus.INTERRUPTED && timedPoll) { 125 // timed poll interrupted so need to adjust timeout 126 long adjust = System.nanoTime() - startTime; 127 to -= (int) TimeUnit.NANOSECONDS.toMillis(adjust); 128 if (to <= 0) { 129 // timeout expired so no retry 130 numEntries = 0; 131 } 132 } 133 } while (numEntries == IOStatus.INTERRUPTED); 134 } finally { 135 end(blocking); 136 } 137 } 138 assert IOStatus.check(numEntries); 139 140 processDeregisterQueue(); 141 return processEvents(numEntries, action); 142 } 143 144 /** 145 * If blocking, parks the current virtual thread until a file descriptor is polled 146 * or the thread is interrupted. 147 */ 148 private int untimedPoll(boolean block) throws IOException { 149 int numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0); 150 if (block) { 151 while (numEntries == 0 && !Thread.currentThread().isInterrupted()) { 152 Poller.pollSelector(epfd, 0); 153 numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0); 154 } 155 } 156 return numEntries; 157 } 158 159 /** 160 * Parks the current virtual thread until a file descriptor is polled, or the thread 161 * is interrupted, for up to the specified waiting time. 162 */ 163 private int timedPoll(long nanos) throws IOException { 164 long startNanos = System.nanoTime(); 165 int numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0); 166 while (numEntries == 0 && !Thread.currentThread().isInterrupted()) { 167 long remainingNanos = nanos - (System.nanoTime() - startNanos); 168 if (remainingNanos <= 0) { 169 // timeout 170 break; 171 } 172 Poller.pollSelector(epfd, remainingNanos); 173 numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0); 174 } 175 return numEntries; 176 } 177 178 /** 179 * Process changes to the interest ops. 180 */ 181 private void processUpdateQueue() { 182 assert Thread.holdsLock(this); 183 184 synchronized (updateLock) { 185 SelectionKeyImpl ski; 186 while ((ski = updateKeys.pollFirst()) != null) { 187 if (ski.isValid()) { 188 int fd = ski.getFDVal(); 189 // add to fdToKey if needed 190 SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski); 191 assert (previous == null) || (previous == ski); 192 193 int newEvents = ski.translateInterestOps(); 194 int registeredEvents = ski.registeredEvents(); 195 if (newEvents != registeredEvents) { 196 if (newEvents == 0) { 197 // remove from epoll 198 EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0); 199 } else { 200 if (registeredEvents == 0) { 201 // add to epoll 202 EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents); 203 } else { 204 // modify events 205 EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents); 206 } 207 } 208 ski.registeredEvents(newEvents); 209 } 210 } 211 } 212 } 213 } 214 215 /** 216 * Process the polled events. 217 * If the interrupt fd has been selected, drain it and clear the interrupt. 218 */ 219 private int processEvents(int numEntries, Consumer<SelectionKey> action) 220 throws IOException 221 { 222 assert Thread.holdsLock(this); 223 224 boolean interrupted = false; 225 int numKeysUpdated = 0; 226 for (int i=0; i<numEntries; i++) { 227 long event = EPoll.getEvent(pollArrayAddress, i); 228 int fd = EPoll.getDescriptor(event); 229 if (fd == eventfd.efd()) { 230 interrupted = true; 231 } else { 232 SelectionKeyImpl ski = fdToKey.get(fd); 233 if (ski != null) { 234 int rOps = EPoll.getEvents(event); 235 numKeysUpdated += processReadyEvents(rOps, ski, action); 236 } 237 } 238 } 239 240 if (interrupted) { 241 clearInterrupt(); 242 } 243 244 return numKeysUpdated; 245 } 246 247 @Override 248 protected void implClose() throws IOException { 249 assert Thread.holdsLock(this); 250 251 // prevent further wakeup 252 synchronized (interruptLock) { 253 interruptTriggered = true; 254 } 255 256 FileDispatcherImpl.closeIntFD(epfd); 257 EPoll.freePollArray(pollArrayAddress); 258 259 eventfd.close(); 260 } 261 262 @Override 263 protected void implDereg(SelectionKeyImpl ski) throws IOException { 264 assert !ski.isValid(); 265 assert Thread.holdsLock(this); 266 267 int fd = ski.getFDVal(); 268 if (fdToKey.remove(fd) != null) { 269 if (ski.registeredEvents() != 0) { 270 EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0); 271 ski.registeredEvents(0); 272 } 273 } else { 274 assert ski.registeredEvents() == 0; 275 } 276 } 277 278 @Override 279 public void setEventOps(SelectionKeyImpl ski) { 280 ensureOpen(); 281 synchronized (updateLock) { 282 updateKeys.addLast(ski); 283 } 284 } 285 286 @Override 287 public Selector wakeup() { 288 synchronized (interruptLock) { 289 if (!interruptTriggered) { 290 try { 291 eventfd.set(); 292 } catch (IOException ioe) { 293 throw new InternalError(ioe); 294 } 295 interruptTriggered = true; 296 } 297 } 298 return this; 299 } 300 301 private void clearInterrupt() throws IOException { 302 synchronized (interruptLock) { 303 eventfd.reset(); 304 interruptTriggered = false; 305 } 306 } 307 }