1 /* 2 * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.io.IOException; 28 import java.io.UncheckedIOException; 29 import java.util.Arrays; 30 import java.util.List; 31 import java.util.Map; 32 import java.util.Objects; 33 import java.util.concurrent.ConcurrentHashMap; 34 import java.util.concurrent.Executor; 35 import java.util.concurrent.Executors; 36 import java.util.concurrent.ThreadFactory; 37 import java.util.concurrent.locks.LockSupport; 38 import java.util.function.BooleanSupplier; 39 import java.util.function.Supplier; 40 import jdk.internal.access.JavaLangAccess; 41 import jdk.internal.access.SharedSecrets; 42 import jdk.internal.misc.InnocuousThread; 43 import jdk.internal.vm.annotation.Stable; 44 45 /** 46 * Polls file descriptors. Virtual threads invoke the poll method to park 47 * until a given file descriptor is ready for I/O. 48 */ 49 public abstract class Poller { 50 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); 51 52 private static final PollerProvider PROVIDER = PollerProvider.provider(); 53 54 private static final Mode POLLER_MODE = pollerMode(); 55 56 private static final Executor DEFAULT_SCHEDULER = JLA.defaultVirtualThreadScheduler(); 57 58 // poller group for default scheduler 59 private static final Supplier<PollerGroup> DEFAULT_POLLER_GROUP = StableValue.supplier(PollerGroup::create); 60 61 // maps scheduler to PollerGroup, custom schedulers can't be GC'ed at this time 62 private static final Map<Executor, PollerGroup> POLLER_GROUPS = new ConcurrentHashMap<>(); 63 64 // the poller or sub-poller thread 65 private @Stable Thread owner; 66 67 // maps file descriptors to parked Thread 68 private final Map<Integer, Thread> map = new ConcurrentHashMap<>(); 69 70 /** 71 * Poller mode. 72 */ 73 enum Mode { 74 /** 75 * ReadPoller and WritePoller are dedicated platform threads that block waiting 76 * for events and unpark virtual threads when file descriptors are ready for I/O. 77 */ 78 SYSTEM_THREADS, 79 80 /** 81 * ReadPoller and WritePoller threads are virtual threads that poll for events, 82 * yielding between polls and unparking virtual threads when file descriptors are 83 * ready for I/O. If there are no events then the poller threads park until there 84 * are I/O events to poll. This mode helps to integrate polling with virtual 85 * thread scheduling. The approach is similar to the default scheme in "User-level 86 * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020 87 * (https://dl.acm.org/doi/10.1145/3379483). 88 */ 89 VTHREAD_POLLERS 90 } 91 92 /** 93 * Initialize a Poller. 94 */ 95 protected Poller() { 96 } 97 98 /** 99 * Closes the poller and release resources. This method can only be used to cleanup 100 * when creating a poller group fails. 101 */ 102 abstract void close(); 103 104 /** 105 * Returns the poller's file descriptor, used when the read and write poller threads 106 * are virtual threads. 107 * 108 * @throws UnsupportedOperationException if not supported 109 */ 110 int fdVal() { 111 throw new UnsupportedOperationException(); 112 } 113 114 /** 115 * Register the file descriptor. The registration is "one shot", meaning it should 116 * be polled at most once. 117 */ 118 abstract void implRegister(int fdVal) throws IOException; 119 120 /** 121 * Deregister the file descriptor. 122 * @param polled true if the file descriptor has already been polled 123 */ 124 abstract void implDeregister(int fdVal, boolean polled); 125 126 /** 127 * Poll for events. The {@link #polled(int)} method is invoked for each 128 * polled file descriptor. 129 * 130 * @param timeout if positive then block for up to {@code timeout} milliseconds, 131 * if zero then don't block, if -1 then block indefinitely 132 * @return the number of file descriptors polled 133 */ 134 abstract int poll(int timeout) throws IOException; 135 136 /** 137 * Callback by the poll method when a file descriptor is polled. 138 */ 139 final void polled(int fdVal) { 140 wakeup(fdVal); 141 } 142 143 /** 144 * Parks the current thread until a file descriptor is ready for the given op. 145 * @param fdVal the file descriptor 146 * @param event POLLIN or POLLOUT 147 * @param nanos the waiting time or 0 to wait indefinitely 148 * @param supplier supplies a boolean to indicate if the enclosing object is open 149 */ 150 static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier) 151 throws IOException 152 { 153 assert nanos >= 0L; 154 PollerGroup pollerGroup = pollerGroup(Thread.currentThread()); 155 if (event == Net.POLLIN) { 156 pollerGroup.readPoller(fdVal).poll(fdVal, nanos, supplier); 157 } else if (event == Net.POLLOUT) { 158 pollerGroup.writePoller(fdVal).poll(fdVal, nanos, supplier); 159 } else { 160 assert false; 161 } 162 } 163 164 /** 165 * Parks the current thread until a Selector's file descriptor is ready. 166 * @param fdVal the Selector's file descriptor 167 * @param nanos the waiting time or 0 to wait indefinitely 168 */ 169 static void pollSelector(int fdVal, long nanos) throws IOException { 170 assert nanos >= 0L; 171 PollerGroup pollerGroup = pollerGroup(Thread.currentThread()); 172 Poller poller = pollerGroup.masterPoller(); 173 if (poller == null) { 174 poller = pollerGroup.readPoller(fdVal); 175 } 176 poller.poll(fdVal, nanos, () -> true); 177 } 178 179 /** 180 * Unpark the given thread so that it stops polling. 181 */ 182 static void stopPoll(Thread thread) { 183 LockSupport.unpark(thread); 184 } 185 186 /** 187 * Parks the current thread until a file descriptor is ready. 188 */ 189 private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException { 190 register(fdVal); 191 try { 192 boolean isOpen = supplier.getAsBoolean(); 193 if (isOpen) { 194 if (nanos > 0) { 195 LockSupport.parkNanos(nanos); 196 } else { 197 LockSupport.park(); 198 } 199 } 200 } finally { 201 deregister(fdVal); 202 } 203 } 204 205 /** 206 * Registers the file descriptor to be polled at most once when the file descriptor 207 * is ready for I/O. 208 */ 209 private void register(int fdVal) throws IOException { 210 Thread previous = map.put(fdVal, Thread.currentThread()); 211 assert previous == null; 212 try { 213 implRegister(fdVal); 214 } catch (Throwable t) { 215 map.remove(fdVal); 216 throw t; 217 } 218 } 219 220 /** 221 * Deregister the file descriptor so that the file descriptor is not polled. 222 */ 223 private void deregister(int fdVal) { 224 Thread previous = map.remove(fdVal); 225 boolean polled = (previous == null); 226 assert polled || previous == Thread.currentThread(); 227 implDeregister(fdVal, polled); 228 } 229 230 /** 231 * Unparks any thread that is polling the given file descriptor. 232 */ 233 private void wakeup(int fdVal) { 234 Thread t = map.remove(fdVal); 235 if (t != null) { 236 LockSupport.unpark(t); 237 } 238 } 239 240 /** 241 * Master polling loop. The {@link #polled(int)} method is invoked for each file 242 * descriptor that is polled. 243 */ 244 private void pollerLoop() { 245 owner = Thread.currentThread(); 246 try { 247 for (;;) { 248 poll(-1); 249 } 250 } catch (Exception e) { 251 e.printStackTrace(); 252 } 253 } 254 255 /** 256 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file 257 * descriptor that is polled. 258 * 259 * The sub-poller registers its file descriptor with the master poller to park until 260 * there are events to poll. When unparked, it does non-blocking polls and parks 261 * again when there are no more events. The sub-poller yields after each poll to help 262 * with fairness and to avoid re-registering with the master poller where possible. 263 */ 264 private void subPollerLoop(PollerGroup pollerGroup, Poller masterPoller) { 265 assert Thread.currentThread().isVirtual(); 266 owner = Thread.currentThread(); 267 try { 268 int polled = 0; 269 while (!pollerGroup.isShutdown()) { 270 if (polled == 0) { 271 masterPoller.poll(fdVal(), 0, () -> true); // park 272 } else { 273 Thread.yield(); 274 } 275 polled = poll(0); 276 } 277 } catch (Exception e) { 278 e.printStackTrace(); 279 } 280 } 281 282 @Override 283 public String toString() { 284 return String.format("%s [registered = %d, owner = %s]", 285 Objects.toIdentityString(this), map.size(), owner); 286 } 287 288 /** 289 * The read/write pollers for a virtual thread scheduler. 290 */ 291 private static class PollerGroup { 292 private final Executor scheduler; 293 private final Poller[] readPollers; 294 private final Poller[] writePollers; 295 private final Poller masterPoller; 296 private final Executor executor; 297 private volatile boolean shutdown; 298 299 PollerGroup(Executor scheduler, 300 Poller masterPoller, 301 int readPollerCount, 302 int writePollerCount) throws IOException { 303 boolean subPoller = (POLLER_MODE == Mode.VTHREAD_POLLERS); 304 Executor executor = null; 305 if (subPoller) { 306 String namePrefix; 307 if (scheduler == DEFAULT_SCHEDULER) { 308 namePrefix = "SubPoller-"; 309 } else { 310 namePrefix = Objects.toIdentityString(scheduler) + "-SubPoller-"; 311 } 312 @SuppressWarnings("restricted") 313 ThreadFactory factory = Thread.ofVirtual() 314 .scheduler(scheduler) 315 .inheritInheritableThreadLocals(false) 316 .name(namePrefix, 0) 317 .uncaughtExceptionHandler((_, e) -> e.printStackTrace()) 318 .factory(); 319 executor = Executors.newThreadPerTaskExecutor(factory); 320 } 321 322 // read and write pollers (or sub-pollers) 323 Poller[] readPollers = new Poller[readPollerCount]; 324 Poller[] writePollers = new Poller[writePollerCount]; 325 try { 326 for (int i = 0; i < readPollerCount; i++) { 327 readPollers[i] = PROVIDER.readPoller(subPoller); 328 } 329 for (int i = 0; i < writePollerCount; i++) { 330 writePollers[i] = PROVIDER.writePoller(subPoller); 331 } 332 } catch (Exception e) { 333 closeAll(readPollers); 334 closeAll(writePollers); 335 throw e; 336 } 337 338 this.scheduler = scheduler; 339 this.masterPoller = masterPoller; 340 this.readPollers = readPollers; 341 this.writePollers = writePollers; 342 this.executor = executor; 343 } 344 345 /** 346 * Create and starts the poller group for the default scheduler. 347 */ 348 static PollerGroup create() { 349 try { 350 Poller masterPoller = (POLLER_MODE == Mode.VTHREAD_POLLERS) 351 ? PROVIDER.readPoller(false) 352 : null; 353 PollerGroup pollerGroup; 354 try { 355 int rc = pollerCount("jdk.readPollers", PROVIDER.defaultReadPollers(POLLER_MODE)); 356 int wc = pollerCount("jdk.writePollers", PROVIDER.defaultWritePollers(POLLER_MODE)); 357 pollerGroup = new PollerGroup(DEFAULT_SCHEDULER, masterPoller, rc, wc); 358 } catch (Exception e) { 359 masterPoller.close(); 360 throw e; 361 } 362 pollerGroup.start(); 363 return pollerGroup; 364 } catch (IOException ioe) { 365 throw new UncheckedIOException(ioe); 366 } 367 } 368 369 /** 370 * Create and starts the poller group for a custom scheduler. 371 */ 372 static PollerGroup create(Executor scheduler) { 373 try { 374 Poller masterPoller = DEFAULT_POLLER_GROUP.get().masterPoller(); 375 var pollerGroup = new PollerGroup(scheduler, masterPoller, 1, 1); 376 pollerGroup.start(); 377 return pollerGroup; 378 } catch (IOException ioe) { 379 throw new UncheckedIOException(ioe); 380 } 381 } 382 383 /** 384 * Start poller threads. 385 */ 386 private void start() { 387 if (POLLER_MODE == Mode.VTHREAD_POLLERS) { 388 if (scheduler == DEFAULT_SCHEDULER) { 389 startPlatformThread("Master-Poller", masterPoller::pollerLoop); 390 } 391 Arrays.stream(readPollers).forEach(p -> { 392 executor.execute(() -> p.subPollerLoop(this, masterPoller)); 393 }); 394 Arrays.stream(writePollers).forEach(p -> { 395 executor.execute(() -> p.subPollerLoop(this, masterPoller)); 396 }); 397 } else { 398 // Mode.SYSTEM_THREADS 399 Arrays.stream(readPollers).forEach(p -> { 400 startPlatformThread("Read-Poller", p::pollerLoop); 401 }); 402 Arrays.stream(writePollers).forEach(p -> { 403 startPlatformThread("Write-Poller", p::pollerLoop); 404 }); 405 } 406 } 407 408 /** 409 * Close the given pollers. 410 */ 411 private void closeAll(Poller... pollers) { 412 for (Poller poller : pollers) { 413 if (poller != null) { 414 poller.close(); 415 } 416 } 417 } 418 419 /** 420 * Invoked during shutdown to unpark all subpoller threads and wait for 421 * them to terminate. 422 */ 423 private void shutdownPollers(Poller... pollers) { 424 boolean interrupted = false; 425 for (Poller poller : pollers) { 426 if (poller.owner instanceof Thread owner) { 427 LockSupport.unpark(owner); 428 while (owner.isAlive()) { 429 try { 430 owner.join(); 431 } catch (InterruptedException e) { 432 interrupted = true; 433 } 434 } 435 } 436 } 437 if (interrupted) { 438 Thread.currentThread().interrupt(); 439 } 440 } 441 442 void shutdown() { 443 if (scheduler == DEFAULT_SCHEDULER || POLLER_MODE == Mode.SYSTEM_THREADS) { 444 throw new UnsupportedOperationException(); 445 } 446 shutdown = true; 447 shutdownPollers(readPollers); 448 shutdownPollers(writePollers); 449 } 450 451 /** 452 * 453 * @return 454 */ 455 boolean isShutdown() { 456 return shutdown; 457 } 458 459 Poller masterPoller() { 460 return masterPoller; 461 } 462 463 List<Poller> readPollers() { 464 return List.of(readPollers); 465 } 466 467 List<Poller> writePollers() { 468 return List.of(writePollers); 469 } 470 471 /** 472 * Returns the read poller for the given file descriptor. 473 */ 474 Poller readPoller(int fdVal) { 475 int index = PROVIDER.fdValToIndex(fdVal, readPollers.length); 476 return readPollers[index]; 477 } 478 479 /** 480 * Returns the write poller for the given file descriptor. 481 */ 482 Poller writePoller(int fdVal) { 483 int index = PROVIDER.fdValToIndex(fdVal, writePollers.length); 484 return writePollers[index]; 485 } 486 487 /** 488 * Reads the given property name to get the poller count. If the property is 489 * set then the value must be a power of 2. Returns 1 if the property is not 490 * set. 491 * @throws IllegalArgumentException if the property is set to a value that 492 * is not a power of 2. 493 */ 494 private static int pollerCount(String propName, int defaultCount) { 495 String s = System.getProperty(propName); 496 int count = (s != null) ? Integer.parseInt(s) : defaultCount; 497 498 // check power of 2 499 if (count != Integer.highestOneBit(count)) { 500 String msg = propName + " is set to a value that is not a power of 2"; 501 throw new IllegalArgumentException(msg); 502 } 503 return count; 504 } 505 506 /** 507 * Starts a platform thread to run the given task. 508 */ 509 private void startPlatformThread(String name, Runnable task) { 510 try { 511 Thread thread = InnocuousThread.newSystemThread(name, task); 512 thread.setDaemon(true); 513 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace()); 514 thread.start(); 515 } catch (Exception e) { 516 throw new InternalError(e); 517 } 518 } 519 } 520 521 /** 522 * Returns the poller mode. 523 */ 524 private static Mode pollerMode() { 525 String s = System.getProperty("jdk.pollerMode"); 526 if (s != null) { 527 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) { 528 return Mode.SYSTEM_THREADS; 529 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) { 530 return Mode.VTHREAD_POLLERS; 531 } else { 532 throw new RuntimeException("Can't parse '" + s + "' as polling mode"); 533 } 534 } else { 535 return PROVIDER.defaultPollerMode(); 536 } 537 } 538 539 /** 540 * Returns the PollerGroup that the given thread uses to poll file descriptors. 541 */ 542 private static PollerGroup pollerGroup(Thread thread) { 543 if (POLLER_MODE == Mode.SYSTEM_THREADS) { 544 return DEFAULT_POLLER_GROUP.get(); 545 } 546 Executor scheduler; 547 if (thread.isVirtual()) { 548 scheduler = JLA.virtualThreadScheduler(thread); 549 } else { 550 scheduler = DEFAULT_SCHEDULER; 551 } 552 return POLLER_GROUPS.computeIfAbsent(scheduler, _ -> PollerGroup.create(scheduler)); 553 } 554 555 /** 556 * Invoked before the given scheduler is shutdown. In VTHREAD_POLLERS mode, this 557 * method will arrange for the sub poller threads to terminate. Does nothing in 558 * SYSTEM_THREADS mode. 559 */ 560 public static void beforeShutdown(Executor executor) { 561 if (POLLER_MODE == Mode.VTHREAD_POLLERS) { 562 PollerGroup group = POLLER_GROUPS.remove(executor); 563 if (group != null) { 564 group.shutdown(); 565 } 566 } 567 } 568 569 /** 570 * Return the master poller or null if there is no master poller. 571 */ 572 public static Poller masterPoller() { 573 return DEFAULT_POLLER_GROUP.get().masterPoller(); 574 } 575 576 /** 577 * Return the list of read pollers. 578 */ 579 public static List<Poller> readPollers() { 580 return DEFAULT_POLLER_GROUP.get().readPollers(); 581 } 582 583 /** 584 * Return the list of write pollers. 585 */ 586 public static List<Poller> writePollers() { 587 return DEFAULT_POLLER_GROUP.get().writePollers(); 588 } 589 590 }