1 /* 2 * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.io.IOException; 28 import java.io.UncheckedIOException; 29 import java.lang.ref.Reference; 30 import java.util.Arrays; 31 import java.util.List; 32 import java.util.Map; 33 import java.util.Objects; 34 import java.util.Set; 35 import java.util.concurrent.ConcurrentHashMap; 36 import java.util.concurrent.Executor; 37 import java.util.concurrent.Executors; 38 import java.util.concurrent.ThreadFactory; 39 import java.util.concurrent.locks.LockSupport; 40 import java.util.function.BooleanSupplier; 41 import jdk.internal.access.JavaLangAccess; 42 import jdk.internal.access.SharedSecrets; 43 import jdk.internal.misc.InnocuousThread; 44 import jdk.internal.misc.TerminatingThreadLocal; 45 import jdk.internal.vm.Continuation; 46 import jdk.internal.vm.ContinuationSupport; 47 import jdk.internal.vm.annotation.Stable; 48 49 /** 50 * Polls file descriptors. Virtual threads invoke the poll method to park 51 * until a given file descriptor is ready for I/O. 52 */ 53 public abstract class Poller { 54 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); 55 56 // the poller group for the I/O pollers and poller threads 57 private static final PollerGroup POLLER_GROUP = createPollerGroup(); 58 59 // the poller or sub-poller thread (used for observability only) 60 private @Stable Thread owner; 61 62 // maps file descriptors to parked Thread 63 private final Map<Integer, Thread> map = new ConcurrentHashMap<>(); 64 65 // shutdown (if supported by poller group) 66 private volatile boolean shutdown; 67 68 /** 69 * Poller mode. 70 */ 71 enum Mode { 72 /** 73 * Read and write pollers are platform threads that block waiting for events and 74 * unpark virtual threads when file descriptors are ready for I/O. 75 */ 76 SYSTEM_THREADS, 77 78 /** 79 * Read and write pollers are virtual threads that poll for events, yielding 80 * between polls and unparking virtual threads when file descriptors are 81 * ready for I/O. If there are no events then the poller threads park until there 82 * are I/O events to poll. This mode helps to integrate polling with virtual 83 * thread scheduling. The approach is similar to the default scheme in "User-level 84 * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020 85 * (https://dl.acm.org/doi/10.1145/3379483). 86 */ 87 VTHREAD_POLLERS, 88 89 /** 90 * Read pollers are per-carrier virtual threads that poll for events, yielding 91 * between polls and unparking virtual threads when file descriptors are ready 92 * for I/O. If there are no events then the poller threads park until there 93 * are I/O events to poll. The write poller is a system-wide platform thread. 94 */ 95 POLLER_PER_CARRIER 96 } 97 98 /** 99 * Create and return the PollerGroup. 100 */ 101 private static PollerGroup createPollerGroup() { 102 try { 103 PollerProvider provider; 104 if (System.getProperty("jdk.pollerMode") instanceof String s) { 105 Mode mode = switch (s) { 106 case "1" -> Mode.SYSTEM_THREADS; 107 case "2" -> Mode.VTHREAD_POLLERS; 108 case "3" -> Mode.POLLER_PER_CARRIER; 109 default -> { 110 throw new RuntimeException(s + " is not a valid polling mode"); 111 } 112 }; 113 provider = PollerProvider.createProvider(mode); 114 } else { 115 provider = PollerProvider.createProvider(); 116 } 117 118 int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers()); 119 int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers()); 120 PollerGroup group = switch (provider.pollerMode()) { 121 case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider, readPollers, writePollers); 122 case VTHREAD_POLLERS -> new VThreadsPollerGroup(provider, readPollers, writePollers); 123 case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers); 124 }; 125 group.start(); 126 return group; 127 } catch (IOException ioe) { 128 throw new UncheckedIOException(ioe); 129 } 130 } 131 132 /** 133 * Initialize a Poller. 134 */ 135 protected Poller() { 136 } 137 138 /** 139 * Closes the poller and release resources. This method can only be used to cleanup 140 * when creating a poller group fails. 141 */ 142 abstract void close() throws IOException; 143 144 /** 145 * Sets the poller's thread owner. 146 */ 147 private void setOwner() { 148 owner = Thread.currentThread(); 149 } 150 151 /** 152 * Returns true if this poller is marked for shutdown. 153 */ 154 boolean isShutdown() { 155 return shutdown; 156 } 157 158 /** 159 * Marks this poller for shutdown. 160 */ 161 private void setShutdown() { 162 shutdown = true; 163 } 164 165 /** 166 * Returns the poller's file descriptor to use when polling with the master poller. 167 * @throws UnsupportedOperationException if not supported 168 */ 169 int fdVal() { 170 throw new UnsupportedOperationException(); 171 } 172 173 /** 174 * Invoked if when this poller's file descriptor is polled by the master poller. 175 */ 176 void pollerPolled() throws IOException { 177 } 178 179 /** 180 * Register the file descriptor. The registration is "one shot", meaning it should 181 * be polled at most once. 182 */ 183 abstract void implRegister(int fdVal) throws IOException; 184 185 /** 186 * Deregister the file descriptor. 187 * @param polled true if the file descriptor has already been polled 188 */ 189 abstract void implDeregister(int fdVal, boolean polled) throws IOException; 190 191 /** 192 * Poll for events. The {@link #polled(int)} method is invoked for each 193 * polled file descriptor. 194 * 195 * @param timeout if positive then block for up to {@code timeout} milliseconds, 196 * if zero then don't block, if -1 then block indefinitely 197 * @return >0 if file descriptors are polled, 0 if no file descriptor polled 198 */ 199 abstract int poll(int timeout) throws IOException; 200 201 /** 202 * Wakeup the poller thread if blocked in poll. 203 * 204 * @throws UnsupportedOperationException if not supported 205 */ 206 void wakeupPoller() throws IOException { 207 throw new UnsupportedOperationException(); 208 } 209 210 /** 211 * Callback by the poll method when a file descriptor is polled. 212 */ 213 final void polled(int fdVal) { 214 Thread t = map.remove(fdVal); 215 if (t != null) { 216 LockSupport.unpark(t); 217 } 218 } 219 220 /** 221 * Parks the current thread until a file descriptor is ready for the given op. 222 * @param fdVal the file descriptor 223 * @param event POLLIN or POLLOUT 224 * @param nanos the waiting time or 0 to wait indefinitely 225 * @param isOpen supplies a boolean to indicate if the enclosing object is open 226 */ 227 public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { 228 POLLER_GROUP.poll(fdVal, event, nanos, isOpen); 229 } 230 231 /** 232 * Parks the current thread until a Selector's file descriptor is ready. 233 * @param fdVal the Selector's file descriptor 234 * @param nanos the waiting time or 0 to wait indefinitely 235 */ 236 public static void pollSelector(int fdVal, long nanos) throws IOException { 237 POLLER_GROUP.pollSelector(fdVal, nanos); 238 } 239 240 /** 241 * Unpark the given thread so that it stops polling. 242 */ 243 public static void stopPoll(Thread thread) { 244 LockSupport.unpark(thread); 245 } 246 247 /** 248 * Parks the current thread until a file descriptor is ready. 249 */ 250 private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException { 251 register(fdVal); 252 try { 253 if (isOpen.getAsBoolean() && !isShutdown()) { 254 if (nanos > 0) { 255 LockSupport.parkNanos(nanos); 256 } else { 257 LockSupport.park(); 258 } 259 } 260 } finally { 261 deregister(fdVal); 262 } 263 } 264 265 /** 266 * Registers the file descriptor to be polled at most once when the file descriptor 267 * is ready for I/O. 268 */ 269 private void register(int fdVal) throws IOException { 270 Thread previous = map.put(fdVal, Thread.currentThread()); 271 assert previous == null; 272 try { 273 implRegister(fdVal); 274 } catch (Throwable t) { 275 map.remove(fdVal); 276 throw t; 277 } finally { 278 Reference.reachabilityFence(this); 279 } 280 } 281 282 /** 283 * Deregister the file descriptor so that the file descriptor is not polled. 284 */ 285 private void deregister(int fdVal) throws IOException { 286 Thread previous = map.remove(fdVal); 287 boolean polled = (previous == null); 288 assert polled || previous == Thread.currentThread(); 289 try { 290 implDeregister(fdVal, polled); 291 } finally { 292 Reference.reachabilityFence(this); 293 } 294 } 295 296 /** 297 * Master polling loop. The {@link #polled(int)} method is invoked for each file 298 * descriptor that is polled. 299 */ 300 private void pollerLoop() { 301 setOwner(); 302 try { 303 while (!isShutdown()) { 304 poll(-1); 305 } 306 } catch (Exception e) { 307 e.printStackTrace(); 308 } 309 } 310 311 /** 312 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file 313 * descriptor that is polled. 314 * 315 * The sub-poller registers its file descriptor with the master poller to park until 316 * there are events to poll. When unparked, it does non-blocking polls and parks 317 * again when there are no more events. The sub-poller yields after each poll to help 318 * with fairness and to avoid re-registering with the master poller where possible. 319 */ 320 private void subPollerLoop(Poller masterPoller) { 321 assert Thread.currentThread().isVirtual(); 322 setOwner(); 323 try { 324 int polled = 0; 325 while (!isShutdown()) { 326 if (polled == 0) { 327 masterPoller.poll(fdVal(), 0, () -> true); // park 328 pollerPolled(); 329 } else { 330 Thread.yield(); 331 } 332 polled = poll(0); 333 } 334 } catch (Exception e) { 335 e.printStackTrace(); 336 } 337 } 338 339 /** 340 * Unparks all threads waiting on a file descriptor registered with this poller. 341 */ 342 private void wakeupAll() { 343 map.values().forEach(LockSupport::unpark); 344 } 345 346 @Override 347 public String toString() { 348 return String.format("%s [registered = %d, owner = %s]", 349 Objects.toIdentityString(this), map.size(), owner); 350 } 351 352 /** 353 * A group of poller threads that support virtual threads polling file descriptors. 354 */ 355 private static abstract class PollerGroup { 356 private final PollerProvider provider; 357 358 PollerGroup(PollerProvider provider) { 359 this.provider = provider; 360 } 361 362 final PollerProvider provider() { 363 return provider; 364 } 365 366 /** 367 * Starts the poller group and any system-wide poller threads. 368 */ 369 abstract void start(); 370 371 /** 372 * Parks the current thread until a file descriptor is ready for the given op. 373 */ 374 abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException; 375 376 /** 377 * Parks the current thread until a Selector's file descriptor is ready. 378 */ 379 void pollSelector(int fdVal, long nanos) throws IOException { 380 poll(fdVal, Net.POLLIN, nanos, () -> true); 381 } 382 383 /** 384 * Starts a platform thread to run the given task. 385 */ 386 protected final void startPlatformThread(String name, Runnable task) { 387 Thread thread = InnocuousThread.newSystemThread(name, task); 388 thread.setDaemon(true); 389 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace()); 390 thread.start(); 391 } 392 393 /** 394 * Return the master poller, or null if no master poller. 395 */ 396 abstract Poller masterPoller(); 397 398 /** 399 * Return the read pollers. 400 */ 401 abstract List<Poller> readPollers(); 402 403 /** 404 * Return the write pollers. 405 */ 406 abstract List<Poller> writePollers(); 407 408 /** 409 * Close the given pollers. 410 */ 411 static void closeAll(Poller... pollers) { 412 for (Poller poller : pollers) { 413 if (poller != null) { 414 try { 415 poller.close(); 416 } catch (IOException _) { } 417 } 418 } 419 } 420 } 421 422 /** 423 * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads. 424 */ 425 private static class SystemThreadsPollerGroup extends PollerGroup { 426 // system-wide read and write pollers 427 private final Poller[] readPollers; 428 private final Poller[] writePollers; 429 430 SystemThreadsPollerGroup(PollerProvider provider, 431 int readPollerCount, 432 int writePollerCount) throws IOException { 433 super(provider); 434 Poller[] readPollers = new Poller[readPollerCount]; 435 Poller[] writePollers = new Poller[writePollerCount]; 436 try { 437 for (int i = 0; i < readPollerCount; i++) { 438 readPollers[i] = provider.readPoller(false); 439 } 440 for (int i = 0; i < writePollerCount; i++) { 441 writePollers[i] = provider.writePoller(false); 442 } 443 } catch (Throwable e) { 444 closeAll(readPollers); 445 closeAll(writePollers); 446 throw e; 447 } 448 449 this.readPollers = readPollers; 450 this.writePollers = writePollers; 451 } 452 453 @Override 454 void start() { 455 Arrays.stream(readPollers).forEach(p -> { 456 startPlatformThread("Read-Poller", p::pollerLoop); 457 }); 458 Arrays.stream(writePollers).forEach(p -> { 459 startPlatformThread("Write-Poller", p::pollerLoop); 460 }); 461 } 462 463 private Poller readPoller(int fdVal) { 464 int index = provider().fdValToIndex(fdVal, readPollers.length); 465 return readPollers[index]; 466 } 467 468 private Poller writePoller(int fdVal) { 469 int index = provider().fdValToIndex(fdVal, writePollers.length); 470 return writePollers[index]; 471 } 472 473 @Override 474 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { 475 Poller poller = (event == Net.POLLIN) 476 ? readPoller(fdVal) 477 : writePoller(fdVal); 478 poller.poll(fdVal, nanos, isOpen); 479 } 480 481 @Override 482 Poller masterPoller() { 483 return null; 484 } 485 486 @Override 487 List<Poller> readPollers() { 488 return List.of(readPollers); 489 } 490 491 @Override 492 List<Poller> writePollers() { 493 return List.of(writePollers); 494 } 495 } 496 497 /** 498 * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads. 499 * When read and write pollers need to block then they register with a system-wide 500 * "master poller" that runs in a dedicated platform thread. 501 */ 502 private static class VThreadsPollerGroup extends PollerGroup { 503 private final Poller masterPoller; 504 private final Poller[] readPollers; 505 private final Poller[] writePollers; 506 507 // keep virtual thread pollers alive 508 private final Executor executor; 509 510 VThreadsPollerGroup(PollerProvider provider, 511 int readPollerCount, 512 int writePollerCount) throws IOException { 513 super(provider); 514 Poller masterPoller = provider.readPoller(false); 515 Poller[] readPollers = new Poller[readPollerCount]; 516 Poller[] writePollers = new Poller[writePollerCount]; 517 518 try { 519 for (int i = 0; i < readPollerCount; i++) { 520 readPollers[i] = provider.readPoller(true); 521 } 522 for (int i = 0; i < writePollerCount; i++) { 523 writePollers[i] = provider.writePoller(true); 524 } 525 } catch (Throwable e) { 526 masterPoller.close(); 527 closeAll(readPollers); 528 closeAll(writePollers); 529 throw e; 530 } 531 532 this.masterPoller = masterPoller; 533 this.readPollers = readPollers; 534 this.writePollers = writePollers; 535 536 ThreadFactory factory = Thread.ofVirtual() 537 .inheritInheritableThreadLocals(false) 538 .name("SubPoller-", 0) 539 .uncaughtExceptionHandler((_, e) -> e.printStackTrace()) 540 .factory(); 541 this.executor = Executors.newThreadPerTaskExecutor(factory); 542 } 543 544 @Override 545 void start() { 546 startPlatformThread("Master-Poller", masterPoller::pollerLoop); 547 Arrays.stream(readPollers).forEach(p -> { 548 executor.execute(() -> p.subPollerLoop(masterPoller)); 549 }); 550 Arrays.stream(writePollers).forEach(p -> { 551 executor.execute(() -> p.subPollerLoop(masterPoller)); 552 }); 553 } 554 555 private Poller readPoller(int fdVal) { 556 int index = provider().fdValToIndex(fdVal, readPollers.length); 557 return readPollers[index]; 558 } 559 560 private Poller writePoller(int fdVal) { 561 int index = provider().fdValToIndex(fdVal, writePollers.length); 562 return writePollers[index]; 563 } 564 565 @Override 566 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { 567 Poller poller = (event == Net.POLLIN) 568 ? readPoller(fdVal) 569 : writePoller(fdVal); 570 poller.poll(fdVal, nanos, isOpen); 571 } 572 573 @Override 574 void pollSelector(int fdVal, long nanos) throws IOException { 575 masterPoller.poll(fdVal, nanos, () -> true); 576 } 577 578 @Override 579 Poller masterPoller() { 580 return masterPoller; 581 } 582 583 @Override 584 List<Poller> readPollers() { 585 return List.of(readPollers); 586 } 587 588 @Override 589 List<Poller> writePollers() { 590 return List.of(writePollers); 591 } 592 } 593 594 /** 595 * POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread. 596 * When a virtual thread polls a file descriptor for POLLIN, then it will use (almost 597 * always, not guaranteed) the read poller for its carrier. When a read poller needs 598 * to block then it registers with a system-wide "master poller" that runs in a 599 * dedicated platform thread. The read poller terminates if the carrier terminates. 600 * The write pollers are system-wide platform threads (usually one). 601 */ 602 private static class PollerPerCarrierPollerGroup extends PollerGroup { 603 private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { } 604 private static final TerminatingThreadLocal<CarrierPoller> CARRIER_POLLER = 605 new TerminatingThreadLocal<>() { 606 @Override 607 protected void threadTerminated(CarrierPoller carrierPoller) { 608 Poller readPoller = carrierPoller.readPoller(); 609 carrierPoller.group().carrierTerminated(readPoller); 610 } 611 }; 612 613 private final Poller masterPoller; 614 private final Set<Poller> readPollers; 615 private final Poller[] writePollers; 616 617 /** 618 * Create a PollerPerCarrierPollerGroup with the given number of write pollers. 619 */ 620 PollerPerCarrierPollerGroup(PollerProvider provider, 621 int writePollerCount) throws IOException { 622 super(provider); 623 Poller masterPoller = provider.readPoller(false); 624 Poller[] writePollers = new Poller[writePollerCount]; 625 try { 626 for (int i = 0; i < writePollerCount; i++) { 627 writePollers[i] = provider.writePoller(false); 628 } 629 } catch (Throwable e) { 630 masterPoller.close(); 631 closeAll(writePollers); 632 throw e; 633 } 634 this.masterPoller = masterPoller; 635 this.readPollers = ConcurrentHashMap.newKeySet();; 636 this.writePollers = writePollers; 637 } 638 639 @Override 640 void start() { 641 startPlatformThread("Master-Poller", masterPoller::pollerLoop); 642 Arrays.stream(writePollers).forEach(p -> { 643 startPlatformThread("Write-Poller", p::pollerLoop); 644 }); 645 } 646 647 private Poller writePoller(int fdVal) { 648 int index = provider().fdValToIndex(fdVal, writePollers.length); 649 return writePollers[index]; 650 } 651 652 /** 653 * Starts a read sub-poller in a virtual thread. 654 */ 655 private Poller startReadPoller() throws IOException { 656 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported(); 657 658 // create read sub-poller 659 Poller readPoller = provider().readPoller(true); 660 readPollers.add(readPoller); 661 662 // start virtual thread to execute sub-polling loop 663 Thread carrier = JLA.currentCarrierThread(); 664 var scheduler = JLA.virtualThreadScheduler(Thread.currentThread()); 665 @SuppressWarnings("restricted") 666 var _ = Thread.ofVirtual() 667 .scheduler(scheduler) 668 .inheritInheritableThreadLocals(false) 669 .name(carrier.getName() + "-Read-Poller") 670 .uncaughtExceptionHandler((_, e) -> e.printStackTrace()) 671 .start(() -> subPollerLoop(readPoller)); 672 return readPoller; 673 } 674 675 /** 676 * Returns the read poller for the current carrier, starting it if required. 677 */ 678 private Poller readPoller() throws IOException { 679 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported(); 680 Continuation.pin(); 681 try { 682 CarrierPoller carrierPoller = CARRIER_POLLER.get(); 683 if (carrierPoller != null) { 684 return carrierPoller.readPoller(); 685 } else { 686 // first poll on this carrier will start poller 687 Poller readPoller = startReadPoller(); 688 CARRIER_POLLER.set(new CarrierPoller(this, readPoller)); 689 return readPoller; 690 } 691 } finally { 692 Continuation.unpin(); 693 } 694 } 695 696 @Override 697 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { 698 // for POLLIN, get the read poller for this carrier 699 if (event == Net.POLLIN 700 && Thread.currentThread().isVirtual() 701 && ContinuationSupport.isSupported()) { 702 readPoller().poll(fdVal, nanos, isOpen); 703 return; 704 } 705 706 // -XX:-VMContinuations or POLLIN from platform thread does master poller 707 if (event == Net.POLLIN) { 708 masterPoller.poll(fdVal, nanos, isOpen); 709 } else { 710 writePoller(fdVal).poll(fdVal, nanos, isOpen); 711 } 712 } 713 714 @Override 715 void pollSelector(int fdVal, long nanos) throws IOException { 716 masterPoller.poll(fdVal, nanos, () -> true); 717 } 718 719 /** 720 * Sub-poller polling loop. 721 */ 722 private void subPollerLoop(Poller readPoller) { 723 try { 724 readPoller.subPollerLoop(masterPoller); 725 } finally { 726 // wakeup all threads waiting on file descriptors registered with the 727 // read poller, these I/O operation will migrate to another carrier. 728 readPoller.wakeupAll(); 729 730 // remove from serviceability view 731 readPollers.remove(readPoller); 732 } 733 } 734 735 /** 736 * Invoked by the carrier thread before it terminates. 737 */ 738 private void carrierTerminated(Poller readPoller) { 739 readPoller.setShutdown(); 740 try { 741 readPoller.wakeupPoller(); 742 } catch (Throwable e) { 743 e.printStackTrace(); 744 } 745 } 746 747 @Override 748 Poller masterPoller() { 749 return masterPoller; 750 } 751 752 @Override 753 List<Poller> readPollers() { 754 return readPollers.stream().toList(); 755 } 756 757 @Override 758 List<Poller> writePollers() { 759 return List.of(writePollers); 760 } 761 } 762 763 /** 764 * Reads the given property name to get the poller count. If the property is 765 * set then the value must be a power of 2. Returns 1 if the property is not 766 * set. 767 * @throws IllegalArgumentException if the property is set to a value that 768 * is not a power of 2. 769 */ 770 private static int pollerCount(String propName, int defaultCount) { 771 String s = System.getProperty(propName); 772 int count = (s != null) ? Integer.parseInt(s) : defaultCount; 773 774 // check power of 2 775 if (count != Integer.highestOneBit(count)) { 776 String msg = propName + " is set to a value that is not a power of 2"; 777 throw new IllegalArgumentException(msg); 778 } 779 return count; 780 } 781 782 /** 783 * Return the master poller or null if there is no master poller. 784 */ 785 public static Poller masterPoller() { 786 return POLLER_GROUP.masterPoller(); 787 } 788 789 /** 790 * Return the list of read pollers. 791 */ 792 public static List<Poller> readPollers() { 793 return POLLER_GROUP.readPollers(); 794 } 795 796 /** 797 * Return the list of write pollers. 798 */ 799 public static List<Poller> writePollers() { 800 return POLLER_GROUP.writePollers(); 801 } 802 }