1 /* 2 * Copyright (c) 2001, 2022, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.io.IOException; 28 import java.nio.channels.ClosedSelectorException; 29 import java.nio.channels.SelectionKey; 30 import java.nio.channels.Selector; 31 import java.nio.channels.spi.SelectorProvider; 32 import java.util.ArrayDeque; 33 import java.util.ArrayList; 34 import java.util.Deque; 35 import java.util.List; 36 import java.util.concurrent.TimeUnit; 37 import java.util.function.Consumer; 38 import jdk.internal.misc.Blocker; 39 import jdk.internal.misc.Unsafe; 40 41 /** 42 * Selector implementation based on poll 43 */ 44 45 class PollSelectorImpl extends SelectorImpl { 46 47 // initial capacity of poll array 48 private static final int INITIAL_CAPACITY = 16; 49 50 // poll array, grows as needed 51 private int pollArrayCapacity = INITIAL_CAPACITY; 52 private int pollArraySize; 53 private AllocatedNativeObject pollArray; 54 55 // file descriptors used for interrupt 56 private final int fd0; 57 private final int fd1; 58 59 // keys for file descriptors in poll array, synchronize on selector 60 private final List<SelectionKeyImpl> pollKeys = new ArrayList<>(); 61 62 // pending updates, queued by putEventOps 63 private final Object updateLock = new Object(); 64 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); 65 66 // interrupt triggering and clearing 67 private final Object interruptLock = new Object(); 68 private boolean interruptTriggered; 69 70 PollSelectorImpl(SelectorProvider sp) throws IOException { 71 super(sp); 72 73 int size = pollArrayCapacity * SIZE_POLLFD; 74 this.pollArray = new AllocatedNativeObject(size, false); 75 76 try { 77 long fds = IOUtil.makePipe(false); 78 this.fd0 = (int) (fds >>> 32); 79 this.fd1 = (int) fds; 80 } catch (IOException ioe) { 81 pollArray.free(); 82 throw ioe; 83 } 84 85 // wakeup support 86 synchronized (this) { 87 setFirst(fd0, Net.POLLIN); 88 } 89 } 90 91 private void ensureOpen() { 92 if (!isOpen()) 93 throw new ClosedSelectorException(); 94 } 95 96 @Override 97 protected int doSelect(Consumer<SelectionKey> action, long timeout) 98 throws IOException 99 { 100 assert Thread.holdsLock(this); 101 102 int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout 103 boolean blocking = (to != 0); 104 boolean timedPoll = (to > 0); 105 106 processUpdateQueue(); 107 processDeregisterQueue(); 108 try { 109 begin(blocking); 110 111 int numPolled; 112 do { 113 long startTime = timedPoll ? System.nanoTime() : 0; 114 long comp = Blocker.begin(); 115 try { 116 numPolled = poll(pollArray.address(), pollArraySize, to); 117 } finally { 118 Blocker.end(comp); 119 } 120 if (numPolled == IOStatus.INTERRUPTED && timedPoll) { 121 // timed poll interrupted so need to adjust timeout 122 long adjust = System.nanoTime() - startTime; 123 to -= (int) TimeUnit.NANOSECONDS.toMillis(adjust); 124 if (to <= 0) { 125 // timeout expired so no retry 126 numPolled = 0; 127 } 128 } 129 } while (numPolled == IOStatus.INTERRUPTED); 130 assert numPolled <= pollArraySize; 131 132 } finally { 133 end(blocking); 134 } 135 136 processDeregisterQueue(); 137 return processEvents(action); 138 } 139 140 /** 141 * Process changes to the interest ops. 142 */ 143 private void processUpdateQueue() { 144 assert Thread.holdsLock(this); 145 146 synchronized (updateLock) { 147 SelectionKeyImpl ski; 148 while ((ski = updateKeys.pollFirst()) != null) { 149 int newEvents = ski.translateInterestOps(); 150 if (ski.isValid()) { 151 int index = ski.getIndex(); 152 assert index >= 0 && index < pollArraySize; 153 if (index > 0) { 154 assert pollKeys.get(index) == ski; 155 if (newEvents == 0) { 156 remove(ski); 157 } else { 158 update(ski, newEvents); 159 } 160 } else if (newEvents != 0) { 161 add(ski, newEvents); 162 } 163 } 164 } 165 } 166 } 167 168 /** 169 * Process the polled events. 170 * If the interrupt fd has been selected, drain it and clear the interrupt. 171 */ 172 private int processEvents(Consumer<SelectionKey> action) 173 throws IOException 174 { 175 assert Thread.holdsLock(this); 176 assert pollArraySize > 0 && pollArraySize == pollKeys.size(); 177 178 int numKeysUpdated = 0; 179 for (int i = 1; i < pollArraySize; i++) { 180 int rOps = getReventOps(i); 181 if (rOps != 0) { 182 SelectionKeyImpl ski = pollKeys.get(i); 183 assert ski.getFDVal() == getDescriptor(i); 184 if (ski.isValid()) { 185 numKeysUpdated += processReadyEvents(rOps, ski, action); 186 } 187 } 188 } 189 190 // check for interrupt 191 if (getReventOps(0) != 0) { 192 assert getDescriptor(0) == fd0; 193 clearInterrupt(); 194 } 195 196 return numKeysUpdated; 197 } 198 199 @Override 200 protected void implClose() throws IOException { 201 assert !isOpen(); 202 assert Thread.holdsLock(this); 203 204 // prevent further wakeup 205 synchronized (interruptLock) { 206 interruptTriggered = true; 207 } 208 209 pollArray.free(); 210 FileDispatcherImpl.closeIntFD(fd0); 211 FileDispatcherImpl.closeIntFD(fd1); 212 } 213 214 @Override 215 protected void implRegister(SelectionKeyImpl ski) { 216 assert ski.getIndex() == 0; 217 ensureOpen(); 218 } 219 220 @Override 221 protected void implDereg(SelectionKeyImpl ski) throws IOException { 222 assert !ski.isValid(); 223 assert Thread.holdsLock(this); 224 225 // remove from poll array 226 int index = ski.getIndex(); 227 if (index > 0) { 228 remove(ski); 229 } 230 } 231 232 @Override 233 public void setEventOps(SelectionKeyImpl ski) { 234 ensureOpen(); 235 synchronized (updateLock) { 236 updateKeys.addLast(ski); 237 } 238 } 239 240 @Override 241 public Selector wakeup() { 242 synchronized (interruptLock) { 243 if (!interruptTriggered) { 244 try { 245 IOUtil.write1(fd1, (byte)0); 246 } catch (IOException ioe) { 247 throw new InternalError(ioe); 248 } 249 interruptTriggered = true; 250 } 251 } 252 return this; 253 } 254 255 private void clearInterrupt() throws IOException { 256 synchronized (interruptLock) { 257 IOUtil.drain(fd0); 258 interruptTriggered = false; 259 } 260 } 261 262 /** 263 * Sets the first pollfd entry in the poll array to the given fd 264 */ 265 private void setFirst(int fd, int ops) { 266 assert pollArraySize == 0; 267 assert pollKeys.isEmpty(); 268 269 putDescriptor(0, fd); 270 putEventOps(0, ops); 271 pollArraySize = 1; 272 273 pollKeys.add(null); // dummy element 274 } 275 276 /** 277 * Adds a pollfd entry to the poll array, expanding the poll array if needed. 278 */ 279 private void add(SelectionKeyImpl ski, int ops) { 280 expandIfNeeded(); 281 282 int index = pollArraySize; 283 assert index > 0; 284 putDescriptor(index, ski.getFDVal()); 285 putEventOps(index, ops); 286 putReventOps(index, 0); 287 ski.setIndex(index); 288 pollArraySize++; 289 290 pollKeys.add(ski); 291 assert pollKeys.size() == pollArraySize; 292 } 293 294 /** 295 * Update the events of pollfd entry. 296 */ 297 private void update(SelectionKeyImpl ski, int ops) { 298 int index = ski.getIndex(); 299 assert index > 0 && index < pollArraySize; 300 assert getDescriptor(index) == ski.getFDVal(); 301 putEventOps(index, ops); 302 } 303 304 /** 305 * Removes a pollfd entry from the poll array 306 */ 307 private void remove(SelectionKeyImpl ski) { 308 int index = ski.getIndex(); 309 assert index > 0 && index < pollArraySize; 310 assert getDescriptor(index) == ski.getFDVal(); 311 312 // replace pollfd at index with the last pollfd in array 313 int lastIndex = pollArraySize - 1; 314 if (lastIndex != index) { 315 SelectionKeyImpl lastKey = pollKeys.get(lastIndex); 316 assert lastKey.getIndex() == lastIndex; 317 int lastFd = getDescriptor(lastIndex); 318 int lastOps = getEventOps(lastIndex); 319 int lastRevents = getReventOps(lastIndex); 320 assert lastKey.getFDVal() == lastFd; 321 putDescriptor(index, lastFd); 322 putEventOps(index, lastOps); 323 putReventOps(index, lastRevents); 324 pollKeys.set(index, lastKey); 325 lastKey.setIndex(index); 326 } 327 pollKeys.remove(lastIndex); 328 pollArraySize--; 329 assert pollKeys.size() == pollArraySize; 330 331 ski.setIndex(0); 332 } 333 334 /** 335 * Expand poll array if at capacity 336 */ 337 private void expandIfNeeded() { 338 if (pollArraySize == pollArrayCapacity) { 339 int oldSize = pollArrayCapacity * SIZE_POLLFD; 340 int newCapacity = pollArrayCapacity + INITIAL_CAPACITY; 341 int newSize = newCapacity * SIZE_POLLFD; 342 AllocatedNativeObject newPollArray = new AllocatedNativeObject(newSize, false); 343 Unsafe.getUnsafe().copyMemory(pollArray.address(), newPollArray.address(), oldSize); 344 pollArray.free(); 345 pollArray = newPollArray; 346 pollArrayCapacity = newCapacity; 347 } 348 } 349 350 private static final short SIZE_POLLFD = 8; 351 private static final short FD_OFFSET = 0; 352 private static final short EVENT_OFFSET = 4; 353 private static final short REVENT_OFFSET = 6; 354 355 private void putDescriptor(int i, int fd) { 356 int offset = SIZE_POLLFD * i + FD_OFFSET; 357 pollArray.putInt(offset, fd); 358 } 359 360 private int getDescriptor(int i) { 361 int offset = SIZE_POLLFD * i + FD_OFFSET; 362 return pollArray.getInt(offset); 363 } 364 365 private void putEventOps(int i, int event) { 366 int offset = SIZE_POLLFD * i + EVENT_OFFSET; 367 pollArray.putShort(offset, (short)event); 368 } 369 370 private int getEventOps(int i) { 371 int offset = SIZE_POLLFD * i + EVENT_OFFSET; 372 return pollArray.getShort(offset); 373 } 374 375 private void putReventOps(int i, int revent) { 376 int offset = SIZE_POLLFD * i + REVENT_OFFSET; 377 pollArray.putShort(offset, (short)revent); 378 } 379 380 private int getReventOps(int i) { 381 int offset = SIZE_POLLFD * i + REVENT_OFFSET; 382 return pollArray.getShort(offset); 383 } 384 385 private static native int poll(long pollAddress, int numfds, int timeout); 386 387 static { 388 IOUtil.load(); 389 } 390 }