1 /* 2 * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.io.IOException; 28 import java.util.Arrays; 29 import java.util.List; 30 import java.util.Map; 31 import java.util.Objects; 32 import java.util.concurrent.ConcurrentHashMap; 33 import java.util.concurrent.Executor; 34 import java.util.concurrent.Executors; 35 import java.util.concurrent.ThreadFactory; 36 import java.util.concurrent.locks.LockSupport; 37 import java.util.function.BooleanSupplier; 38 import jdk.internal.misc.InnocuousThread; 39 import jdk.internal.vm.annotation.Stable; 40 41 /** 42 * Polls file descriptors. Virtual threads invoke the poll method to park 43 * until a given file descriptor is ready for I/O. 44 */ 45 public abstract class Poller { 46 private static final Pollers POLLERS; 47 static { 48 try { 49 var pollers = new Pollers(); 50 pollers.start(); 51 POLLERS = pollers; 52 } catch (IOException ioe) { 53 throw new ExceptionInInitializerError(ioe); 54 } 55 } 56 57 // the poller or sub-poller thread 58 private @Stable Thread owner; 59 60 // maps file descriptors to parked Thread 61 private final Map<Integer, Thread> map = new ConcurrentHashMap<>(); 62 63 /** 64 * Poller mode. 65 */ 66 enum Mode { 67 /** 68 * ReadPoller and WritePoller are dedicated platform threads that block waiting 69 * for events and unpark virtual threads when file descriptors are ready for I/O. 70 */ 71 SYSTEM_THREADS, 72 73 /** 74 * ReadPoller and WritePoller threads are virtual threads that poll for events, 75 * yielding between polls and unparking virtual threads when file descriptors are 76 * ready for I/O. If there are no events then the poller threads park until there 77 * are I/O events to poll. This mode helps to integrate polling with virtual 78 * thread scheduling. The approach is similar to the default scheme in "User-level 79 * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020 80 * (https://dl.acm.org/doi/10.1145/3379483). 81 */ 82 VTHREAD_POLLERS 83 } 84 85 /** 86 * Initialize a Poller. 87 */ 88 protected Poller() { 89 } 90 91 /** 92 * Returns the poller's file descriptor, used when the read and write poller threads 93 * are virtual threads. 94 * 95 * @throws UnsupportedOperationException if not supported 96 */ 97 int fdVal() { 98 throw new UnsupportedOperationException(); 99 } 100 101 /** 102 * Register the file descriptor. The registration is "one shot", meaning it should 103 * be polled at most once. 104 */ 105 abstract void implRegister(int fdVal) throws IOException; 106 107 /** 108 * Deregister the file descriptor. 109 * @param polled true if the file descriptor has already been polled 110 */ 111 abstract void implDeregister(int fdVal, boolean polled); 112 113 /** 114 * Poll for events. The {@link #polled(int)} method is invoked for each 115 * polled file descriptor. 116 * 117 * @param timeout if positive then block for up to {@code timeout} milliseconds, 118 * if zero then don't block, if -1 then block indefinitely 119 * @return the number of file descriptors polled 120 */ 121 abstract int poll(int timeout) throws IOException; 122 123 /** 124 * Callback by the poll method when a file descriptor is polled. 125 */ 126 final void polled(int fdVal) { 127 wakeup(fdVal); 128 } 129 130 /** 131 * Parks the current thread until a file descriptor is ready for the given op. 132 * @param fdVal the file descriptor 133 * @param event POLLIN or POLLOUT 134 * @param nanos the waiting time or 0 to wait indefinitely 135 * @param supplier supplies a boolean to indicate if the enclosing object is open 136 */ 137 static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier) 138 throws IOException 139 { 140 assert nanos >= 0L; 141 if (event == Net.POLLIN) { 142 POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier); 143 } else if (event == Net.POLLOUT) { 144 POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier); 145 } else { 146 assert false; 147 } 148 } 149 150 /** 151 * Parks the current thread until a Selector's file descriptor is ready. 152 * @param fdVal the Selector's file descriptor 153 * @param nanos the waiting time or 0 to wait indefinitely 154 */ 155 static void pollSelector(int fdVal, long nanos) throws IOException { 156 assert nanos >= 0L; 157 Poller poller = POLLERS.masterPoller(); 158 if (poller == null) { 159 poller = POLLERS.readPoller(fdVal); 160 } 161 poller.poll(fdVal, nanos, () -> true); 162 } 163 164 /** 165 * If there is a thread polling the given file descriptor for the given event then 166 * the thread is unparked. 167 */ 168 static void stopPoll(int fdVal, int event) { 169 if (event == Net.POLLIN) { 170 POLLERS.readPoller(fdVal).wakeup(fdVal); 171 } else if (event == Net.POLLOUT) { 172 POLLERS.writePoller(fdVal).wakeup(fdVal); 173 } else { 174 throw new IllegalArgumentException(); 175 } 176 } 177 178 /** 179 * If there are any threads polling the given file descriptor then they are unparked. 180 */ 181 static void stopPoll(int fdVal) { 182 stopPoll(fdVal, Net.POLLIN); 183 stopPoll(fdVal, Net.POLLOUT); 184 } 185 186 /** 187 * Parks the current thread until a file descriptor is ready. 188 */ 189 private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException { 190 register(fdVal); 191 try { 192 boolean isOpen = supplier.getAsBoolean(); 193 if (isOpen) { 194 if (nanos > 0) { 195 LockSupport.parkNanos(nanos); 196 } else { 197 LockSupport.park(); 198 } 199 } 200 } finally { 201 deregister(fdVal); 202 } 203 } 204 205 /** 206 * Registers the file descriptor to be polled at most once when the file descriptor 207 * is ready for I/O. 208 */ 209 private void register(int fdVal) throws IOException { 210 Thread previous = map.put(fdVal, Thread.currentThread()); 211 assert previous == null; 212 try { 213 implRegister(fdVal); 214 } catch (Throwable t) { 215 map.remove(fdVal); 216 throw t; 217 } 218 } 219 220 /** 221 * Deregister the file descriptor so that the file descriptor is not polled. 222 */ 223 private void deregister(int fdVal) { 224 Thread previous = map.remove(fdVal); 225 boolean polled = (previous == null); 226 assert polled || previous == Thread.currentThread(); 227 implDeregister(fdVal, polled); 228 } 229 230 /** 231 * Unparks any thread that is polling the given file descriptor. 232 */ 233 private void wakeup(int fdVal) { 234 Thread t = map.remove(fdVal); 235 if (t != null) { 236 LockSupport.unpark(t); 237 } 238 } 239 240 /** 241 * Master polling loop. The {@link #polled(int)} method is invoked for each file 242 * descriptor that is polled. 243 */ 244 private void pollerLoop() { 245 owner = Thread.currentThread(); 246 try { 247 for (;;) { 248 poll(-1); 249 } 250 } catch (Exception e) { 251 e.printStackTrace(); 252 } 253 } 254 255 /** 256 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file 257 * descriptor that is polled. 258 * 259 * The sub-poller registers its file descriptor with the master poller to park until 260 * there are events to poll. When unparked, it does non-blocking polls and parks 261 * again when there are no more events. The sub-poller yields after each poll to help 262 * with fairness and to avoid re-registering with the master poller where possible. 263 */ 264 private void subPollerLoop(Poller masterPoller) { 265 assert Thread.currentThread().isVirtual(); 266 owner = Thread.currentThread(); 267 try { 268 int polled = 0; 269 for (;;) { 270 if (polled == 0) { 271 masterPoller.poll(fdVal(), 0, () -> true); // park 272 } else { 273 Thread.yield(); 274 } 275 polled = poll(0); 276 } 277 } catch (Exception e) { 278 e.printStackTrace(); 279 } 280 } 281 282 /** 283 * Returns the number I/O operations currently registered with this poller. 284 */ 285 public int registered() { 286 return map.size(); 287 } 288 289 @Override 290 public String toString() { 291 return String.format("%s [registered = %d, owner = %s]", 292 Objects.toIdentityString(this), registered(), owner); 293 } 294 295 /** 296 * The Pollers used for read and write events. 297 */ 298 private static class Pollers { 299 private final PollerProvider provider; 300 private final Poller.Mode pollerMode; 301 private final Poller masterPoller; 302 private final Poller[] readPollers; 303 private final Poller[] writePollers; 304 305 // used by start method to executor is kept alive 306 private Executor executor; 307 308 /** 309 * Creates the Poller instances based on configuration. 310 */ 311 Pollers() throws IOException { 312 PollerProvider provider = PollerProvider.provider(); 313 Poller.Mode mode; 314 String s = System.getProperty("jdk.pollerMode"); 315 if (s != null) { 316 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) { 317 mode = Mode.SYSTEM_THREADS; 318 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) { 319 mode = Mode.VTHREAD_POLLERS; 320 } else { 321 throw new RuntimeException("Can't parse '" + s + "' as polling mode"); 322 } 323 } else { 324 mode = provider.defaultPollerMode(); 325 } 326 327 // vthread poller mode needs a master poller 328 Poller masterPoller = (mode == Mode.VTHREAD_POLLERS) 329 ? provider.readPoller(false) 330 : null; 331 332 // read pollers (or sub-pollers) 333 int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode)); 334 Poller[] readPollers = new Poller[readPollerCount]; 335 for (int i = 0; i < readPollerCount; i++) { 336 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS); 337 } 338 339 // write pollers (or sub-pollers) 340 int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode)); 341 Poller[] writePollers = new Poller[writePollerCount]; 342 for (int i = 0; i < writePollerCount; i++) { 343 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS); 344 } 345 346 this.provider = provider; 347 this.pollerMode = mode; 348 this.masterPoller = masterPoller; 349 this.readPollers = readPollers; 350 this.writePollers = writePollers; 351 } 352 353 /** 354 * Starts the Poller threads. 355 */ 356 void start() { 357 if (pollerMode == Mode.VTHREAD_POLLERS) { 358 startPlatformThread("MasterPoller", masterPoller::pollerLoop); 359 ThreadFactory factory = Thread.ofVirtual() 360 .inheritInheritableThreadLocals(false) 361 .name("SubPoller-", 0) 362 .uncaughtExceptionHandler((t, e) -> e.printStackTrace()) 363 .factory(); 364 executor = Executors.newThreadPerTaskExecutor(factory); 365 Arrays.stream(readPollers).forEach(p -> { 366 executor.execute(() -> p.subPollerLoop(masterPoller)); 367 }); 368 Arrays.stream(writePollers).forEach(p -> { 369 executor.execute(() -> p.subPollerLoop(masterPoller)); 370 }); 371 } else { 372 Arrays.stream(readPollers).forEach(p -> { 373 startPlatformThread("Read-Poller", p::pollerLoop); 374 }); 375 Arrays.stream(writePollers).forEach(p -> { 376 startPlatformThread("Write-Poller", p::pollerLoop); 377 }); 378 } 379 } 380 381 /** 382 * Returns the master poller, or null if there is no master poller. 383 */ 384 Poller masterPoller() { 385 return masterPoller; 386 } 387 388 /** 389 * Returns the read poller for the given file descriptor. 390 */ 391 Poller readPoller(int fdVal) { 392 int index = provider.fdValToIndex(fdVal, readPollers.length); 393 return readPollers[index]; 394 } 395 396 /** 397 * Returns the write poller for the given file descriptor. 398 */ 399 Poller writePoller(int fdVal) { 400 int index = provider.fdValToIndex(fdVal, writePollers.length); 401 return writePollers[index]; 402 } 403 404 /** 405 * Return the list of read pollers. 406 */ 407 List<Poller> readPollers() { 408 return List.of(readPollers); 409 } 410 411 /** 412 * Return the list of write pollers. 413 */ 414 List<Poller> writePollers() { 415 return List.of(writePollers); 416 } 417 418 419 /** 420 * Reads the given property name to get the poller count. If the property is 421 * set then the value must be a power of 2. Returns 1 if the property is not 422 * set. 423 * @throws IllegalArgumentException if the property is set to a value that 424 * is not a power of 2. 425 */ 426 private static int pollerCount(String propName, int defaultCount) { 427 String s = System.getProperty(propName); 428 int count = (s != null) ? Integer.parseInt(s) : defaultCount; 429 430 // check power of 2 431 if (count != Integer.highestOneBit(count)) { 432 String msg = propName + " is set to a value that is not a power of 2"; 433 throw new IllegalArgumentException(msg); 434 } 435 return count; 436 } 437 438 /** 439 * Starts a platform thread to run the given task. 440 */ 441 private void startPlatformThread(String name, Runnable task) { 442 try { 443 Thread thread = InnocuousThread.newSystemThread(name, task); 444 thread.setDaemon(true); 445 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace()); 446 thread.start(); 447 } catch (Exception e) { 448 throw new InternalError(e); 449 } 450 } 451 } 452 453 /** 454 * Return the master poller or null if there is no master poller. 455 */ 456 public static Poller masterPoller() { 457 return POLLERS.masterPoller(); 458 } 459 460 /** 461 * Return the list of read pollers. 462 */ 463 public static List<Poller> readPollers() { 464 return POLLERS.readPollers(); 465 } 466 467 /** 468 * Return the list of write pollers. 469 */ 470 public static List<Poller> writePollers() { 471 return POLLERS.writePollers(); 472 } 473 }