1 /*
  2  * Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;
 28 import java.io.UncheckedIOException;
 29 import java.lang.ref.Reference;
 30 import java.util.Arrays;
 31 import java.util.List;
 32 import java.util.Map;
 33 import java.util.Objects;
 34 import java.util.Set;
 35 import java.util.concurrent.ConcurrentHashMap;
 36 import java.util.concurrent.Executor;
 37 import java.util.concurrent.Executors;
 38 import java.util.concurrent.ThreadFactory;
 39 import java.util.concurrent.locks.LockSupport;
 40 import java.util.function.BooleanSupplier;
 41 import jdk.internal.access.JavaLangAccess;
 42 import jdk.internal.access.SharedSecrets;
 43 import jdk.internal.misc.InnocuousThread;
 44 import jdk.internal.misc.TerminatingThreadLocal;
 45 import jdk.internal.vm.Continuation;
 46 import jdk.internal.vm.ContinuationSupport;
 47 import jdk.internal.vm.annotation.Stable;
 48 
 49 /**
 50  * I/O poller to allow virtual threads park until a file descriptor is ready for I/O.
 51  */
 52 public abstract class Poller {
 53     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 54 
 55     // the poller group for the I/O pollers and poller threads
 56     private static final PollerGroup POLLER_GROUP = createPollerGroup();
 57 
 58     // the poller or sub-poller thread (used for observability only)
 59     private @Stable Thread owner;
 60 
 61     // maps file descriptors to parked Thread
 62     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 63 
 64     // shutdown (if supported by poller group)
 65     private volatile boolean shutdown;
 66 
 67     /**
 68      * Poller mode.
 69      */
 70     enum Mode {
 71         /**
 72          * Read and write pollers are platform threads that block waiting for events and
 73          * unpark virtual threads when file descriptors are ready for I/O.
 74          */
 75         SYSTEM_THREADS,
 76 
 77         /**
 78          * Read and write pollers are virtual threads that poll for events, yielding
 79          * between polls and unparking virtual threads when file descriptors are
 80          * ready for I/O. If there are no events then the poller threads park until there
 81          * are I/O events to poll. This mode helps to integrate polling with virtual
 82          * thread scheduling. The approach is similar to the default scheme in "User-level
 83          * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
 84          * (https://dl.acm.org/doi/10.1145/3379483).
 85          */
 86         VTHREAD_POLLERS,
 87 
 88         /**
 89          * Read pollers are per-carrier virtual threads that poll for events, yielding
 90          * between polls and unparking virtual threads when file descriptors are ready
 91          * for I/O. If there are no events then the poller threads park until there
 92          * are I/O events to poll. The write poller is a system-wide platform thread.
 93          */
 94         POLLER_PER_CARRIER
 95     }
 96 
 97     /**
 98      * Create and return the PollerGroup.
 99      */
100     private static PollerGroup createPollerGroup() {
101         try {
102             PollerProvider provider;
103             if (System.getProperty("jdk.pollerMode") instanceof String s) {
104                 Mode mode = switch (s) {
105                     case "1" -> Mode.SYSTEM_THREADS;
106                     case "2" -> Mode.VTHREAD_POLLERS;
107                     case "3" -> Mode.POLLER_PER_CARRIER;
108                     default -> {
109                         throw new RuntimeException(s + " is not a valid polling mode");
110                     }
111                 };
112                 provider = PollerProvider.createProvider(mode);
113             } else {
114                 provider = PollerProvider.createProvider();
115             }
116 
117             int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers());
118             int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers());
119             PollerGroup group = switch (provider.pollerMode()) {
120                 case SYSTEM_THREADS     -> new SystemThreadsPollerGroup(provider, readPollers, writePollers);
121                 case VTHREAD_POLLERS    -> new VThreadsPollerGroup(provider, readPollers, writePollers);
122                 case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers);
123             };
124             group.start();
125             return group;
126         } catch (IOException ioe) {
127             throw new UncheckedIOException(ioe);
128         }
129     }
130 
131     /**
132      * Initialize a Poller.
133      */
134     protected Poller() {
135     }
136 
137     /**
138      * Closes the poller and release resources. This method can only be used to cleanup
139      * when creating a poller group fails.
140      */
141     abstract void close() throws IOException;
142 
143     /**
144      * Sets the poller's thread owner.
145      */
146     private void setOwner() {
147         owner = Thread.currentThread();
148     }
149 
150     /**
151      * Returns true if this poller is marked for shutdown.
152      */
153     boolean isShutdown() {
154         return shutdown;
155     }
156 
157     /**
158      * Marks this poller for shutdown.
159      */
160     private void setShutdown() {
161         shutdown = true;
162     }
163 
164     /**
165      * Returns the poller's file descriptor to use when polling with the master poller.
166      * @throws UnsupportedOperationException if not supported
167      */
168     int fdVal() {
169         throw new UnsupportedOperationException();
170     }
171 
172     /**
173      * Register the file descriptor with the I/O event management facility so that it is
174      * polled when the file descriptor is ready for I/O. The registration is "one shot",
175      * meaning it should be polled at most once.
176      */
177     abstract void implStartPoll(int fdVal) throws IOException;
178 
179     /**
180      * Deregister a file descriptor from the I/O event management facility. This may be
181      * a no-op in some implementations when the file descriptor has already been polled.
182      * @param polled true if the file descriptor has already been polled
183      */
184     abstract void implStopPoll(int fdVal, boolean polled) throws IOException;
185 
186     /**
187      * Poll for events. The {@link #polled(int)} method is invoked for each
188      * polled file descriptor.
189      *
190      * @param timeout if positive then block for up to {@code timeout} milliseconds,
191      *     if zero then don't block, if -1 then block indefinitely
192      * @return >0 if file descriptors are polled, 0 if no file descriptor polled
193      */
194     abstract int poll(int timeout) throws IOException;
195 
196     /**
197      * Wakeup the poller thread if blocked in poll so it can shutdown.
198      * @throws UnsupportedOperationException if not supported
199      */
200     void wakeupPoller() throws IOException {
201         throw new UnsupportedOperationException();
202     }
203 
204     /**
205      * Callback by the poll method when a file descriptor is polled.
206      */
207     final void polled(int fdVal) {
208         Thread t = map.remove(fdVal);
209         if (t != null) {
210             if (POLLER_GROUP.useLazyUnpark() && Thread.currentThread().isVirtual()) {
211                 JLA.lazyUnparkVirtualThread(t);
212             } else {
213                 LockSupport.unpark(t);
214             }
215         }
216     }
217 
218     /**
219      * Parks the current thread until a file descriptor is ready for the given op.
220      * @param fdVal the file descriptor
221      * @param event POLLIN or POLLOUT
222      * @param nanos the waiting time or 0 to wait indefinitely
223      * @param isOpen supplies a boolean to indicate if the enclosing object is open
224      */
225     public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
226         POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
227     }
228 
229     /**
230      * Parks the current thread until a Selector's file descriptor is ready.
231      * @param fdVal the Selector's file descriptor
232      * @param nanos the waiting time or 0 to wait indefinitely
233      */
234     public static void pollSelector(int fdVal, long nanos) throws IOException {
235         POLLER_GROUP.pollSelector(fdVal, nanos);
236     }
237 
238     /**
239      * Unpark the given thread so that it stops polling.
240      */
241     public static void stopPoll(Thread thread) {
242         LockSupport.unpark(thread);
243     }
244 
245     /**
246      * Parks the current thread until a file descriptor is ready.
247      */
248     private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
249         startPoll(fdVal);
250         try {
251             if (isOpen.getAsBoolean() && !isShutdown()) {
252                 if (nanos > 0) {
253                     LockSupport.parkNanos(nanos);
254                 } else {
255                     LockSupport.park();
256                 }
257             }
258         } finally {
259             stopPoll(fdVal);
260         }
261     }
262 
263     /**
264      * Register a file descriptor with the I/O event management facility so that it is
265      * polled when the file descriptor is ready for I/O.
266      */
267     private void startPoll(int fdVal) throws IOException {
268         Thread previous = map.put(fdVal, Thread.currentThread());
269         assert previous == null;
270         try {
271             implStartPoll(fdVal);
272         } catch (Throwable t) {
273             map.remove(fdVal);
274             throw t;
275         } finally {
276             Reference.reachabilityFence(this);
277         }
278     }
279 
280     /**
281      * Deregister a file descriptor from the I/O event management facility.
282      */
283     private void stopPoll(int fdVal) throws IOException {
284         Thread previous = map.remove(fdVal);
285         boolean polled = (previous == null);
286         assert polled || previous == Thread.currentThread();
287         try {
288             implStopPoll(fdVal, polled);
289         } finally {
290             Reference.reachabilityFence(this);
291         }
292     }
293 
294     /**
295      * Master polling loop. The {@link #polled(int)} method is invoked for each file
296      * descriptor that is polled.
297      */
298     private void pollerLoop() {
299         setOwner();
300         try {
301             while (!isShutdown()) {
302                 poll(-1);
303             }
304         } catch (Exception e) {
305             e.printStackTrace();
306         }
307     }
308 
309     /**
310      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
311      * descriptor that is polled.
312      *
313      * The sub-poller registers its file descriptor with the master poller to park until
314      * there are events to poll. When unparked, it does non-blocking polls and parks
315      * again when there are no more events. The sub-poller yields after each poll to help
316      * with fairness and to avoid re-registering with the master poller where possible.
317      */
318     private void subPollerLoop(Poller masterPoller) {
319         assert Thread.currentThread().isVirtual();
320         setOwner();
321         try {
322             int polled = 0;
323             while (!isShutdown()) {
324                 if (polled == 0) {
325                     masterPoller.poll(fdVal(), 0, () -> true);  // park
326                 } else {
327                     Thread.yield();
328                 }
329                 polled = poll(0);
330             }
331         } catch (Exception e) {
332             e.printStackTrace();
333         }
334     }
335 
336     /**
337      * Unparks all threads waiting on a file descriptor registered with this poller.
338      */
339     private void wakeupAll() {
340         map.values().forEach(LockSupport::unpark);
341     }
342 
343     @Override
344     public String toString() {
345         return String.format("%s [registered = %d, owner = %s]",
346             Objects.toIdentityString(this), map.size(), owner);
347     }
348 
349     /**
350      * A group of poller threads that support virtual threads polling file descriptors.
351      */
352     private static abstract class PollerGroup {
353         private final PollerProvider provider;
354 
355         PollerGroup(PollerProvider provider) {
356             this.provider = provider;
357         }
358 
359         final PollerProvider provider() {
360             return provider;
361         }
362 
363         /**
364          * Starts the poller group and any system-wide poller threads.
365          */
366         abstract void start();
367 
368         /**
369          * Parks the current thread until a file descriptor is ready for the given op.
370          */
371         abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
372 
373         /**
374          * Parks the current thread until a Selector's file descriptor is ready.
375          */
376         void pollSelector(int fdVal, long nanos) throws IOException {
377             poll(fdVal, Net.POLLIN, nanos, () -> true);
378         }
379 
380         /**
381          * Starts a platform thread to run the given task.
382          */
383         protected final void startPlatformThread(String name, Runnable task) {
384             Thread thread = InnocuousThread.newSystemThread(name, task);
385             thread.setDaemon(true);
386             thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
387             thread.start();
388         }
389 
390         /**
391          * Return the master poller, or null if no master poller.
392          */
393         abstract Poller masterPoller();
394 
395         /**
396          * Return the read pollers.
397          */
398         abstract List<Poller> readPollers();
399 
400         /**
401          * Return the write pollers.
402          */
403         abstract List<Poller> writePollers();
404 
405         /**
406          * Return true if the unparking threads should use lazyUnpark.
407          */
408         boolean useLazyUnpark() {
409             return false;
410         }
411 
412         /**
413          * Close the given pollers.
414          */
415         static void closeAll(Poller... pollers) {
416             for (Poller poller : pollers) {
417                 if (poller != null) {
418                     try {
419                         poller.close();
420                     } catch (IOException _) { }
421                 }
422             }
423         }
424     }
425 
426     /**
427      * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
428      */
429     private static class SystemThreadsPollerGroup extends PollerGroup {
430         // system-wide read and write pollers
431         private final Poller[] readPollers;
432         private final Poller[] writePollers;
433 
434         SystemThreadsPollerGroup(PollerProvider provider,
435                                  int readPollerCount,
436                                  int writePollerCount) throws IOException {
437             super(provider);
438             Poller[] readPollers = new Poller[readPollerCount];
439             Poller[] writePollers = new Poller[writePollerCount];
440             try {
441                 for (int i = 0; i < readPollerCount; i++) {
442                     readPollers[i] = provider.readPoller(false);
443                 }
444                 for (int i = 0; i < writePollerCount; i++) {
445                     writePollers[i] = provider.writePoller(false);
446                 }
447             } catch (Throwable e) {
448                 closeAll(readPollers);
449                 closeAll(writePollers);
450                 throw e;
451             }
452 
453             this.readPollers = readPollers;
454             this.writePollers = writePollers;
455         }
456 
457         @Override
458         void start() {
459             Arrays.stream(readPollers).forEach(p -> {
460                 startPlatformThread("Read-Poller", p::pollerLoop);
461             });
462             Arrays.stream(writePollers).forEach(p -> {
463                 startPlatformThread("Write-Poller", p::pollerLoop);
464             });
465         }
466 
467         private Poller readPoller(int fdVal) {
468             int index = provider().fdValToIndex(fdVal, readPollers.length);
469             return readPollers[index];
470         }
471 
472         private Poller writePoller(int fdVal) {
473             int index = provider().fdValToIndex(fdVal, writePollers.length);
474             return writePollers[index];
475         }
476 
477         @Override
478         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
479             Poller poller = (event == Net.POLLIN)
480                     ? readPoller(fdVal)
481                     : writePoller(fdVal);
482             poller.poll(fdVal, nanos, isOpen);
483         }
484 
485         @Override
486         Poller masterPoller() {
487             return null;
488         }
489 
490         @Override
491         List<Poller> readPollers() {
492             return List.of(readPollers);
493         }
494 
495         @Override
496         List<Poller> writePollers() {
497             return List.of(writePollers);
498         }
499     }
500 
501     /**
502      * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
503      * When read and write pollers need to block then they register with a system-wide
504      * "master poller" that runs in a dedicated platform thread.
505      */
506     private static class VThreadsPollerGroup extends PollerGroup {
507         private final Poller masterPoller;
508         private final Poller[] readPollers;
509         private final Poller[] writePollers;
510 
511         // keep virtual thread pollers alive
512         private final Executor executor;
513 
514         VThreadsPollerGroup(PollerProvider provider,
515                             int readPollerCount,
516                             int writePollerCount) throws IOException {
517             super(provider);
518             Poller masterPoller = provider.readPoller(false);
519             Poller[] readPollers = new Poller[readPollerCount];
520             Poller[] writePollers = new Poller[writePollerCount];
521 
522             try {
523                 for (int i = 0; i < readPollerCount; i++) {
524                     readPollers[i] = provider.readPoller(true);
525                 }
526                 for (int i = 0; i < writePollerCount; i++) {
527                     writePollers[i] = provider.writePoller(true);
528                 }
529             } catch (Throwable e) {
530                 masterPoller.close();
531                 closeAll(readPollers);
532                 closeAll(writePollers);
533                 throw e;
534             }
535 
536             this.masterPoller = masterPoller;
537             this.readPollers = readPollers;
538             this.writePollers = writePollers;
539 
540             ThreadFactory factory = Thread.ofVirtual()
541                     .inheritInheritableThreadLocals(false)
542                     .name("SubPoller-", 0)
543                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
544                     .factory();
545             this.executor = Executors.newThreadPerTaskExecutor(factory);
546         }
547 
548         @Override
549         void start() {
550             startPlatformThread("Master-Poller", masterPoller::pollerLoop);
551             Arrays.stream(readPollers).forEach(p -> {
552                 executor.execute(() -> p.subPollerLoop(masterPoller));
553             });
554             Arrays.stream(writePollers).forEach(p -> {
555                 executor.execute(() -> p.subPollerLoop(masterPoller));
556             });
557         }
558 
559         private Poller readPoller(int fdVal) {
560             int index = provider().fdValToIndex(fdVal, readPollers.length);
561             return readPollers[index];
562         }
563 
564         private Poller writePoller(int fdVal) {
565             int index = provider().fdValToIndex(fdVal, writePollers.length);
566             return writePollers[index];
567         }
568 
569         @Override
570         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
571             Poller poller = (event == Net.POLLIN)
572                     ? readPoller(fdVal)
573                     : writePoller(fdVal);
574             poller.poll(fdVal, nanos, isOpen);
575         }
576 
577         @Override
578         void pollSelector(int fdVal, long nanos) throws IOException {
579             masterPoller.poll(fdVal, nanos, () -> true);
580         }
581 
582         @Override
583         Poller masterPoller() {
584             return masterPoller;
585         }
586 
587         @Override
588         List<Poller> readPollers() {
589             return List.of(readPollers);
590         }
591 
592         @Override
593         List<Poller> writePollers() {
594             return List.of(writePollers);
595         }
596     }
597 
598     /**
599      * POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread.
600      * When a virtual thread polls a file descriptor for POLLIN, then it will use (almost
601      * always, not guaranteed) the read poller for its carrier. When a read poller needs
602      * to block then it registers with a system-wide "master poller" that runs in a
603      * dedicated platform thread. The read poller terminates if the carrier terminates.
604      * The write pollers are system-wide platform threads (usually one).
605      */
606     private static class PollerPerCarrierPollerGroup extends PollerGroup {
607         private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { }
608         private static final TerminatingThreadLocal<CarrierPoller> CARRIER_POLLER =
609             new TerminatingThreadLocal<>() {
610                 @Override
611                 protected void threadTerminated(CarrierPoller carrierPoller) {
612                     Poller readPoller = carrierPoller.readPoller();
613                     carrierPoller.group().carrierTerminated(readPoller);
614                 }
615             };
616 
617         private final Poller masterPoller;
618         private final Set<Poller> readPollers;
619         private final Poller[] writePollers;
620 
621         /**
622          * Create a PollerPerCarrierPollerGroup with the given number of write pollers.
623          */
624         PollerPerCarrierPollerGroup(PollerProvider provider,
625                                     int writePollerCount) throws IOException {
626             super(provider);
627             Poller masterPoller = provider.readPoller(false);
628             Poller[] writePollers = new Poller[writePollerCount];
629             try {
630                 for (int i = 0; i < writePollerCount; i++) {
631                     writePollers[i] = provider.writePoller(false);
632                 }
633             } catch (Throwable e) {
634                 masterPoller.close();
635                 closeAll(writePollers);
636                 throw e;
637             }
638             this.masterPoller = masterPoller;
639             this.readPollers = ConcurrentHashMap.newKeySet();;
640             this.writePollers = writePollers;
641         }
642 
643         @Override
644         void start() {
645             startPlatformThread("Master-Poller", masterPoller::pollerLoop);
646             Arrays.stream(writePollers).forEach(p -> {
647                 startPlatformThread("Write-Poller", p::pollerLoop);
648             });
649         }
650 
651         private Poller writePoller(int fdVal) {
652             int index = provider().fdValToIndex(fdVal, writePollers.length);
653             return writePollers[index];
654         }
655 
656         /**
657          * Starts a read sub-poller in a virtual thread.
658          */
659         private Poller startReadPoller() throws IOException {
660             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
661 
662             // create read sub-poller
663             Poller readPoller = provider().readPoller(true);
664             readPollers.add(readPoller);
665 
666             // start virtual thread to execute sub-polling loop
667             Thread carrier = JLA.currentCarrierThread();
668             Thread.Builder.OfVirtual builder = Thread.ofVirtual()
669                     .inheritInheritableThreadLocals(false)
670                     .name(carrier.getName() + "-Read-Poller")
671                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace());
672             Thread thread = JLA.defaultVirtualThreadScheduler()
673                     .newThread(builder, carrier, () -> subPollerLoop(readPoller))
674                     .thread();
675             thread.start();
676             return readPoller;
677         }
678 
679         /**
680          * Returns the read poller for the current carrier, starting it if required.
681          */
682         private Poller readPoller() throws IOException {
683             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
684             Continuation.pin();
685             try {
686                 CarrierPoller carrierPoller = CARRIER_POLLER.get();
687                 if (carrierPoller != null) {
688                     return carrierPoller.readPoller();
689                 } else {
690                     // first poll on this carrier will start poller
691                     Poller readPoller = startReadPoller();
692                     CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
693                     return readPoller;
694                 }
695             } finally {
696                 Continuation.unpin();
697             }
698         }
699 
700         @Override
701         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
702             // for POLLIN, get the read poller for this carrier
703             if (event == Net.POLLIN
704                     && Thread.currentThread().isVirtual()
705                     && ContinuationSupport.isSupported()) {
706                 readPoller().poll(fdVal, nanos, isOpen);
707                 return;
708             }
709 
710             // -XX:-VMContinuations or POLLIN from platform thread does master poller
711             if (event == Net.POLLIN) {
712                 masterPoller.poll(fdVal, nanos, isOpen);
713             } else {
714                 writePoller(fdVal).poll(fdVal, nanos, isOpen);
715             }
716         }
717 
718         @Override
719         void pollSelector(int fdVal, long nanos) throws IOException {
720             masterPoller.poll(fdVal, nanos, () -> true);
721         }
722 
723         /**
724          * Sub-poller polling loop.
725          */
726         private void subPollerLoop(Poller readPoller) {
727             try {
728                 readPoller.subPollerLoop(masterPoller);
729             } finally {
730                 // wakeup all threads waiting on file descriptors registered with the
731                 // read poller, these I/O operation will migrate to another carrier.
732                 readPoller.wakeupAll();
733 
734                 // remove from serviceability view
735                 readPollers.remove(readPoller);
736             }
737         }
738 
739         /**
740          * Invoked by the carrier thread before it terminates.
741          */
742         private void carrierTerminated(Poller readPoller) {
743             readPoller.setShutdown();
744             try {
745                 readPoller.wakeupPoller();
746             } catch (Throwable e) {
747                 e.printStackTrace();
748             }
749         }
750 
751         @Override
752         Poller masterPoller() {
753             return masterPoller;
754         }
755 
756         @Override
757         List<Poller> readPollers() {
758             return readPollers.stream().toList();
759         }
760 
761         @Override
762         List<Poller> writePollers() {
763             return List.of(writePollers);
764         }
765 
766         @Override
767         boolean useLazyUnpark() {
768             return true;
769         }
770     }
771 
772     /**
773      * Reads the given property name to get the poller count. If the property is
774      * set then the value must be a power of 2. Returns 1 if the property is not
775      * set.
776      * @throws IllegalArgumentException if the property is set to a value that
777      * is not a power of 2.
778      */
779     private static int pollerCount(String propName, int defaultCount) {
780         String s = System.getProperty(propName);
781         int count = (s != null) ? Integer.parseInt(s) : defaultCount;
782 
783         // check power of 2
784         if (count != Integer.highestOneBit(count)) {
785             String msg = propName + " is set to a value that is not a power of 2";
786             throw new IllegalArgumentException(msg);
787         }
788         return count;
789     }
790 
791     /**
792      * Return the master poller or null if there is no master poller.
793      */
794     public static Poller masterPoller() {
795         return POLLER_GROUP.masterPoller();
796     }
797 
798     /**
799      * Return the list of read pollers.
800      */
801     public static List<Poller> readPollers() {
802         return POLLER_GROUP.readPollers();
803     }
804 
805     /**
806      * Return the list of write pollers.
807      */
808     public static List<Poller> writePollers() {
809         return POLLER_GROUP.writePollers();
810     }
811 }