1 /*
  2  * Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;
 28 import java.io.UncheckedIOException;
 29 import java.lang.ref.Reference;
 30 import java.util.Arrays;
 31 import java.util.List;
 32 import java.util.Map;
 33 import java.util.Objects;
 34 import java.util.Set;
 35 import java.util.concurrent.ConcurrentHashMap;
 36 import java.util.concurrent.Executor;
 37 import java.util.concurrent.Executors;
 38 import java.util.concurrent.ThreadFactory;
 39 import java.util.concurrent.locks.LockSupport;
 40 import java.util.function.BooleanSupplier;
 41 import jdk.internal.access.JavaLangAccess;
 42 import jdk.internal.access.SharedSecrets;
 43 import jdk.internal.misc.InnocuousThread;
 44 import jdk.internal.misc.TerminatingThreadLocal;
 45 import jdk.internal.vm.Continuation;
 46 import jdk.internal.vm.ContinuationSupport;
 47 import jdk.internal.vm.annotation.Stable;
 48 
 49 /**
 * I/O poller to allow virtual threads to park until a file descriptor is ready for I/O.
 51  */
 52 public abstract class Poller {
 53     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 54 
 55     // the poller group for the I/O pollers and poller threads
 56     private static final PollerGroup POLLER_GROUP = createPollerGroup();
 57 
 58     // the poller or sub-poller thread (used for observability only)
 59     private @Stable Thread owner;
 60 
 61     // maps file descriptors to parked Thread
 62     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 63 
 64     // shutdown (if supported by poller group)
 65     private volatile boolean shutdown;
 66 
    /**
     * Poller mode. The mode can be selected with the {@code jdk.pollerMode} system
     * property, where the values "1", "2" and "3" select the constants below in
     * declaration order. If the property is not set then the mode is the one
     * reported by the default poller provider.
     */
    enum Mode {
        /**
         * Read and write pollers are platform threads that block waiting for events and
         * unpark virtual threads when file descriptors are ready for I/O.
         * Selected with {@code jdk.pollerMode=1}.
         */
        SYSTEM_THREADS,

        /**
         * Read and write pollers are virtual threads that poll for events, yielding
         * between polls and unparking virtual threads when file descriptors are
         * ready for I/O. If there are no events then the poller threads park until there
         * are I/O events to poll. This mode helps to integrate polling with virtual
         * thread scheduling. The approach is similar to the default scheme in "User-level
         * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
         * (https://dl.acm.org/doi/10.1145/3379483).
         * Selected with {@code jdk.pollerMode=2}.
         */
        VTHREAD_POLLERS,

        /**
         * Read pollers are per-carrier virtual threads that poll for events, yielding
         * between polls and unparking virtual threads when file descriptors are ready
         * for I/O. If there are no events then the poller threads park until there
         * are I/O events to poll. The write poller is a system-wide platform thread.
         * Selected with {@code jdk.pollerMode=3}.
         */
        POLLER_PER_CARRIER
    }
 96 
 97     /**
 98      * Create and return the PollerGroup.
 99      */
100     private static PollerGroup createPollerGroup() {
101         try {
102             PollerProvider provider;
103             if (System.getProperty("jdk.pollerMode") instanceof String s) {
104                 Mode mode = switch (s) {
105                     case "1" -> Mode.SYSTEM_THREADS;
106                     case "2" -> Mode.VTHREAD_POLLERS;
107                     case "3" -> Mode.POLLER_PER_CARRIER;
108                     default -> {
109                         throw new RuntimeException(s + " is not a valid polling mode");
110                     }
111                 };
112                 provider = PollerProvider.createProvider(mode);
113             } else {
114                 provider = PollerProvider.createProvider();
115             }
116 
117             int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers());
118             int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers());
119             PollerGroup group = switch (provider.pollerMode()) {
120                 case SYSTEM_THREADS     -> new SystemThreadsPollerGroup(provider, readPollers, writePollers);
121                 case VTHREAD_POLLERS    -> new VThreadsPollerGroup(provider, readPollers, writePollers);
122                 case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers);
123             };
124             group.start();
125             return group;
126         } catch (IOException ioe) {
127             throw new UncheckedIOException(ioe);
128         }
129     }
130 
    /**
     * Initializes a Poller. The constructor has nothing to set up; it exists for
     * use by the concrete poller implementations that extend this class.
     */
    protected Poller() {
    }
