1 /*
2 * Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.io.UncheckedIOException;
29 import java.lang.ref.Reference;
30 import java.util.Arrays;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.locks.LockSupport;
40 import java.util.function.BooleanSupplier;
41 import jdk.internal.access.JavaLangAccess;
42 import jdk.internal.access.SharedSecrets;
43 import jdk.internal.misc.InnocuousThread;
44 import jdk.internal.misc.TerminatingThreadLocal;
45 import jdk.internal.vm.Continuation;
46 import jdk.internal.vm.ContinuationSupport;
47 import jdk.internal.vm.annotation.Stable;
48
/**
 * I/O poller that allows virtual threads to park until a file descriptor is ready for I/O.
 */
52 public abstract class Poller {
53 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
54
55 // the poller group for the I/O pollers and poller threads
56 private static final PollerGroup POLLER_GROUP = createPollerGroup();
57
58 // the poller or sub-poller thread (used for observability only)
59 private @Stable Thread owner;
60
61 // maps file descriptors to parked Thread
62 private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
63
64 // shutdown (if supported by poller group)
65 private volatile boolean shutdown;
66
/**
 * The ways in which polling for I/O events can be arranged.
 */
enum Mode {
    /**
     * Platform threads act as the read and write pollers. They block waiting
     * for events and unpark virtual threads when file descriptors are ready
     * for I/O.
     */
    SYSTEM_THREADS,

    /**
     * Virtual threads act as the read and write pollers. They poll for events,
     * yield between polls, and unpark virtual threads when file descriptors are
     * ready for I/O. When there are no events the poller threads park until
     * there are I/O events to poll. This mode helps to integrate polling with
     * virtual thread scheduling. The approach is similar to the default scheme
     * in "User-level Threading: Have Your Cake and Eat It Too" by Karsten and
     * Barghi 2020 (https://dl.acm.org/doi/10.1145/3379483).
     */
    VTHREAD_POLLERS,

    /**
     * Per-carrier virtual threads act as the read pollers. They poll for
     * events, yield between polls, and unpark virtual threads when file
     * descriptors are ready for I/O. When there are no events the poller
     * threads park until there are I/O events to poll. The write poller is a
     * system-wide platform thread.
     */
    POLLER_PER_CARRIER
}
96
97 /**
98 * Create and return the PollerGroup.
99 */
100 private static PollerGroup createPollerGroup() {
101 try {
102 PollerProvider provider;
103 if (System.getProperty("jdk.pollerMode") instanceof String s) {
104 Mode mode = switch (s) {
105 case "1" -> Mode.SYSTEM_THREADS;
106 case "2" -> Mode.VTHREAD_POLLERS;
107 case "3" -> Mode.POLLER_PER_CARRIER;
108 default -> {
109 throw new RuntimeException(s + " is not a valid polling mode");
110 }
111 };
112 provider = PollerProvider.createProvider(mode);
113 } else {
114 provider = PollerProvider.createProvider();
115 }
116
117 int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers());
118 int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers());
119 PollerGroup group = switch (provider.pollerMode()) {
120 case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider, readPollers, writePollers);
121 case VTHREAD_POLLERS -> new VThreadsPollerGroup(provider, readPollers, writePollers);
122 case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers);
123 };
124 group.start();
125 return group;
126 } catch (IOException ioe) {
127 throw new UncheckedIOException(ioe);
128 }
129 }
130
/**
 * Creates a Poller. The constructor itself does no work.
 */
