1 /* 2 * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.io.IOException; 28 import java.io.UncheckedIOException; 29 import java.util.Arrays; 30 import java.util.List; 31 import java.util.Map; 32 import java.util.Objects; 33 import java.util.concurrent.ConcurrentHashMap; 34 import java.util.concurrent.Executor; 35 import java.util.concurrent.Executors; 36 import java.util.concurrent.ThreadFactory; 37 import java.util.concurrent.locks.LockSupport; 38 import java.util.function.BooleanSupplier; 39 import java.util.function.Supplier; 40 import jdk.internal.access.JavaLangAccess; 41 import jdk.internal.access.SharedSecrets; 42 import jdk.internal.misc.InnocuousThread; 43 import jdk.internal.vm.annotation.Stable; 44 45 /** 46 * Polls file descriptors. Virtual threads invoke the poll method to park 47 * until a given file descriptor is ready for I/O. 48 */ 49 public abstract class Poller { 50 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); 51 52 private static final PollerProvider PROVIDER = PollerProvider.provider(); 53 54 private static final Executor DEFAULT_SCHEDULER = JLA.virtualThreadDefaultScheduler(); 55 56 private static Supplier<Mode> POLLER_MODE = StableValue.supplier(Poller::pollerMode); 57 58 private static Supplier<PollerGroup> DEFAULT_POLLER_GROUP = StableValue.supplier(PollerGroup::create); 59 60 // maps scheduler to PollerGroup, custom schedulers can't be GC'ed at this time 61 private static final Map<Executor, PollerGroup> POLLER_GROUPS = new ConcurrentHashMap<>(); 62 63 // the poller or sub-poller thread 64 private @Stable Thread owner; 65 66 // maps file descriptors to parked Thread 67 private final Map<Integer, Thread> map = new ConcurrentHashMap<>(); 68 69 /** 70 * Poller mode. 71 */ 72 enum Mode { 73 /** 74 * ReadPoller and WritePoller are dedicated platform threads that block waiting 75 * for events and unpark virtual threads when file descriptors are ready for I/O. 76 */ 77 SYSTEM_THREADS, 78 79 /** 80 * ReadPoller and WritePoller threads are virtual threads that poll for events, 81 * yielding between polls and unparking virtual threads when file descriptors are 82 * ready for I/O. If there are no events then the poller threads park until there 83 * are I/O events to poll. This mode helps to integrate polling with virtual 84 * thread scheduling. The approach is similar to the default scheme in "User-level 85 * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020 86 * (https://dl.acm.org/doi/10.1145/3379483). 87 */ 88 VTHREAD_POLLERS 89 } 90 91 /** 92 * Initialize a Poller. 93 */ 94 protected Poller() { 95 } 96 97 /** 98 * Returns the poller's file descriptor, used when the read and write poller threads 99 * are virtual threads. 100 * 101 * @throws UnsupportedOperationException if not supported 102 */ 103 int fdVal() { 104 throw new UnsupportedOperationException(); 105 } 106 107 /** 108 * Register the file descriptor. The registration is "one shot", meaning it should 109 * be polled at most once. 110 */ 111 abstract void implRegister(int fdVal) throws IOException; 112 113 /** 114 * Deregister the file descriptor. 115 * @param polled true if the file descriptor has already been polled 116 */ 117 abstract void implDeregister(int fdVal, boolean polled); 118 119 /** 120 * Poll for events. The {@link #polled(int)} method is invoked for each 121 * polled file descriptor. 122 * 123 * @param timeout if positive then block for up to {@code timeout} milliseconds, 124 * if zero then don't block, if -1 then block indefinitely 125 * @return the number of file descriptors polled 126 */ 127 abstract int poll(int timeout) throws IOException; 128 129 /** 130 * Callback by the poll method when a file descriptor is polled. 131 */ 132 final void polled(int fdVal) { 133 wakeup(fdVal); 134 } 135 136 /** 137 * Parks the current thread until a file descriptor is ready for the given op. 138 * @param fdVal the file descriptor 139 * @param event POLLIN or POLLOUT 140 * @param nanos the waiting time or 0 to wait indefinitely 141 * @param supplier supplies a boolean to indicate if the enclosing object is open 142 */ 143 static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier) 144 throws IOException 145 { 146 assert nanos >= 0L; 147 PollerGroup pollerGroup = PollerGroup.groupFor(Thread.currentThread()); 148 if (event == Net.POLLIN) { 149 pollerGroup.readPoller(fdVal).poll(fdVal, nanos, supplier); 150 } else if (event == Net.POLLOUT) { 151 pollerGroup.writePoller(fdVal).poll(fdVal, nanos, supplier); 152 } else { 153 assert false; 154 } 155 } 156 157 /** 158 * Parks the current thread until a Selector's file descriptor is ready. 159 * @param fdVal the Selector's file descriptor 160 * @param nanos the waiting time or 0 to wait indefinitely 161 */ 162 static void pollSelector(int fdVal, long nanos) throws IOException { 163 assert nanos >= 0L; 164 PollerGroup pollerGroup = PollerGroup.groupFor(Thread.currentThread()); 165 Poller poller = pollerGroup.masterPoller(); 166 if (poller == null) { 167 poller = pollerGroup.readPoller(fdVal); 168 } 169 poller.poll(fdVal, nanos, () -> true); 170 } 171 172 /** 173 * Unpark the given thread so that it stops polling. 174 */ 175 static void stopPoll(Thread thread) { 176 LockSupport.unpark(thread); 177 } 178 179 /** 180 * Parks the current thread until a file descriptor is ready. 181 */ 182 private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException { 183 register(fdVal); 184 try { 185 boolean isOpen = supplier.getAsBoolean(); 186 if (isOpen) { 187 if (nanos > 0) { 188 LockSupport.parkNanos(nanos); 189 } else { 190 LockSupport.park(); 191 } 192 } 193 } finally { 194 deregister(fdVal); 195 } 196 } 197 198 /** 199 * Registers the file descriptor to be polled at most once when the file descriptor 200 * is ready for I/O. 201 */ 202 private void register(int fdVal) throws IOException { 203 Thread previous = map.put(fdVal, Thread.currentThread()); 204 assert previous == null; 205 try { 206 implRegister(fdVal); 207 } catch (Throwable t) { 208 map.remove(fdVal); 209 throw t; 210 } 211 } 212 213 /** 214 * Deregister the file descriptor so that the file descriptor is not polled. 215 */ 216 private void deregister(int fdVal) { 217 Thread previous = map.remove(fdVal); 218 boolean polled = (previous == null); 219 assert polled || previous == Thread.currentThread(); 220 implDeregister(fdVal, polled); 221 } 222 223 /** 224 * Unparks any thread that is polling the given file descriptor. 225 */ 226 private void wakeup(int fdVal) { 227 Thread t = map.remove(fdVal); 228 if (t != null) { 229 LockSupport.unpark(t); 230 } 231 } 232 233 /** 234 * Master polling loop. The {@link #polled(int)} method is invoked for each file 235 * descriptor that is polled. 236 */ 237 private void pollerLoop() { 238 owner = Thread.currentThread(); 239 try { 240 for (;;) { 241 poll(-1); 242 } 243 } catch (Exception e) { 244 e.printStackTrace(); 245 } 246 } 247 248 /** 249 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file 250 * descriptor that is polled. 251 * 252 * The sub-poller registers its file descriptor with the master poller to park until 253 * there are events to poll. When unparked, it does non-blocking polls and parks 254 * again when there are no more events. The sub-poller yields after each poll to help 255 * with fairness and to avoid re-registering with the master poller where possible. 256 */ 257 private void subPollerLoop(Poller masterPoller) { 258 assert Thread.currentThread().isVirtual(); 259 owner = Thread.currentThread(); 260 try { 261 int polled = 0; 262 for (;;) { 263 if (polled == 0) { 264 masterPoller.poll(fdVal(), 0, () -> true); // park 265 } else { 266 Thread.yield(); 267 } 268 polled = poll(0); 269 } 270 } catch (Exception e) { 271 e.printStackTrace(); 272 } 273 } 274 275 @Override 276 public String toString() { 277 return String.format("%s [registered = %d, owner = %s]", 278 Objects.toIdentityString(this), map.size(), owner); 279 } 280 281 /** 282 * The read/write pollers. 283 */ 284 private static class PollerGroup { 285 private final Executor scheduler; 286 private final Poller[] readPollers; 287 private final Poller[] writePollers; 288 private final Poller masterPoller; 289 private final Executor executor; 290 291 PollerGroup(Executor scheduler) throws IOException { 292 Mode mode = Poller.POLLER_MODE.get(); 293 int readPollerCount, writePollerCount; 294 Poller masterPoller; 295 if (scheduler == DEFAULT_SCHEDULER) { 296 readPollerCount = pollerCount("jdk.readPollers", PROVIDER.defaultReadPollers(mode)); 297 writePollerCount = pollerCount("jdk.writePollers", PROVIDER.defaultWritePollers(mode)); 298 masterPoller = (mode == Mode.VTHREAD_POLLERS) 299 ? PROVIDER.readPoller(false) 300 : null; 301 } else { 302 readPollerCount = 1; 303 writePollerCount = 1; 304 if (mode == Mode.VTHREAD_POLLERS) { 305 masterPoller = DEFAULT_POLLER_GROUP.get().masterPoller(); 306 } else { 307 masterPoller = null; 308 } 309 } 310 311 Executor executor = null; 312 if (mode == Mode.VTHREAD_POLLERS) { 313 String namePrefix; 314 if (scheduler == DEFAULT_SCHEDULER) { 315 namePrefix = "SubPoller-"; 316 } else { 317 namePrefix = Objects.toIdentityString(scheduler) + "-SubPoller-"; 318 } 319 @SuppressWarnings("restricted") 320 ThreadFactory factory = Thread.ofVirtual() 321 .scheduler(scheduler) 322 .inheritInheritableThreadLocals(false) 323 .name(namePrefix, 0) 324 .uncaughtExceptionHandler((_, e) -> e.printStackTrace()) 325 .factory(); 326 executor = Executors.newThreadPerTaskExecutor(factory); 327 } 328 329 // read pollers (or sub-pollers) 330 Poller[] readPollers = new Poller[readPollerCount]; 331 for (int i = 0; i < readPollerCount; i++) { 332 readPollers[i] = PROVIDER.readPoller(mode == Mode.VTHREAD_POLLERS); 333 } 334 335 // write pollers (or sub-pollers) 336 Poller[] writePollers = new Poller[writePollerCount]; 337 for (int i = 0; i < writePollerCount; i++) { 338 writePollers[i] = PROVIDER.writePoller(mode == Mode.VTHREAD_POLLERS); 339 } 340 341 this.scheduler = scheduler; 342 this.masterPoller = masterPoller; 343 this.readPollers = readPollers; 344 this.writePollers = writePollers; 345 this.executor = executor; 346 } 347 348 static PollerGroup create(Executor scheduler) { 349 try { 350 return new PollerGroup(scheduler).start(); 351 } catch (IOException ioe) { 352 throw new UncheckedIOException(ioe); 353 } 354 } 355 356 static PollerGroup create() { 357 return create(DEFAULT_SCHEDULER); 358 } 359 360 /** 361 * Start poller threads. 362 */ 363 private PollerGroup start() { 364 if (POLLER_MODE.get() == Mode.VTHREAD_POLLERS) { 365 if (scheduler == DEFAULT_SCHEDULER) { 366 startPlatformThread("Master-Poller", masterPoller::pollerLoop); 367 } 368 Arrays.stream(readPollers).forEach(p -> { 369 executor.execute(() -> p.subPollerLoop(masterPoller)); 370 }); 371 Arrays.stream(writePollers).forEach(p -> { 372 executor.execute(() -> p.subPollerLoop(masterPoller)); 373 }); 374 } else { 375 // Mode.SYSTEM_THREADS 376 Arrays.stream(readPollers).forEach(p -> { 377 startPlatformThread("Read-Poller", p::pollerLoop); 378 }); 379 Arrays.stream(writePollers).forEach(p -> { 380 startPlatformThread("Write-Poller", p::pollerLoop); 381 }); 382 } 383 return this; 384 } 385 386 Poller masterPoller() { 387 return masterPoller; 388 } 389 390 List<Poller> readPollers() { 391 return List.of(readPollers); 392 } 393 394 List<Poller> writePollers() { 395 return List.of(writePollers); 396 } 397 398 /** 399 * Returns the read poller for the given file descriptor. 400 */ 401 Poller readPoller(int fdVal) { 402 int index = PROVIDER.fdValToIndex(fdVal, readPollers.length); 403 return readPollers[index]; 404 } 405 406 /** 407 * Returns the write poller for the given file descriptor. 408 */ 409 Poller writePoller(int fdVal) { 410 int index = PROVIDER.fdValToIndex(fdVal, writePollers.length); 411 return writePollers[index]; 412 } 413 414 /** 415 * Reads the given property name to get the poller count. If the property is 416 * set then the value must be a power of 2. Returns 1 if the property is not 417 * set. 418 * @throws IllegalArgumentException if the property is set to a value that 419 * is not a power of 2. 420 */ 421 private static int pollerCount(String propName, int defaultCount) { 422 String s = System.getProperty(propName); 423 int count = (s != null) ? Integer.parseInt(s) : defaultCount; 424 425 // check power of 2 426 if (count != Integer.highestOneBit(count)) { 427 String msg = propName + " is set to a value that is not a power of 2"; 428 throw new IllegalArgumentException(msg); 429 } 430 return count; 431 } 432 433 /** 434 * Starts a platform thread to run the given task. 435 */ 436 private void startPlatformThread(String name, Runnable task) { 437 try { 438 Thread thread = InnocuousThread.newSystemThread(name, task); 439 thread.setDaemon(true); 440 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace()); 441 thread.start(); 442 } catch (Exception e) { 443 throw new InternalError(e); 444 } 445 } 446 447 /** 448 * Returns the PollerGroup that the given thread uses to poll file descriptors. 449 */ 450 static PollerGroup groupFor(Thread thread) { 451 if (POLLER_MODE.get() == Mode.SYSTEM_THREADS) { 452 return DEFAULT_POLLER_GROUP.get(); 453 } 454 Executor scheduler; 455 if (thread.isVirtual()) { 456 scheduler = JLA.virtualThreadScheduler(thread); 457 } else { 458 scheduler = DEFAULT_SCHEDULER; 459 } 460 return POLLER_GROUPS.computeIfAbsent(scheduler, _ -> PollerGroup.create(scheduler)); 461 } 462 } 463 464 /** 465 * Returns the poller mode. 466 */ 467 private static Mode pollerMode() { 468 String s = System.getProperty("jdk.pollerMode"); 469 if (s != null) { 470 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) { 471 return Mode.SYSTEM_THREADS; 472 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) { 473 return Mode.VTHREAD_POLLERS; 474 } else { 475 throw new RuntimeException("Can't parse '" + s + "' as polling mode"); 476 } 477 } else { 478 return PROVIDER.defaultPollerMode(); 479 } 480 } 481 482 /** 483 * Return the master poller or null if there is no master poller. 484 */ 485 public static Poller masterPoller() { 486 return DEFAULT_POLLER_GROUP.get().masterPoller(); 487 } 488 489 /** 490 * Return the list of read pollers. 491 */ 492 public static List<Poller> readPollers() { 493 return DEFAULT_POLLER_GROUP.get().readPollers(); 494 } 495 496 /** 497 * Return the list of write pollers. 498 */ 499 public static List<Poller> writePollers() { 500 return DEFAULT_POLLER_GROUP.get().writePollers(); 501 } 502 }