/*
 * Copyright (c) 2005, 2022, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static sun.nio.ch.EPoll.EPOLLIN;
import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;


/**
 * Linux epoll based Selector implementation
 */

class EPollSelectorImpl extends SelectorImpl {

    // maximum number of events to poll in one call to epoll_wait
    private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024);

    // epoll file descriptor
    private final int epfd;

    // address of poll array when polling with epoll_wait
    private final long pollArrayAddress;

    // eventfd object used for interrupt
    private final EventFD eventfd;

    // maps file descriptor to selection key, synchronize on selector
    private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();

    // pending new registrations/updates, queued by setEventOps
    private final Object updateLock = new Object();
    private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();

    // interrupt triggering and clearing
    private final Object interruptLock = new Object();
    private boolean interruptTriggered;

    /**
     * Creates the epoll instance, allocates the poll array, creates the
     * eventfd object used for wakeups and registers it with epoll for
     * {@code EPOLLIN}.
     *
     * @param sp the provider passed to the superclass constructor
     * @throws IOException if creating the epoll instance or the eventfd
     *         object fails, or if the eventfd cannot be configured
     *         non-blocking; resources acquired so far are released first
     */
    EPollSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);

        this.epfd = EPoll.create();
        this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);

        // Build the eventfd in a local so that the failure path can still
        // reach it: the final field cannot be read in the catch block.
        EventFD wakeupFd = null;
        try {
            wakeupFd = new EventFD();
            IOUtil.configureBlocking(IOUtil.newFD(wakeupFd.efd()), false);
        } catch (IOException ioe) {
            if (wakeupFd != null) {
                // the eventfd was created but could not be configured, so
                // close it rather than leaking the descriptor
                try {
                    wakeupFd.close();
                } catch (IOException closeFailure) {
                    ioe.addSuppressed(closeFailure);
                }
            }
            EPoll.freePollArray(pollArrayAddress);
            FileDispatcherImpl.closeIntFD(epfd);
            throw ioe;
        }
        this.eventfd = wakeupFd;

        // register the eventfd object for wakeups
        EPoll.ctl(epfd, EPOLL_CTL_ADD, eventfd.efd(), EPOLLIN);
    }

    /**
     * Applies queued interest-op updates and pending deregistrations, polls
     * epoll for events, and then processes the polled events.
     *
     * <p> A virtual thread polls via {@link #timedPoll(long)} or
     * {@link #untimedPoll(boolean)}. Any other thread calls
     * {@code EPoll.wait} between {@code begin} and {@code end}, retrying
     * while the result is {@code IOStatus.INTERRUPTED}.
     *
     * @param action  passed through to {@code processReadyEvents} for each
     *                polled key
     * @param timeout the timeout, narrowed to an {@code int} (capped at
     *                {@code Integer.MAX_VALUE}); zero means do not block and
     *                a positive value is a timed poll in milliseconds
     * @return the value returned by {@link #processEvents}
     * @throws IOException if polling or event processing fails
     */
    @Override
    protected int doSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
    {
        assert Thread.holdsLock(this);

        // epoll_wait timeout is int
        int to = (int) Math.min(timeout, Integer.MAX_VALUE);
        boolean blocking = (to != 0);
        boolean timedPoll = (to > 0);

        int numEntries;
        processUpdateQueue();
        processDeregisterQueue();

        if (Thread.currentThread().isVirtual()) {
            numEntries = (timedPoll)
                ? timedPoll(TimeUnit.MILLISECONDS.toNanos(to))
                : untimedPoll(blocking);
        } else {
            try {
                begin(blocking);
                do {
                    long startTime = timedPoll ? System.nanoTime() : 0;
                    numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
                    if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
                        // timed poll interrupted so need to adjust timeout
                        long adjust = System.nanoTime() - startTime;
                        to -= (int) TimeUnit.NANOSECONDS.toMillis(adjust);
                        if (to <= 0) {
                            // timeout expired so no retry
                            numEntries = 0;
                        }
                    }
                } while (numEntries == IOStatus.INTERRUPTED);
            } finally {
                end(blocking);
            }
        }
        assert IOStatus.check(numEntries);

        // keys may have been cancelled while polling
        processDeregisterQueue();
        return processEvents(numEntries, action);
    }

    /**
     * If blocking, parks the current virtual thread until a file descriptor is polled
     * or the thread is interrupted.
     *
     * @param block {@code true} to keep polling until {@code EPoll.wait}
     *              returns a non-zero value or the thread's interrupt
     *              status is set; {@code false} to poll once
     * @return the result of the last {@code EPoll.wait} call
     * @throws IOException if polling fails
     */
    private int untimedPoll(boolean block) throws IOException {
        // always poll with a zero timeout; waiting is done by the Poller
        int numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0);
        if (block) {
            while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
                Poller.pollSelector(epfd, 0);
                numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0);
            }
        }
        return numEntries;
    }

    /**
     * Parks the current virtual thread until a file descriptor is polled, or the thread
     * is interrupted, for up to the specified waiting time.
     *
     * @param nanos the maximum time to wait, in nanoseconds
     * @return the result of the last {@code EPoll.wait} call
     * @throws IOException if polling fails
     */
    private int timedPoll(long nanos) throws IOException {
        long startNanos = System.nanoTime();
        int numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0);
        while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
            // recompute what is left of the waiting time on every iteration
            long remainingNanos = nanos - (System.nanoTime() - startNanos);
            if (remainingNanos <= 0) {
                // timeout
                break;
            }
            Poller.pollSelector(epfd, remainingNanos);
            numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0);
        }
        return numEntries;
    }

    /**
     * Process changes to the interest ops.
     *
     * <p> Drains {@code updateKeys} while holding {@code updateLock}. For
     * each key that is still valid, the key is added to {@code fdToKey} if
     * absent and its translated interest ops are compared with the events
     * recorded on the key; when they differ the file descriptor is added
     * to, modified in, or removed from epoll and the key's registered
     * events are updated.
     */
    private void processUpdateQueue() {
        assert Thread.holdsLock(this);

        synchronized (updateLock) {
            SelectionKeyImpl ski;
            while ((ski = updateKeys.pollFirst()) != null) {
                if (ski.isValid()) {
                    int fd = ski.getFDVal();
                    // add to fdToKey if needed
                    SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
                    assert (previous == null) || (previous == ski);

                    int newEvents = ski.translateInterestOps();
                    int registeredEvents = ski.registeredEvents();
                    if (newEvents != registeredEvents) {
                        if (newEvents == 0) {
                            // remove from epoll
                            EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
                        } else {
                            if (registeredEvents == 0) {
                                // add to epoll
                                EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents);
                            } else {
                                // modify events
                                EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents);
                            }
                        }
                        ski.registeredEvents(newEvents);
                    }
                }
            }
        }
    }

    /**
     * Process the polled events.
     * If the interrupt fd has been selected, drain it and clear the interrupt.
     *
     * @param numEntries the number of entries in the poll array to examine
     * @param action     passed through to {@code processReadyEvents}
     * @return the sum of the values returned by {@code processReadyEvents}
     *         for the polled keys
     * @throws IOException if clearing the interrupt fails
     */
    private int processEvents(int numEntries, Consumer<SelectionKey> action)
        throws IOException
    {
        assert Thread.holdsLock(this);

        boolean interrupted = false;
        int numKeysUpdated = 0;
        for (int i=0; i<numEntries; i++) {
            long event = EPoll.getEvent(pollArrayAddress, i);
            int fd = EPoll.getDescriptor(event);
            if (fd == eventfd.efd()) {
                // wakeup event; handled once after all entries are examined
                interrupted = true;
            } else {
                SelectionKeyImpl ski = fdToKey.get(fd);
                if (ski != null) {
                    int rOps = EPoll.getEvents(event);
                    numKeysUpdated += processReadyEvents(rOps, ski, action);
                }
            }
        }

        if (interrupted) {
            clearInterrupt();
        }

        return numKeysUpdated;
    }

    /**
     * Closes the epoll file descriptor, frees the poll array and closes the
     * eventfd object. {@code interruptTriggered} is set first so that
     * {@link #wakeup()} no longer signals the eventfd.
     *
     * @throws IOException if closing a file descriptor fails
     */
    @Override
    protected void implClose() throws IOException {
        assert Thread.holdsLock(this);

        // prevent further wakeup
        synchronized (interruptLock) {
            interruptTriggered = true;
        }

        FileDispatcherImpl.closeIntFD(epfd);
        EPoll.freePollArray(pollArrayAddress);

        eventfd.close();
    }

    /**
     * Removes an invalid key from {@code fdToKey} and, if the key still has
     * events registered, removes its file descriptor from epoll and resets
     * the key's registered events to zero.
     *
     * @param ski the key being deregistered; must no longer be valid
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        assert !ski.isValid();
        assert Thread.holdsLock(this);

        int fd = ski.getFDVal();
        if (fdToKey.remove(fd) != null) {
            if (ski.registeredEvents() != 0) {
                EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
                ski.registeredEvents(0);
            }
        } else {
            // never added to fdToKey, so nothing can be registered with epoll
            assert ski.registeredEvents() == 0;
        }
    }

    /**
     * Queues the key so that its interest ops are applied by the next call
     * to {@code processUpdateQueue}.
     *
     * @param ski the key whose interest ops changed
     */
    @Override
    public void setEventOps(SelectionKeyImpl ski) {
        synchronized (updateLock) {
            updateKeys.addLast(ski);
        }
    }

    /**
     * Signals the eventfd object unless an interrupt is already pending
     * (or the selector has been closed, which also sets
     * {@code interruptTriggered}).
     *
     * @return this selector
     * @throws InternalError if signalling the eventfd fails
     */
    @Override
    public Selector wakeup() {
        synchronized (interruptLock) {
            if (!interruptTriggered) {
                try {
                    eventfd.set();
                } catch (IOException ioe) {
                    throw new InternalError(ioe);
                }
                interruptTriggered = true;
            }
        }
        return this;
    }

    /**
     * Resets the eventfd object and clears {@code interruptTriggered} so
     * that a later {@link #wakeup()} signals the eventfd again.
     *
     * @throws IOException if resetting the eventfd fails
     */
    private void clearInterrupt() throws IOException {
        synchronized (interruptLock) {
            eventfd.reset();
            interruptTriggered = false;
        }
    }
}