protected Poller() {
    // nothing to do
}
136
137 /**
138 * Closes the poller and release resources. This method can only be used to cleanup
139 * when creating a poller group fails.
140 */
141 abstract void close() throws IOException;
142
143 /**
144 * Sets the poller's thread owner.
145 */
146 private void setOwner() {
147 owner = Thread.currentThread();
148 }
149
150 /**
151 * Returns true if this poller is marked for shutdown.
152 */
153 boolean isShutdown() {
154 return shutdown;
155 }
156
157 /**
158 * Marks this poller for shutdown.
159 */
160 private void setShutdown() {
161 shutdown = true;
162 }
163
164 /**
165 * Returns the poller's file descriptor to use when polling with the master poller.
166 * @throws UnsupportedOperationException if not supported
167 */
168 int fdVal() {
169 throw new UnsupportedOperationException();
170 }
171
172 /**
173 * Register the file descriptor with the I/O event management facility so that it is
174 * polled when the file descriptor is ready for I/O. The registration is "one shot",
175 * meaning it should be polled at most once.
176 */
177 abstract void implStartPoll(int fdVal) throws IOException;
178
179 /**
180 * Deregister a file descriptor from the I/O event management facility. This may be
181 * a no-op in some implementations when the file descriptor has already been polled.
182 * @param polled true if the file descriptor has already been polled
183 */
184 abstract void implStopPoll(int fdVal, boolean polled) throws IOException;
185
186 /**
187 * Poll for events. The {@link #polled(int)} method is invoked for each
188 * polled file descriptor.
189 *
190 * @param timeout if positive then block for up to {@code timeout} milliseconds,
191 * if zero then don't block, if -1 then block indefinitely
192 * @return >0 if file descriptors are polled, 0 if no file descriptor polled
193 */
194 abstract int poll(int timeout) throws IOException;
195
196 /**
197 * Wakeup the poller thread if blocked in poll so it can shutdown.
198 * @throws UnsupportedOperationException if not supported
199 */
200 void wakeupPoller() throws IOException {
201 throw new UnsupportedOperationException();
202 }
203
204 /**
205 * Callback by the poll method when a file descriptor is polled.
206 */
207 final void polled(int fdVal) {
208 Thread t = map.remove(fdVal);
209 if (t != null) {
210 LockSupport.unpark(t);
211 }
212 }
213
214 /**
215 * Parks the current thread until a file descriptor is ready for the given op.
216 * @param fdVal the file descriptor
217 * @param event POLLIN or POLLOUT
218 * @param nanos the waiting time or 0 to wait indefinitely
219 * @param isOpen supplies a boolean to indicate if the enclosing object is open
220 */
221 public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
222 POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
223 }
224
225 /**
226 * Parks the current thread until a Selector's file descriptor is ready.
227 * @param fdVal the Selector's file descriptor
228 * @param nanos the waiting time or 0 to wait indefinitely
229 */
230 public static void pollSelector(int fdVal, long nanos) throws IOException {
231 POLLER_GROUP.pollSelector(fdVal, nanos);
232 }
233
234 /**
235 * Unpark the given thread so that it stops polling.
236 */
237 public static void stopPoll(Thread thread) {
238 LockSupport.unpark(thread);
239 }
240
241 /**
242 * Parks the current thread until a file descriptor is ready.
243 */
244 private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
245 startPoll(fdVal);
246 try {
247 if (isOpen.getAsBoolean() && !isShutdown()) {
248 if (nanos > 0) {
249 LockSupport.parkNanos(nanos);
250 } else {
251 LockSupport.park();
252 }
253 }
254 } finally {
255 stopPoll(fdVal);
256 }
257 }
258
259 /**
260 * Register a file descriptor with the I/O event management facility so that it is
261 * polled when the file descriptor is ready for I/O.
262 */
263 private void startPoll(int fdVal) throws IOException {
264 Thread previous = map.put(fdVal, Thread.currentThread());
265 assert previous == null;
266 try {
267 implStartPoll(fdVal);
268 } catch (Throwable t) {
269 map.remove(fdVal);
270 throw t;
271 } finally {
272 Reference.reachabilityFence(this);
273 }
274 }
275
276 /**
277 * Deregister a file descriptor from the I/O event management facility.
278 */
279 private void stopPoll(int fdVal) throws IOException {
280 Thread previous = map.remove(fdVal);
281 boolean polled = (previous == null);
282 assert polled || previous == Thread.currentThread();
283 try {
284 implStopPoll(fdVal, polled);
285 } finally {
286 Reference.reachabilityFence(this);
287 }
288 }
289
290 /**
291 * Master polling loop. The {@link #polled(int)} method is invoked for each file
292 * descriptor that is polled.
293 */
294 private void pollerLoop() {
295 setOwner();
296 try {
297 while (!isShutdown()) {
298 poll(-1);
299 }
300 } catch (Exception e) {
301 e.printStackTrace();
302 }
303 }
304
305 /**
306 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
307 * descriptor that is polled.
308 *
309 * The sub-poller registers its file descriptor with the master poller to park until
310 * there are events to poll. When unparked, it does non-blocking polls and parks
311 * again when there are no more events. The sub-poller yields after each poll to help
312 * with fairness and to avoid re-registering with the master poller where possible.
313 */
314 private void subPollerLoop(Poller masterPoller) {
315 assert Thread.currentThread().isVirtual();
316 setOwner();
317 try {
318 int polled = 0;
319 while (!isShutdown()) {
320 if (polled == 0) {
321 masterPoller.poll(fdVal(), 0, () -> true); // park
322 } else {
323 Thread.yield();
324 }
325 polled = poll(0);
326 }
327 } catch (Exception e) {
328 e.printStackTrace();
329 }
330 }
331
332 /**
333 * Unparks all threads waiting on a file descriptor registered with this poller.
334 */
335 private void wakeupAll() {
336 map.values().forEach(LockSupport::unpark);
337 }
338
339 @Override
340 public String toString() {
341 return String.format("%s [registered = %d, owner = %s]",
342 Objects.toIdentityString(this), map.size(), owner);
343 }
344
345 /**
346 * A group of poller threads that support virtual threads polling file descriptors.
347 */
348 private static abstract class PollerGroup {
349 private final PollerProvider provider;
350
351 PollerGroup(PollerProvider provider) {
352 this.provider = provider;
353 }
354
355 final PollerProvider provider() {
356 return provider;
357 }
358
359 /**
360 * Starts the poller group and any system-wide poller threads.
361 */
362 abstract void start();
363
364 /**
365 * Parks the current thread until a file descriptor is ready for the given op.
366 */
367 abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
368
369 /**
370 * Parks the current thread until a Selector's file descriptor is ready.
371 */
372 void pollSelector(int fdVal, long nanos) throws IOException {
373 poll(fdVal, Net.POLLIN, nanos, () -> true);
374 }
375
376 /**
377 * Starts a platform thread to run the given task.
378 */
379 protected final void startPlatformThread(String name, Runnable task) {
380 Thread thread = InnocuousThread.newSystemThread(name, task);
381 thread.setDaemon(true);
382 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
383 thread.start();
384 }
385
386 /**
387 * Return the master poller, or null if no master poller.
388 */
389 abstract Poller masterPoller();
390
391 /**
392 * Return the read pollers.
393 */
394 abstract List<Poller> readPollers();
395
396 /**
397 * Return the write pollers.
398 */
399 abstract List<Poller> writePollers();
400
401 /**
402 * Close the given pollers.
403 */
404 static void closeAll(Poller... pollers) {
405 for (Poller poller : pollers) {
406 if (poller != null) {
407 try {
408 poller.close();
409 } catch (IOException _) { }
410 }
411 }
412 }
413 }
414
415 /**
416 * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
417 */
418 private static class SystemThreadsPollerGroup extends PollerGroup {
419 // system-wide read and write pollers
420 private final Poller[] readPollers;
421 private final Poller[] writePollers;
422
423 SystemThreadsPollerGroup(PollerProvider provider,
424 int readPollerCount,
425 int writePollerCount) throws IOException {
426 super(provider);
427 Poller[] readPollers = new Poller[readPollerCount];
428 Poller[] writePollers = new Poller[writePollerCount];
429 try {
430 for (int i = 0; i < readPollerCount; i++) {
431 readPollers[i] = provider.readPoller(false);
432 }
433 for (int i = 0; i < writePollerCount; i++) {
434 writePollers[i] = provider.writePoller(false);
435 }
436 } catch (Throwable e) {
437 closeAll(readPollers);
438 closeAll(writePollers);
439 throw e;
440 }
441
442 this.readPollers = readPollers;
443 this.writePollers = writePollers;
444 }
445
446 @Override
447 void start() {
448 Arrays.stream(readPollers).forEach(p -> {
449 startPlatformThread("Read-Poller", p::pollerLoop);
450 });
451 Arrays.stream(writePollers).forEach(p -> {
452 startPlatformThread("Write-Poller", p::pollerLoop);
453 });
454 }
455
456 private Poller readPoller(int fdVal) {
457 int index = provider().fdValToIndex(fdVal, readPollers.length);
458 return readPollers[index];
459 }
460
461 private Poller writePoller(int fdVal) {
462 int index = provider().fdValToIndex(fdVal, writePollers.length);
463 return writePollers[index];
464 }
465
466 @Override
467 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
468 Poller poller = (event == Net.POLLIN)
469 ? readPoller(fdVal)
470 : writePoller(fdVal);
471 poller.poll(fdVal, nanos, isOpen);
472 }
473
474 @Override
475 Poller masterPoller() {
476 return null;
477 }
478
479 @Override
480 List<Poller> readPollers() {
481 return List.of(readPollers);
482 }
483
484 @Override
485 List<Poller> writePollers() {
486 return List.of(writePollers);
487 }
488 }
489
490 /**
491 * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
492 * When read and write pollers need to block then they register with a system-wide
493 * "master poller" that runs in a dedicated platform thread.
494 */
495 private static class VThreadsPollerGroup extends PollerGroup {
496 private final Poller masterPoller;
497 private final Poller[] readPollers;
498 private final Poller[] writePollers;
499
500 // keep virtual thread pollers alive
501 private final Executor executor;
502
503 VThreadsPollerGroup(PollerProvider provider,
504 int readPollerCount,
505 int writePollerCount) throws IOException {
506 super(provider);
507 Poller masterPoller = provider.readPoller(false);
508 Poller[] readPollers = new Poller[readPollerCount];
509 Poller[] writePollers = new Poller[writePollerCount];
510
511 try {
512 for (int i = 0; i < readPollerCount; i++) {
513 readPollers[i] = provider.readPoller(true);
514 }
515 for (int i = 0; i < writePollerCount; i++) {
516 writePollers[i] = provider.writePoller(true);
517 }
518 } catch (Throwable e) {
519 masterPoller.close();
520 closeAll(readPollers);
521 closeAll(writePollers);
522 throw e;
523 }
524
525 this.masterPoller = masterPoller;
526 this.readPollers = readPollers;
527 this.writePollers = writePollers;
528
529 ThreadFactory factory = Thread.ofVirtual()
530 .inheritInheritableThreadLocals(false)
531 .name("SubPoller-", 0)
532 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
533 .factory();
534 this.executor = Executors.newThreadPerTaskExecutor(factory);
535 }
536
537 @Override
538 void start() {
539 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
540 Arrays.stream(readPollers).forEach(p -> {
541 executor.execute(() -> p.subPollerLoop(masterPoller));
542 });
543 Arrays.stream(writePollers).forEach(p -> {
544 executor.execute(() -> p.subPollerLoop(masterPoller));
545 });
546 }
547
548 private Poller readPoller(int fdVal) {
549 int index = provider().fdValToIndex(fdVal, readPollers.length);
550 return readPollers[index];
551 }
552
553 private Poller writePoller(int fdVal) {
554 int index = provider().fdValToIndex(fdVal, writePollers.length);
555 return writePollers[index];
556 }
557
558 @Override
559 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
560 Poller poller = (event == Net.POLLIN)
561 ? readPoller(fdVal)
562 : writePoller(fdVal);
563 poller.poll(fdVal, nanos, isOpen);
564 }
565
566 @Override
567 void pollSelector(int fdVal, long nanos) throws IOException {
568 masterPoller.poll(fdVal, nanos, () -> true);
569 }
570
571 @Override
572 Poller masterPoller() {
573 return masterPoller;
574 }
575
576 @Override
577 List<Poller> readPollers() {
578 return List.of(readPollers);
579 }
580
581 @Override
582 List<Poller> writePollers() {
583 return List.of(writePollers);
584 }
585 }
586
587 /**
588 * POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread.
589 * When a virtual thread polls a file descriptor for POLLIN, then it will use (almost
590 * always, not guaranteed) the read poller for its carrier. When a read poller needs
591 * to block then it registers with a system-wide "master poller" that runs in a
592 * dedicated platform thread. The read poller terminates if the carrier terminates.
593 * The write pollers are system-wide platform threads (usually one).
594 */
595 private static class PollerPerCarrierPollerGroup extends PollerGroup {
596 private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { }
597 private static final TerminatingThreadLocal<CarrierPoller> CARRIER_POLLER =
598 new TerminatingThreadLocal<>() {
599 @Override
600 protected void threadTerminated(CarrierPoller carrierPoller) {
601 Poller readPoller = carrierPoller.readPoller();
602 carrierPoller.group().carrierTerminated(readPoller);
603 }
604 };
605
606 private final Poller masterPoller;
607 private final Set<Poller> readPollers;
608 private final Poller[] writePollers;
609
610 /**
611 * Create a PollerPerCarrierPollerGroup with the given number of write pollers.
612 */
613 PollerPerCarrierPollerGroup(PollerProvider provider,
614 int writePollerCount) throws IOException {
615 super(provider);
616 Poller masterPoller = provider.readPoller(false);
617 Poller[] writePollers = new Poller[writePollerCount];
618 try {
619 for (int i = 0; i < writePollerCount; i++) {
620 writePollers[i] = provider.writePoller(false);
621 }
622 } catch (Throwable e) {
623 masterPoller.close();
624 closeAll(writePollers);
625 throw e;
626 }
627 this.masterPoller = masterPoller;
628 this.readPollers = ConcurrentHashMap.newKeySet();;
629 this.writePollers = writePollers;
630 }
631
632 @Override
633 void start() {
634 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
635 Arrays.stream(writePollers).forEach(p -> {
636 startPlatformThread("Write-Poller", p::pollerLoop);
637 });
638 }
639
640 private Poller writePoller(int fdVal) {
641 int index = provider().fdValToIndex(fdVal, writePollers.length);
642 return writePollers[index];
643 }
644
645 /**
646 * Starts a read sub-poller in a virtual thread.
647 */
648 private Poller startReadPoller() throws IOException {
649 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
650
651 // create read sub-poller
652 Poller readPoller = provider().readPoller(true);
653 readPollers.add(readPoller);
654
655 // start virtual thread to execute sub-polling loop
656 Thread carrier = JLA.currentCarrierThread();
657 Thread.ofVirtual()
658 .inheritInheritableThreadLocals(false)
659 .name(carrier.getName() + "-Read-Poller")
660 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
661 .start(() -> subPollerLoop(readPoller));
662 return readPoller;
663 }
664
665 /**
666 * Returns the read poller for the current carrier, starting it if required.
667 */
668 private Poller readPoller() throws IOException {
669 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
670 Continuation.pin();
671 try {
672 CarrierPoller carrierPoller = CARRIER_POLLER.get();
673 if (carrierPoller != null) {
674 return carrierPoller.readPoller();
675 } else {
676 // first poll on this carrier will start poller
677 Poller readPoller = startReadPoller();
678 CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
679 return readPoller;
680 }
681 } finally {
682 Continuation.unpin();
683 }
684 }
685
686 @Override
687 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
688 // for POLLIN, get the read poller for this carrier
689 if (event == Net.POLLIN
690 && Thread.currentThread().isVirtual()
691 && ContinuationSupport.isSupported()) {
692 readPoller().poll(fdVal, nanos, isOpen);
693 return;
694 }
695
696 // -XX:-VMContinuations or POLLIN from platform thread does master poller
697 if (event == Net.POLLIN) {
698 masterPoller.poll(fdVal, nanos, isOpen);
699 } else {
700 writePoller(fdVal).poll(fdVal, nanos, isOpen);
701 }
702 }
703
704 @Override
705 void pollSelector(int fdVal, long nanos) throws IOException {
706 masterPoller.poll(fdVal, nanos, () -> true);
707 }
708
709 /**
710 * Sub-poller polling loop.
711 */
712 private void subPollerLoop(Poller readPoller) {
713 try {
714 readPoller.subPollerLoop(masterPoller);
715 } finally {
716 // wakeup all threads waiting on file descriptors registered with the
717 // read poller, these I/O operation will migrate to another carrier.
718 readPoller.wakeupAll();
719
720 // remove from serviceability view
721 readPollers.remove(readPoller);
722 }
723 }
724
725 /**
726 * Invoked by the carrier thread before it terminates.
727 */
728 private void carrierTerminated(Poller readPoller) {
729 readPoller.setShutdown();
730 try {
731 readPoller.wakeupPoller();
732 } catch (Throwable e) {
733 e.printStackTrace();
734 }
735 }
736
737 @Override
738 Poller masterPoller() {
739 return masterPoller;
740 }
741
742 @Override
743 List<Poller> readPollers() {
744 return readPollers.stream().toList();
745 }
746
747 @Override
748 List<Poller> writePollers() {
749 return List.of(writePollers);
750 }
751 }
752
753 /**
754 * Reads the given property name to get the poller count. If the property is
755 * set then the value must be a power of 2. Returns 1 if the property is not
756 * set.
757 * @throws IllegalArgumentException if the property is set to a value that
758 * is not a power of 2.
759 */
760 private static int pollerCount(String propName, int defaultCount) {
761 String s = System.getProperty(propName);
762 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
763
764 // check power of 2
765 if (count != Integer.highestOneBit(count)) {
766 String msg = propName + " is set to a value that is not a power of 2";
767 throw new IllegalArgumentException(msg);
768 }
769 return count;
770 }
771
772 /**
773 * Return the master poller or null if there is no master poller.
774 */
775 public static Poller masterPoller() {
776 return POLLER_GROUP.masterPoller();
777 }
778
779 /**
780 * Return the list of read pollers.
781 */
782 public static List<Poller> readPollers() {
783 return POLLER_GROUP.readPollers();
784 }
785
786 /**
787 * Return the list of write pollers.
788 */
789 public static List<Poller> writePollers() {
790 return POLLER_GROUP.writePollers();
791 }
792 }