1 /* 2 * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.io.IOException; 28 import java.io.UncheckedIOException; 29 import java.lang.ref.Reference; 30 import java.util.Arrays; 31 import java.util.List; 32 import java.util.Map; 33 import java.util.Objects; 34 import java.util.concurrent.ConcurrentHashMap; 35 import java.util.concurrent.ExecutorService; 36 import java.util.concurrent.Executors; 37 import java.util.concurrent.ThreadFactory; 38 import java.util.concurrent.locks.LockSupport; 39 import java.util.function.BooleanSupplier; 40 import jdk.internal.access.JavaLangAccess; 41 import jdk.internal.access.SharedSecrets; 42 import jdk.internal.misc.InnocuousThread; 43 import jdk.internal.misc.TerminatingThreadLocal; 44 import jdk.internal.vm.Continuation; 45 import jdk.internal.vm.ContinuationSupport; 46 import jdk.internal.vm.annotation.Stable; 47 48 /** 49 * Polls file descriptors. Virtual threads invoke the poll method to park 50 * until a given file descriptor is ready for I/O. 51 */ 52 public abstract class Poller { 53 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); 54 55 private static final PollerGroup POLLER_GROUP; 56 static { 57 try { 58 PollerProvider provider = PollerProvider.provider(); 59 Mode mode = pollerMode(provider.defaultPollerMode()); 60 PollerGroup group = switch (mode) { 61 case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider); 62 case VTHREAD_POLLERS -> new VirtualThreadsPollerGroup(provider); 63 case PER_CARRIER -> new PerCarrierPollerGroup(provider); 64 }; 65 group.start(); 66 POLLER_GROUP = group; 67 } catch (IOException ioe) { 68 throw new ExceptionInInitializerError(ioe); 69 } 70 } 71 72 // the poller or sub-poller thread 73 private @Stable Thread owner; 74 75 // maps file descriptors to parked Thread 76 private final Map<Integer, Thread> map = new ConcurrentHashMap<>(); 77 78 // shutdown if supported by poller group. 79 private volatile boolean shutdown; 80 81 /** 82 * Poller mode. 83 */ 84 enum Mode { 85 /** 86 * ReadPoller and WritePoller are dedicated platform threads that block waiting 87 * for events and unpark virtual threads when file descriptors are ready for I/O. 88 */ 89 SYSTEM_THREADS, 90 91 /** 92 * ReadPoller and WritePoller threads are virtual threads that poll for events, 93 * yielding between polls and unparking virtual threads when file descriptors are 94 * ready for I/O. If there are no events then the poller threads park until there 95 * are I/O events to poll. This mode helps to integrate polling with virtual 96 * thread scheduling. The approach is similar to the default scheme in "User-level 97 * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020 98 * (https://dl.acm.org/doi/10.1145/3379483). 99 */ 100 VTHREAD_POLLERS, 101 102 /** 103 * A dedicated ReadPoller is created for each carrier thread. 104 * There is one system-wide (carrier agnostic) WritePoller. 105 */ 106 PER_CARRIER 107 } 108 109 /** 110 * Initialize a Poller. 111 */ 112 protected Poller() { 113 } 114 115 /** 116 * Closes the poller and release resources. This method can only be used to cleanup 117 * when creating a poller group fails. 118 */ 119 abstract void close() throws IOException; 120 121 /** 122 * Returns the poller's thread owner. 123 */ 124 private Thread owner() { 125 return owner; 126 } 127 128 /** 129 * Sets the poller's thread owner. 130 */ 131 private void setOwner() { 132 owner = Thread.currentThread(); 133 } 134 135 /** 136 * Returns true if this poller is marked for shutdown. 137 */ 138 final boolean isShutdown() { 139 return shutdown; 140 } 141 142 /** 143 * Marks this poller for shutdown. 144 */ 145 private void setShutdown() { 146 shutdown = true; 147 } 148 149 /** 150 * Returns the poller's file descriptor to use when polling with the master poller. 151 * @throws UnsupportedOperationException if not supported 152 */ 153 int fdVal() { 154 throw new UnsupportedOperationException(); 155 } 156 157 /** 158 * Invoked if when this poller's file descriptor is polled by the master poller. 159 */ 160 void pollerPolled() throws IOException { 161 } 162 163 /** 164 * Register the file descriptor. The registration is "one shot", meaning it should 165 * be polled at most once. 166 */ 167 abstract void implRegister(int fdVal) throws IOException; 168 169 /** 170 * Deregister the file descriptor. 171 * @param polled true if the file descriptor has already been polled 172 */ 173 abstract void implDeregister(int fdVal, boolean polled) throws IOException; 174 175 /** 176 * Poll for events. The {@link #polled(int)} method is invoked for each 177 * polled file descriptor. 178 * 179 * @param timeout if positive then block for up to {@code timeout} milliseconds, 180 * if zero then don't block, if -1 then block indefinitely 181 * @return >0 if file descriptors are polled, 0 if no file descriptor polled 182 */ 183 abstract int poll(int timeout) throws IOException; 184 185 /** 186 * Wakeup the poller thread if blocked in poll. 187 * 188 * @throws UnsupportedOperationException if not supported 189 */ 190 void wakeupPoller() throws IOException { 191 throw new UnsupportedOperationException(); 192 } 193 194 /** 195 * Callback by the poll method when a file descriptor is polled. 196 */ 197 final void polled(int fdVal) { 198 Thread t = map.remove(fdVal); 199 if (t != null) { 200 LockSupport.unpark(t); 201 } 202 } 203 204 /** 205 * Parks the current thread until a file descriptor is ready for the given op. 206 * @param fdVal the file descriptor 207 * @param event POLLIN or POLLOUT 208 * @param nanos the waiting time or 0 to wait indefinitely 209 * @param isOpen supplies a boolean to indicate if the enclosing object is open 210 */ 211 static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { 212 POLLER_GROUP.poll(fdVal, event, nanos, isOpen); 213 } 214 215 /** 216 * Parks the current thread until a Selector's file descriptor is ready. 217 * @param fdVal the Selector's file descriptor 218 * @param nanos the waiting time or 0 to wait indefinitely 219 */ 220 static void pollSelector(int fdVal, long nanos) throws IOException { 221 POLLER_GROUP.pollSelector(fdVal, nanos); 222 } 223 224 /** 225 * Unpark the given thread so that it stops polling. 226 */ 227 static void stopPoll(Thread thread) { 228 LockSupport.unpark(thread); 229 } 230 231 /** 232 * Parks the current thread until a file descriptor is ready. 233 */ 234 private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException { 235 register(fdVal); 236 try { 237 if (isOpen.getAsBoolean() && !isShutdown()) { 238 if (nanos > 0) { 239 LockSupport.parkNanos(nanos); 240 } else { 241 LockSupport.park(); 242 } 243 } 244 } finally { 245 deregister(fdVal); 246 } 247 } 248 249 /** 250 * Registers the file descriptor to be polled at most once when the file descriptor 251 * is ready for I/O. 252 */ 253 private void register(int fdVal) throws IOException { 254 Thread previous = map.put(fdVal, Thread.currentThread()); 255 assert previous == null; 256 try { 257 implRegister(fdVal); 258 } catch (Throwable t) { 259 map.remove(fdVal); 260 throw t; 261 } finally { 262 Reference.reachabilityFence(this); 263 } 264 } 265 266 /** 267 * Deregister the file descriptor so that the file descriptor is not polled. 268 */ 269 private void deregister(int fdVal) throws IOException { 270 Thread previous = map.remove(fdVal); 271 boolean polled = (previous == null); 272 assert polled || previous == Thread.currentThread(); 273 try { 274 implDeregister(fdVal, polled); 275 } finally { 276 Reference.reachabilityFence(this); 277 } 278 } 279 280 /** 281 * Master polling loop. The {@link #polled(int)} method is invoked for each file 282 * descriptor that is polled. 283 */ 284 private void pollerLoop() { 285 setOwner(); 286 try { 287 while (!isShutdown()) { 288 poll(-1); 289 } 290 } catch (Exception e) { 291 e.printStackTrace(); 292 } 293 } 294 295 /** 296 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file 297 * descriptor that is polled. 298 * 299 * The sub-poller registers its file descriptor with the master poller to park until 300 * there are events to poll. When unparked, it does non-blocking polls and parks 301 * again when there are no more events. The sub-poller yields after each poll to help 302 * with fairness and to avoid re-registering with the master poller where possible. 303 */ 304 private void subPollerLoop(Poller masterPoller) { 305 assert Thread.currentThread().isVirtual(); 306 setOwner(); 307 try { 308 int polled = 0; 309 while (!isShutdown()) { 310 if (polled == 0) { 311 masterPoller.poll(fdVal(), 0, () -> true); // park 312 pollerPolled(); 313 } else { 314 Thread.yield(); 315 } 316 polled = poll(0); 317 } 318 } catch (Exception e) { 319 e.printStackTrace(); 320 } 321 } 322 323 /** 324 * Unparks all threads waiting on a file descriptor registered with this poller. 325 */ 326 private void wakeupAll() { 327 map.values().forEach(LockSupport::unpark); 328 } 329 330 @Override 331 public String toString() { 332 return String.format("%s [registered = %d, owner = %s]", 333 Objects.toIdentityString(this), map.size(), owner); 334 } 335 336 /** 337 * A group of poller threads that support virtual threads polling file descriptors. 338 */ 339 private static abstract class PollerGroup { 340 private final PollerProvider provider; 341 342 PollerGroup(PollerProvider provider) { 343 this.provider = provider; 344 } 345 346 PollerProvider provider() { 347 return provider; 348 } 349 350 /** 351 * Starts the poller group and any system-wide poller threads. 352 */ 353 abstract void start(); 354 355 /** 356 * Parks the current thread until a file descriptor is ready for the given op. 357 */ 358 abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException; 359 360 /** 361 * Parks the current thread until a Selector's file descriptor is ready. 362 */ 363 void pollSelector(int fdVal, long nanos) throws IOException { 364 poll(fdVal, Net.POLLIN, nanos, () -> true); 365 } 366 367 /** 368 * Starts a platform thread to run the given task. 369 */ 370 protected final void startPlatformThread(String name, Runnable task) { 371 Thread thread = InnocuousThread.newSystemThread(name, task); 372 thread.setDaemon(true); 373 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace()); 374 thread.start(); 375 } 376 377 /** 378 * Return the master poller for default scheduler, or null if no master poller. 379 */ 380 abstract Poller defaultMasterPoller(); 381 382 /** 383 * Return the read pollers for the default scheduler. 384 */ 385 abstract List<Poller> defaultReadPollers(); 386 387 /** 388 * Return the write pollers for the default scheduler. 389 */ 390 abstract List<Poller> defaultWritePollers(); 391 } 392 393 /** 394 * The poller group for the SYSTEM_THREADS polling mode. The read and write pollers 395 * are system-wide (scheduler agnostic) platform threads. 396 */ 397 private static class SystemThreadsPollerGroup extends PollerGroup { 398 // system-wide read and write pollers 399 private final Poller[] readPollers; 400 private final Poller[] writePollers; 401 402 SystemThreadsPollerGroup(PollerProvider provider) throws IOException { 403 super(provider); 404 405 int readPollerCount = pollerCount("jdk.readPollers", 406 provider.defaultReadPollers(Mode.SYSTEM_THREADS)); 407 int writePollerCount = pollerCount("jdk.writePollers", 408 provider.defaultWritePollers(Mode.SYSTEM_THREADS)); 409 410 Poller[] readPollers = new Poller[readPollerCount]; 411 Poller[] writePollers = new Poller[writePollerCount]; 412 try { 413 for (int i = 0; i < readPollerCount; i++) { 414 readPollers[i] = provider.readPoller(false); 415 } 416 for (int i = 0; i < writePollerCount; i++) { 417 writePollers[i] = provider.writePoller(false); 418 } 419 } catch (Throwable e) { 420 closeAll(readPollers); 421 closeAll(writePollers); 422 throw e; 423 } 424 425 this.readPollers = readPollers; 426 this.writePollers = writePollers; 427 } 428 429 /** 430 * Close the given pollers. 431 */ 432 private static void closeAll(Poller... pollers) { 433 for (Poller poller : pollers) { 434 if (poller != null) { 435 try { 436 poller.close(); 437 } catch (IOException _) { } 438 } 439 } 440 } 441 442 /** 443 * Start poller threads. 444 */ 445 @Override 446 void start() { 447 Arrays.stream(readPollers).forEach(p -> { 448 startPlatformThread("Read-Poller", p::pollerLoop); 449 }); 450 Arrays.stream(writePollers).forEach(p -> { 451 startPlatformThread("Write-Poller", p::pollerLoop); 452 }); 453 } 454 455 private Poller readPoller(int fdVal) { 456 int index = provider().fdValToIndex(fdVal, readPollers.length); 457 return readPollers[index]; 458 } 459 460 private Poller writePoller(int fdVal) { 461 int index = provider().fdValToIndex(fdVal, writePollers.length); 462 return writePollers[index]; 463 } 464 465 @Override 466 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { 467 Poller poller = (event == Net.POLLIN) 468 ? readPoller(fdVal) 469 : writePoller(fdVal); 470 poller.poll(fdVal, nanos, isOpen); 471 } 472 473 @Override 474 Poller defaultMasterPoller() { 475 return null; 476 } 477 478 @Override 479 List<Poller> defaultReadPollers() { 480 return List.of(readPollers); 481 } 482 483 @Override 484 List<Poller> defaultWritePollers() { 485 return List.of(writePollers); 486 } 487 } 488 489 /** 490 * The poller group for the VTHREAD_POLLERS poller mode. The read and write pollers 491 * are scheduler-specific virtual threads. The default scheduler may have many read 492 * and write pollers. Custom schedulers have one read poller and one write poller. 493 * When read and write pollers must block then they register with a system-wide 494 * (scheduler agnostic) "master poller" that runs in a dedicated platform thread. 495 */ 496 private static class VirtualThreadsPollerGroup extends PollerGroup { 497 private static final Thread.VirtualThreadScheduler DEFAULT_SCHEDULER = JLA.defaultVirtualThreadScheduler(); 498 499 // system-wide master poller 500 private final Poller masterPoller; 501 502 // number of read and write pollers for default scheduler 503 private final int defaultReadPollerCount; 504 private final int defaultWritePollerCount; 505 506 // maps scheduler to a set of read and write pollers 507 private record Pollers(ExecutorService executor, Poller[] readPollers, Poller[] writePollers) { } 508 private final Map<Thread.VirtualThreadScheduler, Pollers> POLLERS = new ConcurrentHashMap<>(); 509 510 VirtualThreadsPollerGroup(PollerProvider provider) throws IOException { 511 super(provider); 512 513 var mode = Mode.VTHREAD_POLLERS; 514 this.defaultReadPollerCount = pollerCount("jdk.readPollers", 515 provider.defaultReadPollers(mode)); 516 this.defaultWritePollerCount = pollerCount("jdk.writePollers", 517 provider.defaultWritePollers(mode)); 518 this.masterPoller = provider.readPoller(false); 519 } 520 521 @Override 522 void start() { 523 startPlatformThread("Master-Poller", masterPoller::pollerLoop); 524 } 525 526 /** 527 * Create the read and write pollers for the given scheduler. 528 */ 529 private Pollers createPollers(Thread.VirtualThreadScheduler scheduler) { 530 int readPollerCount; 531 int writePollerCount; 532 if (scheduler == DEFAULT_SCHEDULER) { 533 readPollerCount = defaultReadPollerCount; 534 writePollerCount = defaultWritePollerCount; 535 } else { 536 readPollerCount = 1; 537 writePollerCount = 1; 538 } 539 540 Poller[] readPollers = new Poller[readPollerCount]; 541 Poller[] writePollers = new Poller[writePollerCount]; 542 try { 543 for (int i = 0; i < readPollerCount; i++) { 544 readPollers[i] = provider().readPoller(true); 545 } 546 for (int i = 0; i < writePollerCount; i++) { 547 writePollers[i] = provider().writePoller(true); 548 } 549 } catch (IOException ioe) { 550 closeAll(readPollers); 551 closeAll(writePollers); 552 throw new UncheckedIOException(ioe); 553 } 554 555 @SuppressWarnings("restricted") 556 ThreadFactory factory = Thread.ofVirtual() 557 .scheduler(scheduler) 558 .inheritInheritableThreadLocals(false) 559 .name("SubPoller-", 0) 560 .uncaughtExceptionHandler((_, e) -> e.printStackTrace()) 561 .factory(); 562 ExecutorService executor = Executors.newThreadPerTaskExecutor(factory); 563 564 Arrays.stream(readPollers).forEach(p -> { 565 executor.execute(() -> p.subPollerLoop(masterPoller)); 566 }); 567 Arrays.stream(writePollers).forEach(p -> { 568 executor.execute(() -> p.subPollerLoop(masterPoller)); 569 }); 570 571 return new Pollers(executor, readPollers, writePollers); 572 } 573 574 @Override 575 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { 576 Thread.VirtualThreadScheduler scheduler; 577 if (Thread.currentThread().isVirtual()) { 578 scheduler = JLA.virtualThreadScheduler(Thread.currentThread()); 579 } else { 580 scheduler = DEFAULT_SCHEDULER; 581 } 582 Pollers pollers; 583 try { 584 pollers = POLLERS.computeIfAbsent(scheduler, _ -> createPollers(scheduler)); 585 } catch (UncheckedIOException uioe) { 586 throw uioe.getCause(); 587 } 588 589 Poller poller; 590 if (event == Net.POLLIN) { 591 Poller[] readPollers = pollers.readPollers(); 592 int index = provider().fdValToIndex(fdVal, readPollers.length); 593 poller = readPollers[index]; 594 } else { 595 Poller[] writePollers = pollers.writePollers(); 596 int index = provider().fdValToIndex(fdVal, writePollers.length); 597 poller = writePollers[index]; 598 } 599 poller.poll(fdVal, nanos, isOpen); 600 } 601 602 @Override 603 void pollSelector(int fdVal, long nanos) throws IOException { 604 masterPoller.poll(fdVal, nanos, () -> true); 605 } 606 607 /** 608 * Close the given pollers. 609 */ 610 private static void closeAll(Poller... pollers) { 611 for (Poller poller : pollers) { 612 if (poller != null) { 613 try { 614 poller.close(); 615 } catch (IOException _) { } 616 } 617 } 618 } 619 620 @Override 621 Poller defaultMasterPoller() { 622 return masterPoller; 623 } 624 625 @Override 626 List<Poller> defaultReadPollers() { 627 Pollers pollers = POLLERS.get(DEFAULT_SCHEDULER); 628 return (pollers != null) ? List.of(pollers.readPollers()) : List.of(); 629 } 630 631 @Override 632 List<Poller> defaultWritePollers() { 633 Pollers pollers = POLLERS.get(DEFAULT_SCHEDULER); 634 return (pollers != null) ? List.of(pollers.writePollers()) : List.of(); 635 } 636 } 637 638 /** 639 * The poller group for the PER_CARRIER polling mode. A dedicated read poller is 640 * created for each carrier thread. When a virtual thread polls a file descriptor 641 * for POLLIN, then it will use (almost always, not guaranteed) to see the read 642 * poller for its carrier. The read poller terminates if carrier thread terminates. 643 * There is one system-wide (carrier agnostic) write poller. 644 */ 645 private static class PerCarrierPollerGroup extends PollerGroup { 646 private static final boolean USE_SUBPOLLER; 647 static { 648 String s = System.getProperty("jdk.useSubPoller"); 649 USE_SUBPOLLER = (s == null) || s.isEmpty() || Boolean.parseBoolean(s); 650 } 651 652 private static final TerminatingThreadLocal<PerCarrierPollerGroup> POLLER_GROUP = 653 new TerminatingThreadLocal<>() { 654 @Override 655 protected void threadTerminated(PerCarrierPollerGroup pollerGroup) { 656 pollerGroup.carrierTerminated(); 657 } 658 }; 659 660 // -XX:-VMContinuations or called from platform thread 661 private static final Thread PLACEHOLDER = new Thread(); 662 663 // maps carrier thread to its read poller 664 private final Map<Thread, Poller> READ_POLLERS = new ConcurrentHashMap<>(); 665 666 private final Poller writePoller; 667 private final Poller masterPoller; 668 669 PerCarrierPollerGroup(PollerProvider provider) throws IOException { 670 super(provider); 671 672 this.writePoller = provider.writePoller(false); 673 if (USE_SUBPOLLER) { 674 this.masterPoller = provider.readPoller(false); 675 } else { 676 this.masterPoller = null; 677 } 678 } 679 680 @Override 681 void start() { 682 startPlatformThread("Write-Poller", writePoller::pollerLoop); 683 if (masterPoller != null) { 684 startPlatformThread("Master-Poller", masterPoller::pollerLoop); 685 } 686 } 687 688 /** 689 * Starts a read poller with the given name. 690 * @throws UncheckedIOException if an I/O error occurs 691 */ 692 private Poller startReadPoller(String name) { 693 try { 694 Poller readPoller = provider().readPoller(USE_SUBPOLLER); 695 if (USE_SUBPOLLER) { 696 var scheduler = JLA.virtualThreadScheduler(Thread.currentThread()); 697 @SuppressWarnings("restricted") 698 var _ = Thread.ofVirtual().scheduler(scheduler) 699 .inheritInheritableThreadLocals(false) 700 .name(name) 701 .uncaughtExceptionHandler((_, e) -> e.printStackTrace()) 702 .start(() -> subPollerLoop(readPoller)); 703 } else { 704 startPlatformThread(name, () -> pollLoop(readPoller)); 705 } 706 return readPoller; 707 } catch (IOException ioe) { 708 throw new UncheckedIOException(ioe); 709 } 710 } 711 712 @Override 713 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { 714 if (event == Net.POLLOUT) { 715 writePoller.poll(fdVal, nanos, isOpen); 716 return; 717 } 718 719 assert event == Net.POLLIN; 720 if (Thread.currentThread().isVirtual() && ContinuationSupport.isSupported()) { 721 Poller readPoller; 722 // get read poller for this carrier 723 Continuation.pin(); 724 try { 725 Thread carrier = JLA.currentCarrierThread(); 726 try { 727 readPoller = READ_POLLERS.computeIfAbsent(carrier, _ -> { 728 String name = carrier.getName() + "-Read-Poller"; 729 return startReadPoller(name); 730 }); 731 } catch (UncheckedIOException uioe) { 732 throw uioe.getCause(); 733 } 734 POLLER_GROUP.set(this); 735 } finally { 736 Continuation.unpin(); 737 } 738 // may execute on a different carrier 739 readPoller.poll(fdVal, nanos, isOpen); 740 return; 741 } 742 743 // -XX:-VMContinuations or called from platform thread 744 if (masterPoller != null) { 745 masterPoller.poll(fdVal, nanos, isOpen); 746 } else { 747 Poller readPoller; 748 try { 749 readPoller = READ_POLLERS.computeIfAbsent(PLACEHOLDER, 750 _ -> startReadPoller("Read-Poller")); 751 } catch (UncheckedIOException uioe) { 752 throw uioe.getCause(); 753 } 754 readPoller.poll(fdVal, nanos, isOpen); 755 } 756 } 757 758 @Override 759 void pollSelector(int fdVal, long nanos) throws IOException { 760 masterPoller.poll(fdVal, nanos, () -> true); 761 } 762 763 /** 764 * Read-poller polling loop. 765 */ 766 private void pollLoop(Poller readPoller) { 767 try { 768 readPoller.pollerLoop(); 769 } finally { 770 // wakeup all threads waiting on file descriptors registered with the 771 // read poller, these I/O operation will migrate to another carrier. 772 readPoller.wakeupAll(); 773 } 774 } 775 776 /** 777 * Read-poll sub-poller polling loop. 778 */ 779 private void subPollerLoop(Poller readPoller) { 780 try { 781 readPoller.subPollerLoop(masterPoller); 782 } finally { 783 // wakeup all threads waiting on file descriptors registered with the 784 // read poller, these I/O operation will migrate to another carrier. 785 readPoller.wakeupAll(); 786 } 787 } 788 789 /** 790 * Invoked by the carrier thread before it terminates. 791 */ 792 private void carrierTerminated() { 793 Poller readPoller = READ_POLLERS.remove(Thread.currentThread()); 794 if (readPoller != null) { 795 readPoller.setShutdown(); 796 try { 797 readPoller.wakeupPoller(); 798 } catch (Throwable e) { 799 e.printStackTrace(); 800 } 801 } 802 } 803 804 @Override 805 Poller defaultMasterPoller() { 806 return masterPoller; 807 } 808 809 @Override 810 List<Poller> defaultReadPollers() { 811 // return all read pollers for now. 812 return READ_POLLERS.values().stream().toList(); 813 } 814 815 @Override 816 List<Poller> defaultWritePollers() { 817 return List.of(writePoller); 818 } 819 } 820 821 /** 822 * Returns the poller mode. 823 */ 824 private static Mode pollerMode(Mode defaultPollerMode) { 825 String s = System.getProperty("jdk.pollerMode"); 826 if (s != null) { 827 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) { 828 return Mode.SYSTEM_THREADS; 829 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) { 830 return Mode.VTHREAD_POLLERS; 831 } else if (s.equalsIgnoreCase(Mode.PER_CARRIER.name()) || s.equals("3")) { 832 return Mode.PER_CARRIER; 833 } else { 834 throw new RuntimeException("Can't parse '" + s + "' as polling mode"); 835 } 836 } else { 837 return defaultPollerMode; 838 } 839 } 840 841 /** 842 * Reads the given property name to get the poller count. If the property is 843 * set then the value must be a power of 2. Returns 1 if the property is not 844 * set. 845 * @throws IllegalArgumentException if the property is set to a value that 846 * is not a power of 2. 847 */ 848 private static int pollerCount(String propName, int defaultCount) { 849 String s = System.getProperty(propName); 850 int count = (s != null) ? Integer.parseInt(s) : defaultCount; 851 852 // check power of 2 853 if (count != Integer.highestOneBit(count)) { 854 String msg = propName + " is set to a value that is not a power of 2"; 855 throw new IllegalArgumentException(msg); 856 } 857 return count; 858 } 859 860 /** 861 * Return the master poller or null if there is no master poller. 862 */ 863 public static Poller masterPoller() { 864 return POLLER_GROUP.defaultMasterPoller(); 865 } 866 867 /** 868 * Return the list of read pollers. 869 */ 870 public static List<Poller> readPollers() { 871 return POLLER_GROUP.defaultReadPollers(); 872 } 873 874 /** 875 * Return the list of write pollers. 876 */ 877 public static List<Poller> writePollers() { 878 return POLLER_GROUP.defaultWritePollers(); 879 } 880 }