1 /* 2 * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.io.IOException; 28 import java.util.Arrays; 29 import java.util.List; 30 import java.util.Map; 31 import java.util.Objects; 32 import java.util.concurrent.ConcurrentHashMap; 33 import java.util.concurrent.Executor; 34 import java.util.concurrent.Executors; 35 import java.util.concurrent.ThreadFactory; 36 import java.util.concurrent.locks.LockSupport; 37 import java.util.function.BooleanSupplier; 38 import jdk.internal.misc.InnocuousThread; 39 import sun.security.action.GetPropertyAction; 40 41 /** 42 * Polls file descriptors. Virtual threads invoke the poll method to park 43 * until a given file descriptor is ready for I/O. 44 */ 45 public abstract class Poller { 46 private static final Pollers POLLERS; 47 static { 48 try { 49 var pollers = new Pollers(); 50 pollers.start(); 51 POLLERS = pollers; 52 } catch (IOException ioe) { 53 throw new ExceptionInInitializerError(ioe); 54 } 55 } 56 57 // maps file descriptors to parked Thread 58 private final Map<Integer, Thread> map = new ConcurrentHashMap<>(); 59 60 /** 61 * Poller mode. 62 */ 63 enum Mode { 64 /** 65 * ReadPoller and WritePoller are dedicated platform threads that block waiting 66 * for events and unpark virtual threads when file descriptors are ready for I/O. 67 */ 68 SYSTEM_THREADS, 69 70 /** 71 * ReadPoller and WritePoller threads are virtual threads that poll for events, 72 * yielding between polls and unparking virtual threads when file descriptors are 73 * ready for I/O. If there are no events then the poller threads park until there 74 * are I/O events to poll. This mode helps to integrate polling with virtual 75 * thread scheduling. The approach is similar to the default scheme in "User-level 76 * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020 77 * (https://dl.acm.org/doi/10.1145/3379483). 78 */ 79 VTHREAD_POLLERS 80 } 81 82 /** 83 * Initialize a Poller. 84 */ 85 protected Poller() { 86 } 87 88 /** 89 * Returns the poller's file descriptor, used when the read and write poller threads 90 * are virtual threads. 91 * 92 * @throws UnsupportedOperationException if not supported 93 */ 94 int fdVal() { 95 throw new UnsupportedOperationException(); 96 } 97 98 /** 99 * Register the file descriptor. The registration is "one shot", meaning it should 100 * be polled at most once. 101 */ 102 abstract void implRegister(int fdVal) throws IOException; 103 104 /** 105 * Deregister the file descriptor. 106 * @param polled true if the file descriptor has already been polled 107 */ 108 abstract void implDeregister(int fdVal, boolean polled); 109 110 /** 111 * Poll for events. The {@link #polled(int)} method is invoked for each 112 * polled file descriptor. 113 * 114 * @param timeout if positive then block for up to {@code timeout} milliseconds, 115 * if zero then don't block, if -1 then block indefinitely 116 * @return the number of file descriptors polled 117 */ 118 abstract int poll(int timeout) throws IOException; 119 120 /** 121 * Callback by the poll method when a file descriptor is polled. 122 */ 123 final void polled(int fdVal) { 124 wakeup(fdVal); 125 } 126 127 /** 128 * Parks the current thread until a file descriptor is ready for the given op. 129 * @param fdVal the file descriptor 130 * @param event POLLIN or POLLOUT 131 * @param nanos the waiting time or 0 to wait indefinitely 132 * @param supplier supplies a boolean to indicate if the enclosing object is open 133 */ 134 static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier) 135 throws IOException 136 { 137 assert nanos >= 0L; 138 if (event == Net.POLLIN) { 139 POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier); 140 } else if (event == Net.POLLOUT) { 141 POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier); 142 } else { 143 assert false; 144 } 145 } 146 147 /** 148 * Parks the current thread until a Selector's file descriptor is ready. 149 * @param fdVal the Selector's file descriptor 150 * @param nanos the waiting time or 0 to wait indefinitely 151 */ 152 static void pollSelector(int fdVal, long nanos) throws IOException { 153 assert nanos >= 0L; 154 Poller poller = POLLERS.masterPoller(); 155 if (poller == null) { 156 poller = POLLERS.readPoller(fdVal); 157 } 158 poller.poll(fdVal, nanos, () -> true); 159 } 160 161 /** 162 * If there is a thread polling the given file descriptor for the given event then 163 * the thread is unparked. 164 */ 165 static void stopPoll(int fdVal, int event) { 166 if (event == Net.POLLIN) { 167 POLLERS.readPoller(fdVal).wakeup(fdVal); 168 } else if (event == Net.POLLOUT) { 169 POLLERS.writePoller(fdVal).wakeup(fdVal); 170 } else { 171 throw new IllegalArgumentException(); 172 } 173 } 174 175 /** 176 * If there are any threads polling the given file descriptor then they are unparked. 177 */ 178 static void stopPoll(int fdVal) { 179 stopPoll(fdVal, Net.POLLIN); 180 stopPoll(fdVal, Net.POLLOUT); 181 } 182 183 /** 184 * Parks the current thread until a file descriptor is ready. 185 */ 186 private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException { 187 register(fdVal); 188 try { 189 boolean isOpen = supplier.getAsBoolean(); 190 if (isOpen) { 191 if (nanos > 0) { 192 LockSupport.parkNanos(nanos); 193 } else { 194 LockSupport.park(); 195 } 196 } 197 } finally { 198 deregister(fdVal); 199 } 200 } 201 202 /** 203 * Registers the file descriptor to be polled at most once when the file descriptor 204 * is ready for I/O. 205 */ 206 private void register(int fdVal) throws IOException { 207 Thread previous = map.put(fdVal, Thread.currentThread()); 208 assert previous == null; 209 try { 210 implRegister(fdVal); 211 } catch (Throwable t) { 212 map.remove(fdVal); 213 throw t; 214 } 215 } 216 217 /** 218 * Deregister the file descriptor so that the file descriptor is not polled. 219 */ 220 private void deregister(int fdVal) { 221 Thread previous = map.remove(fdVal); 222 boolean polled = (previous == null); 223 assert polled || previous == Thread.currentThread(); 224 implDeregister(fdVal, polled); 225 } 226 227 /** 228 * Unparks any thread that is polling the given file descriptor. 229 */ 230 private void wakeup(int fdVal) { 231 Thread t = map.remove(fdVal); 232 if (t != null) { 233 LockSupport.unpark(t); 234 } 235 } 236 237 /** 238 * Master polling loop. The {@link #polled(int)} method is invoked for each file 239 * descriptor that is polled. 240 */ 241 private void pollerLoop() { 242 try { 243 for (;;) { 244 poll(-1); 245 } 246 } catch (Exception e) { 247 e.printStackTrace(); 248 } 249 } 250 251 /** 252 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file 253 * descriptor that is polled. 254 * 255 * The sub-poller registers its file descriptor with the master poller to park until 256 * there are events to poll. When unparked, it does non-blocking polls and parks 257 * again when there are no more events. The sub-poller yields after each poll to help 258 * with fairness and to avoid re-registering with the master poller where possible. 259 */ 260 private void subPollerLoop(Poller masterPoller) { 261 assert Thread.currentThread().isVirtual(); 262 try { 263 int polled = 0; 264 for (;;) { 265 if (polled == 0) { 266 masterPoller.poll(fdVal(), 0, () -> true); // park 267 } else { 268 Thread.yield(); 269 } 270 polled = poll(0); 271 } 272 } catch (Exception e) { 273 e.printStackTrace(); 274 } 275 } 276 277 /** 278 * Returns the number I/O operations currently registered with this poller. 279 */ 280 public int registered() { 281 return map.size(); 282 } 283 284 @Override 285 public String toString() { 286 return Objects.toIdentityString(this) + " [registered = " + registered() + "]"; 287 } 288 289 /** 290 * The Pollers used for read and write events. 291 */ 292 private static class Pollers { 293 private final PollerProvider provider; 294 private final Poller.Mode pollerMode; 295 private final Poller masterPoller; 296 private final Poller[] readPollers; 297 private final Poller[] writePollers; 298 299 // used by start method to executor is kept alive 300 private Executor executor; 301 302 /** 303 * Creates the Poller instances based on configuration. 304 */ 305 Pollers() throws IOException { 306 PollerProvider provider = PollerProvider.provider(); 307 Poller.Mode mode; 308 String s = GetPropertyAction.privilegedGetProperty("jdk.pollerMode"); 309 if (s != null) { 310 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) { 311 mode = Mode.SYSTEM_THREADS; 312 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) { 313 mode = Mode.VTHREAD_POLLERS; 314 } else { 315 throw new RuntimeException("Can't parse '" + s + "' as polling mode"); 316 } 317 } else { 318 mode = provider.defaultPollerMode(); 319 } 320 321 // vthread poller mode needs a master poller 322 Poller masterPoller = (mode == Mode.VTHREAD_POLLERS) 323 ? provider.readPoller(false) 324 : null; 325 326 // read pollers (or sub-pollers) 327 int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode)); 328 Poller[] readPollers = new Poller[readPollerCount]; 329 for (int i = 0; i < readPollerCount; i++) { 330 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS); 331 } 332 333 // write pollers (or sub-pollers) 334 int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode)); 335 Poller[] writePollers = new Poller[writePollerCount]; 336 for (int i = 0; i < writePollerCount; i++) { 337 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS); 338 } 339 340 this.provider = provider; 341 this.pollerMode = mode; 342 this.masterPoller = masterPoller; 343 this.readPollers = readPollers; 344 this.writePollers = writePollers; 345 } 346 347 /** 348 * Starts the Poller threads. 349 */ 350 void start() { 351 if (pollerMode == Mode.VTHREAD_POLLERS) { 352 startPlatformThread("MasterPoller", masterPoller::pollerLoop); 353 ThreadFactory factory = Thread.ofVirtual() 354 .inheritInheritableThreadLocals(false) 355 .name("SubPoller-", 0) 356 .uncaughtExceptionHandler((t, e) -> e.printStackTrace()) 357 .factory(); 358 executor = Executors.newThreadPerTaskExecutor(factory); 359 Arrays.stream(readPollers).forEach(p -> { 360 executor.execute(() -> p.subPollerLoop(masterPoller)); 361 }); 362 Arrays.stream(writePollers).forEach(p -> { 363 executor.execute(() -> p.subPollerLoop(masterPoller)); 364 }); 365 } else { 366 Arrays.stream(readPollers).forEach(p -> { 367 startPlatformThread("Read-Poller", p::pollerLoop); 368 }); 369 Arrays.stream(writePollers).forEach(p -> { 370 startPlatformThread("Write-Poller", p::pollerLoop); 371 }); 372 } 373 } 374 375 /** 376 * Returns the master poller, or null if there is no master poller. 377 */ 378 Poller masterPoller() { 379 return masterPoller; 380 } 381 382 /** 383 * Returns the read poller for the given file descriptor. 384 */ 385 Poller readPoller(int fdVal) { 386 int index = provider.fdValToIndex(fdVal, readPollers.length); 387 return readPollers[index]; 388 } 389 390 /** 391 * Returns the write poller for the given file descriptor. 392 */ 393 Poller writePoller(int fdVal) { 394 int index = provider.fdValToIndex(fdVal, writePollers.length); 395 return writePollers[index]; 396 } 397 398 /** 399 * Return the list of read pollers. 400 */ 401 List<Poller> readPollers() { 402 return List.of(readPollers); 403 } 404 405 /** 406 * Return the list of write pollers. 407 */ 408 List<Poller> writePollers() { 409 return List.of(writePollers); 410 } 411 412 413 /** 414 * Reads the given property name to get the poller count. If the property is 415 * set then the value must be a power of 2. Returns 1 if the property is not 416 * set. 417 * @throws IllegalArgumentException if the property is set to a value that 418 * is not a power of 2. 419 */ 420 private static int pollerCount(String propName, int defaultCount) { 421 String s = GetPropertyAction.privilegedGetProperty(propName); 422 int count = (s != null) ? Integer.parseInt(s) : defaultCount; 423 424 // check power of 2 425 if (count != Integer.highestOneBit(count)) { 426 String msg = propName + " is set to a value that is not a power of 2"; 427 throw new IllegalArgumentException(msg); 428 } 429 return count; 430 } 431 432 /** 433 * Starts a platform thread to run the given task. 434 */ 435 private void startPlatformThread(String name, Runnable task) { 436 try { 437 Thread thread = InnocuousThread.newSystemThread(name, task); 438 thread.setDaemon(true); 439 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace()); 440 thread.start(); 441 } catch (Exception e) { 442 throw new InternalError(e); 443 } 444 } 445 } 446 447 /** 448 * Return the master poller or null if there is no master poller. 449 */ 450 public static Poller masterPoller() { 451 return POLLERS.masterPoller(); 452 } 453 454 /** 455 * Return the list of read pollers. 456 */ 457 public static List<Poller> readPollers() { 458 return POLLERS.readPollers(); 459 } 460 461 /** 462 * Return the list of write pollers. 463 */ 464 public static List<Poller> writePollers() { 465 return POLLERS.writePollers(); 466 } 467 }