136 
    /**
     * Closes the poller and releases its resources. This method can only be used to
     * clean up when creating a poller group fails.
     *
     * @throws IOException if an I/O error occurs
     */
    abstract void close() throws IOException;
142 
143     /**
144      * Sets the poller's thread owner.
145      */
146     private void setOwner() {
147         owner = Thread.currentThread();
148     }
149 
150     /**
151      * Returns true if this poller is marked for shutdown.
152      */
153     boolean isShutdown() {
154         return shutdown;
155     }
156 
157     /**
158      * Marks this poller for shutdown.
159      */
160     private void setShutdown() {
161         shutdown = true;
162     }
163 
    /**
     * Returns the poller's file descriptor to use when polling with the master poller.
     * This base implementation always throws. The sub-poller loop calls this method,
     * so pollers that run as sub-pollers presumably override it.
     *
     * @return the file descriptor to register with the master poller
     * @throws UnsupportedOperationException if not supported
     */
    int fdVal() {
        throw new UnsupportedOperationException();
    }
171 
    /**
     * Register the file descriptor with the I/O event management facility so that it is
     * polled when the file descriptor is ready for I/O. The registration is "one shot",
     * meaning it should be polled at most once.
     *
     * @param fdVal the file descriptor to register
     * @throws IOException if an I/O error occurs
     */
    abstract void implStartPoll(int fdVal) throws IOException;
178 
    /**
     * Deregister a file descriptor from the I/O event management facility. This may be
     * a no-op in some implementations when the file descriptor has already been polled.
     *
     * @param fdVal the file descriptor to deregister
     * @param polled true if the file descriptor has already been polled
     * @throws IOException if an I/O error occurs
     */
    abstract void implStopPoll(int fdVal, boolean polled) throws IOException;
185 
    /**
     * Poll for events. The {@link #polled(int)} method is invoked for each
     * polled file descriptor.
     *
     * @param timeout if positive then block for up to {@code timeout} milliseconds,
     *     if zero then don't block, if -1 then block indefinitely
     * @return >0 if file descriptors are polled, 0 if no file descriptor polled
     * @throws IOException if an I/O error occurs
     */
    abstract int poll(int timeout) throws IOException;
195 
    /**
     * Wakeup the poller thread if blocked in poll so it can shutdown.
     * This base implementation always throws.
     *
     * @throws UnsupportedOperationException if not supported
     * @throws IOException if an I/O error occurs
     */
    void wakeupPoller() throws IOException {
        throw new UnsupportedOperationException();
    }
203 
204     /**
205      * Callback by the poll method when a file descriptor is polled.
206      */
207     final void polled(int fdVal) {
208         Thread t = map.remove(fdVal);
209         if (t != null) {
210             LockSupport.unpark(t);
211         }
212     }
213 
    /**
     * Parks the current thread until a file descriptor is ready for the given op.
     * The request is handed to the system-wide poller group.
     *
     * @param fdVal the file descriptor
     * @param event POLLIN or POLLOUT
     * @param nanos the waiting time or 0 to wait indefinitely
     * @param isOpen supplies a boolean to indicate if the enclosing object is open
     * @throws IOException if an I/O error occurs
     */
    public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
        POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
    }
224 
    /**
     * Parks the current thread until a Selector's file descriptor is ready.
     * The request is handed to the system-wide poller group.
     *
     * @param fdVal the Selector's file descriptor
     * @param nanos the waiting time or 0 to wait indefinitely
     * @throws IOException if an I/O error occurs
     */
    public static void pollSelector(int fdVal, long nanos) throws IOException {
        POLLER_GROUP.pollSelector(fdVal, nanos);
    }
233 
    /**
     * Unpark the given thread so that it stops polling. This method only unparks;
     * the file descriptor is deregistered by the unparked thread itself when it
     * returns from parking.
     *
     * @param thread the thread to unpark
     */
    public static void stopPoll(Thread thread) {
        LockSupport.unpark(thread);
    }
240 
241     /**
242      * Parks the current thread until a file descriptor is ready.
243      */
244     private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
245         startPoll(fdVal);
246         try {
247             if (isOpen.getAsBoolean() && !isShutdown()) {
248                 if (nanos > 0) {
249                     LockSupport.parkNanos(nanos);
250                 } else {
251                     LockSupport.park();
252                 }
253             }
254         } finally {
255             stopPoll(fdVal);
256         }
257     }
258 
259     /**
260      * Register a file descriptor with the I/O event management facility so that it is
261      * polled when the file descriptor is ready for I/O.
262      */
263     private void startPoll(int fdVal) throws IOException {
264         Thread previous = map.put(fdVal, Thread.currentThread());
265         assert previous == null;
266         try {
267             implStartPoll(fdVal);
268         } catch (Throwable t) {
269             map.remove(fdVal);
270             throw t;
271         } finally {
272             Reference.reachabilityFence(this);
273         }
274     }
275 
276     /**
277      * Deregister a file descriptor from the I/O event management facility.
278      */
279     private void stopPoll(int fdVal) throws IOException {
280         Thread previous = map.remove(fdVal);
281         boolean polled = (previous == null);
282         assert polled || previous == Thread.currentThread();
283         try {
284             implStopPoll(fdVal, polled);
285         } finally {
286             Reference.reachabilityFence(this);
287         }
288     }
289 
290     /**
291      * Master polling loop. The {@link #polled(int)} method is invoked for each file
292      * descriptor that is polled.
293      */
294     private void pollerLoop() {
295         setOwner();
296         try {
297             while (!isShutdown()) {
298                 poll(-1);
299             }
300         } catch (Exception e) {
301             e.printStackTrace();
302         }
303     }
304 
305     /**
306      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
307      * descriptor that is polled.
308      *
309      * The sub-poller registers its file descriptor with the master poller to park until
310      * there are events to poll. When unparked, it does non-blocking polls and parks
311      * again when there are no more events. The sub-poller yields after each poll to help
312      * with fairness and to avoid re-registering with the master poller where possible.
313      */
314     private void subPollerLoop(Poller masterPoller) {
315         assert Thread.currentThread().isVirtual();
316         setOwner();
317         try {
318             int polled = 0;
319             while (!isShutdown()) {
320                 if (polled == 0) {
321                     masterPoller.poll(fdVal(), 0, () -> true);  // park
322                 } else {
323                     Thread.yield();
324                 }
325                 polled = poll(0);
326             }
327         } catch (Exception e) {
328             e.printStackTrace();
329         }
330     }
331 
332     /**
333      * Unparks all threads waiting on a file descriptor registered with this poller.
334      */
335     private void wakeupAll() {
336         map.values().forEach(LockSupport::unpark);
337     }
338 
339     @Override
340     public String toString() {
341         return String.format("%s [registered = %d, owner = %s]",
342             Objects.toIdentityString(this), map.size(), owner);
343     }
344 
345     /**
346      * A group of poller threads that support virtual threads polling file descriptors.
347      */
348     private static abstract class PollerGroup {
349         private final PollerProvider provider;
350 
351         PollerGroup(PollerProvider provider) {
352             this.provider = provider;
353         }
354 
355         final PollerProvider provider() {
356             return provider;
357         }
358 
359         /**
360          * Starts the poller group and any system-wide poller threads.
361          */
362         abstract void start();
363 
364         /**
365          * Parks the current thread until a file descriptor is ready for the given op.
366          */
367         abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
368 
369         /**
370          * Parks the current thread until a Selector's file descriptor is ready.
371          */
372         void pollSelector(int fdVal, long nanos) throws IOException {
373             poll(fdVal, Net.POLLIN, nanos, () -> true);
374         }
375 
376         /**
377          * Starts a platform thread to run the given task.
378          */
379         protected final void startPlatformThread(String name, Runnable task) {
380             Thread thread = InnocuousThread.newSystemThread(name, task);
381             thread.setDaemon(true);
382             thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
383             thread.start();
384         }
385 
386         /**
387          * Return the master poller, or null if no master poller.
388          */
389         abstract Poller masterPoller();
390 
391         /**
392          * Return the read pollers.
393          */
394         abstract List<Poller> readPollers();
395 
396         /**
397          * Return the write pollers.
398          */
399         abstract List<Poller> writePollers();
400 
401         /**
402          * Close the given pollers.
403          */
404         static void closeAll(Poller... pollers) {
405             for (Poller poller : pollers) {
406                 if (poller != null) {
407                     try {
408                         poller.close();
409                     } catch (IOException _) { }
410                 }
411             }
412         }
413     }
414 
415     /**
416      * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
417      */
418     private static class SystemThreadsPollerGroup extends PollerGroup {
419         // system-wide read and write pollers
420         private final Poller[] readPollers;
421         private final Poller[] writePollers;
422 
423         SystemThreadsPollerGroup(PollerProvider provider,
424                                  int readPollerCount,
425                                  int writePollerCount) throws IOException {
426             super(provider);
427             Poller[] readPollers = new Poller[readPollerCount];
428             Poller[] writePollers = new Poller[writePollerCount];
429             try {
430                 for (int i = 0; i < readPollerCount; i++) {
431                     readPollers[i] = provider.readPoller(false);
432                 }
433                 for (int i = 0; i < writePollerCount; i++) {
434                     writePollers[i] = provider.writePoller(false);
435                 }
436             } catch (Throwable e) {
437                 closeAll(readPollers);
438                 closeAll(writePollers);
439                 throw e;
440             }
441 
442             this.readPollers = readPollers;
443             this.writePollers = writePollers;
444         }
445 
446         @Override
447         void start() {
448             Arrays.stream(readPollers).forEach(p -> {
449                 startPlatformThread("Read-Poller", p::pollerLoop);
450             });
451             Arrays.stream(writePollers).forEach(p -> {
452                 startPlatformThread("Write-Poller", p::pollerLoop);
453             });
454         }
455 
456         private Poller readPoller(int fdVal) {
457             int index = provider().fdValToIndex(fdVal, readPollers.length);
458             return readPollers[index];
459         }
460 
461         private Poller writePoller(int fdVal) {
462             int index = provider().fdValToIndex(fdVal, writePollers.length);
463             return writePollers[index];
464         }
465 
466         @Override
467         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
468             Poller poller = (event == Net.POLLIN)
469                     ? readPoller(fdVal)
470                     : writePoller(fdVal);
471             poller.poll(fdVal, nanos, isOpen);
472         }
473 
474         @Override
475         Poller masterPoller() {
476             return null;
477         }
478 
479         @Override
480         List<Poller> readPollers() {
481             return List.of(readPollers);
482         }
483 
484         @Override
485         List<Poller> writePollers() {
486             return List.of(writePollers);
487         }
488     }
489 
490     /**
491      * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
492      * When read and write pollers need to block then they register with a system-wide
493      * "master poller" that runs in a dedicated platform thread.
494      */
495     private static class VThreadsPollerGroup extends PollerGroup {
496         private final Poller masterPoller;
497         private final Poller[] readPollers;
498         private final Poller[] writePollers;
499 
500         // keep virtual thread pollers alive
501         private final Executor executor;
502 
503         VThreadsPollerGroup(PollerProvider provider,
504                             int readPollerCount,
505                             int writePollerCount) throws IOException {
506             super(provider);
507             Poller masterPoller = provider.readPoller(false);
508             Poller[] readPollers = new Poller[readPollerCount];
509             Poller[] writePollers = new Poller[writePollerCount];
510 
511             try {
512                 for (int i = 0; i < readPollerCount; i++) {
513                     readPollers[i] = provider.readPoller(true);
514                 }
515                 for (int i = 0; i < writePollerCount; i++) {
516                     writePollers[i] = provider.writePoller(true);
517                 }
518             } catch (Throwable e) {
519                 masterPoller.close();
520                 closeAll(readPollers);
521                 closeAll(writePollers);
522                 throw e;
523             }
524 
525             this.masterPoller = masterPoller;
526             this.readPollers = readPollers;
527             this.writePollers = writePollers;
528 
529             ThreadFactory factory = Thread.ofVirtual()
530                     .inheritInheritableThreadLocals(false)
531                     .name("SubPoller-", 0)
532                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
533                     .factory();
534             this.executor = Executors.newThreadPerTaskExecutor(factory);
535         }
536 
537         @Override
538         void start() {
539             startPlatformThread("Master-Poller", masterPoller::pollerLoop);
540             Arrays.stream(readPollers).forEach(p -> {
541                 executor.execute(() -> p.subPollerLoop(masterPoller));
542             });
543             Arrays.stream(writePollers).forEach(p -> {
544                 executor.execute(() -> p.subPollerLoop(masterPoller));
545             });
546         }
547 
548         private Poller readPoller(int fdVal) {
549             int index = provider().fdValToIndex(fdVal, readPollers.length);
550             return readPollers[index];
551         }
552 
553         private Poller writePoller(int fdVal) {
554             int index = provider().fdValToIndex(fdVal, writePollers.length);
555             return writePollers[index];
556         }
557 
558         @Override
559         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
560             Poller poller = (event == Net.POLLIN)
561                     ? readPoller(fdVal)
562                     : writePoller(fdVal);
563             poller.poll(fdVal, nanos, isOpen);
564         }
565 
566         @Override
567         void pollSelector(int fdVal, long nanos) throws IOException {
568             masterPoller.poll(fdVal, nanos, () -> true);
569         }
570 
571         @Override
572         Poller masterPoller() {
573             return masterPoller;
574         }
575 
576         @Override
577         List<Poller> readPollers() {
578             return List.of(readPollers);
579         }
580 
581         @Override
582         List<Poller> writePollers() {
583             return List.of(writePollers);
584         }
585     }
586 
587     /**
588      * POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread.
589      * When a virtual thread polls a file descriptor for POLLIN, then it will use (almost
590      * always, not guaranteed) the read poller for its carrier. When a read poller needs
591      * to block then it registers with a system-wide "master poller" that runs in a
592      * dedicated platform thread. The read poller terminates if the carrier terminates.
593      * The write pollers are system-wide platform threads (usually one).
594      */
595     private static class PollerPerCarrierPollerGroup extends PollerGroup {
596         private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { }
597         private static final TerminatingThreadLocal<CarrierPoller> CARRIER_POLLER =
598             new TerminatingThreadLocal<>() {
599                 @Override
600                 protected void threadTerminated(CarrierPoller carrierPoller) {
601                     Poller readPoller = carrierPoller.readPoller();
602                     carrierPoller.group().carrierTerminated(readPoller);
603                 }
604             };
605 
606         private final Poller masterPoller;
607         private final Set<Poller> readPollers;
608         private final Poller[] writePollers;
609 
610         /**
611          * Create a PollerPerCarrierPollerGroup with the given number of write pollers.
612          */
613         PollerPerCarrierPollerGroup(PollerProvider provider,
614                                     int writePollerCount) throws IOException {
615             super(provider);
616             Poller masterPoller = provider.readPoller(false);
617             Poller[] writePollers = new Poller[writePollerCount];
618             try {
619                 for (int i = 0; i < writePollerCount; i++) {
620                     writePollers[i] = provider.writePoller(false);
621                 }
622             } catch (Throwable e) {
623                 masterPoller.close();
624                 closeAll(writePollers);
625                 throw e;
626             }
627             this.masterPoller = masterPoller;
628             this.readPollers = ConcurrentHashMap.newKeySet();;
629             this.writePollers = writePollers;
630         }
631 
632         @Override
633         void start() {
634             startPlatformThread("Master-Poller", masterPoller::pollerLoop);
635             Arrays.stream(writePollers).forEach(p -> {
636                 startPlatformThread("Write-Poller", p::pollerLoop);
637             });
638         }
639 
640         private Poller writePoller(int fdVal) {
641             int index = provider().fdValToIndex(fdVal, writePollers.length);
642             return writePollers[index];
643         }
644 
645         /**
646          * Starts a read sub-poller in a virtual thread.
647          */
648         private Poller startReadPoller() throws IOException {
649             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
650 
651             // create read sub-poller
652             Poller readPoller = provider().readPoller(true);
653             readPollers.add(readPoller);
654 
655             // start virtual thread to execute sub-polling loop
656             Thread carrier = JLA.currentCarrierThread();
657             Thread.ofVirtual()
658                     .inheritInheritableThreadLocals(false)
659                     .name(carrier.getName() + "-Read-Poller")
660                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
661                     .start(() -> subPollerLoop(readPoller));
662             return readPoller;
663         }
664 
665         /**
666          * Returns the read poller for the current carrier, starting it if required.
667          */
668         private Poller readPoller() throws IOException {
669             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
670             Continuation.pin();
671             try {
672                 CarrierPoller carrierPoller = CARRIER_POLLER.get();
673                 if (carrierPoller != null) {
674                     return carrierPoller.readPoller();
675                 } else {
676                     // first poll on this carrier will start poller
677                     Poller readPoller = startReadPoller();
678                     CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
679                     return readPoller;
680                 }
681             } finally {
682                 Continuation.unpin();
683             }
684         }
685 
686         @Override
687         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
688             // for POLLIN, get the read poller for this carrier
689             if (event == Net.POLLIN
690                     && Thread.currentThread().isVirtual()
691                     && ContinuationSupport.isSupported()) {
692                 readPoller().poll(fdVal, nanos, isOpen);
693                 return;
694             }
695 
696             // -XX:-VMContinuations or POLLIN from platform thread does master poller
697             if (event == Net.POLLIN) {
698                 masterPoller.poll(fdVal, nanos, isOpen);
699             } else {
700                 writePoller(fdVal).poll(fdVal, nanos, isOpen);
701             }
702         }
703 
704         @Override
705         void pollSelector(int fdVal, long nanos) throws IOException {
706             masterPoller.poll(fdVal, nanos, () -> true);
707         }
708 
709         /**
710          * Sub-poller polling loop.
711          */
712         private void subPollerLoop(Poller readPoller) {
713             try {
714                 readPoller.subPollerLoop(masterPoller);
715             } finally {
716                 // wakeup all threads waiting on file descriptors registered with the
717                 // read poller, these I/O operation will migrate to another carrier.
718                 readPoller.wakeupAll();
719 
720                 // remove from serviceability view
721                 readPollers.remove(readPoller);
722             }
723         }
724 
725         /**
726          * Invoked by the carrier thread before it terminates.
727          */
728         private void carrierTerminated(Poller readPoller) {
729             readPoller.setShutdown();
730             try {
731                 readPoller.wakeupPoller();
732             } catch (Throwable e) {
733                 e.printStackTrace();
734             }
735         }
736 
737         @Override
738         Poller masterPoller() {
739             return masterPoller;
740         }
741 
742         @Override
743         List<Poller> readPollers() {
744             return readPollers.stream().toList();
745         }
746 
747         @Override
748         List<Poller> writePollers() {
749             return List.of(writePollers);
750         }
751     }
752 
753     /**
754      * Reads the given property name to get the poller count. If the property is
755      * set then the value must be a power of 2. Returns 1 if the property is not
756      * set.
757      * @throws IllegalArgumentException if the property is set to a value that
758      * is not a power of 2.
759      */
760     private static int pollerCount(String propName, int defaultCount) {
761         String s = System.getProperty(propName);
762         int count = (s != null) ? Integer.parseInt(s) : defaultCount;
763 
764         // check power of 2
765         if (count != Integer.highestOneBit(count)) {
766             String msg = propName + " is set to a value that is not a power of 2";
767             throw new IllegalArgumentException(msg);
768         }
769         return count;
770     }
771 
    /**
     * Return the master poller or null if there is no master poller. Whether there
     * is a master poller depends on the poller mode, the SYSTEM_THREADS poller group
     * does not have one.
     */
    public static Poller masterPoller() {
        return POLLER_GROUP.masterPoller();
    }
778 
    /**
     * Return the list of read pollers in the system-wide poller group.
     */
    public static List<Poller> readPollers() {
        return POLLER_GROUP.readPollers();
    }
785 
    /**
     * Return the list of write pollers in the system-wide poller group.
     */
    public static List<Poller> writePollers() {
        return POLLER_GROUP.writePollers();
    }
792 }