1 /*
  2  * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;
 28 import java.io.UncheckedIOException;
 29 import java.lang.ref.Reference;
 30 import java.util.Arrays;
 31 import java.util.List;
 32 import java.util.Map;
 33 import java.util.Objects;
 34 import java.util.concurrent.ConcurrentHashMap;
 35 import java.util.concurrent.ExecutorService;
 36 import java.util.concurrent.Executors;
 37 import java.util.concurrent.ThreadFactory;
 38 import java.util.concurrent.locks.LockSupport;
 39 import java.util.function.BooleanSupplier;
 40 import jdk.internal.access.JavaLangAccess;
 41 import jdk.internal.access.SharedSecrets;
 42 import jdk.internal.misc.InnocuousThread;
 43 import jdk.internal.misc.TerminatingThreadLocal;
 44 import jdk.internal.vm.Continuation;
 45 import jdk.internal.vm.ContinuationSupport;
 46 import jdk.internal.vm.annotation.Stable;
 47 
 48 /**
 49  * Polls file descriptors. Virtual threads invoke the poll method to park
 50  * until a given file descriptor is ready for I/O.
 51  */
 52 public abstract class Poller {
 53     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 54 
 55     private static final PollerGroup POLLER_GROUP;
 56     static {
 57         try {
 58             PollerProvider provider = PollerProvider.provider();
 59             Mode mode = pollerMode(provider.defaultPollerMode());
 60             PollerGroup group = switch (mode) {
 61                 case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider);
 62                 case VTHREAD_POLLERS -> new VirtualThreadsPollerGroup(provider);
 63                 case PER_CARRIER -> new PerCarrierPollerGroup(provider);
 64             };
 65             group.start();
 66             POLLER_GROUP = group;
 67         } catch (IOException ioe) {
 68             throw new ExceptionInInitializerError(ioe);
 69         }
 70     }
 71 
 72     // the poller or sub-poller thread
 73     private @Stable Thread owner;
 74 
 75     // maps file descriptors to parked Thread
 76     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 77 
 78     // shutdown if supported by poller group.
 79     private volatile boolean shutdown;
 80 
 81     /**
 82      * Poller mode.
 83      */
 84     enum Mode {
 85         /**
 86          * ReadPoller and WritePoller are dedicated platform threads that block waiting
 87          * for events and unpark virtual threads when file descriptors are ready for I/O.
 88          */
 89         SYSTEM_THREADS,
 90 
 91         /**
 92          * ReadPoller and WritePoller threads are virtual threads that poll for events,
 93          * yielding between polls and unparking virtual threads when file descriptors are
 94          * ready for I/O. If there are no events then the poller threads park until there
 95          * are I/O events to poll. This mode helps to integrate polling with virtual
 96          * thread scheduling. The approach is similar to the default scheme in "User-level
 97          * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
 98          * (https://dl.acm.org/doi/10.1145/3379483).
 99          */
100         VTHREAD_POLLERS,
101 
102         /**
103          * A dedicated ReadPoller is created for each carrier thread.
104          * There is one system-wide (carrier agnostic) WritePoller.
105          */
106         PER_CARRIER
107     }
108 
109     /**
110      * Initialize a Poller.
111      */
112     protected Poller() {
113     }
114 
115     /**
116      * Closes the poller and release resources. This method can only be used to cleanup
117      * when creating a poller group fails.
118      */
119     abstract void close() throws IOException;
120 
121     /**
122      * Returns the poller's thread owner.
123      */
124     private Thread owner() {
125         return owner;
126     }
127 
128     /**
129      * Sets the poller's thread owner.
130      */
131     private void setOwner() {
132         owner = Thread.currentThread();
133     }
134 
135     /**
136      * Returns true if this poller is marked for shutdown.
137      */
138     final boolean isShutdown() {
139         return shutdown;
140     }
141 
142     /**
143      * Marks this poller for shutdown.
144      */
145     private void setShutdown() {
146         shutdown = true;
147     }
148 
149     /**
150      * Returns the poller's file descriptor to use when polling with the master poller.
151      * @throws UnsupportedOperationException if not supported
152      */
153     int fdVal() {
154         throw new UnsupportedOperationException();
155     }
156 
157     /**
158      * Invoked if when this poller's file descriptor is polled by the master poller.
159      */
160     void pollerPolled() throws IOException {
161     }
162 
163     /**
164      * Register the file descriptor. The registration is "one shot", meaning it should
165      * be polled at most once.
166      */
167     abstract void implRegister(int fdVal) throws IOException;
168 
169     /**
170      * Deregister the file descriptor.
171      * @param polled true if the file descriptor has already been polled
172      */
173     abstract void implDeregister(int fdVal, boolean polled) throws IOException;
174 
175     /**
176      * Poll for events. The {@link #polled(int)} method is invoked for each
177      * polled file descriptor.
178      *
179      * @param timeout if positive then block for up to {@code timeout} milliseconds,
180      *     if zero then don't block, if -1 then block indefinitely
181      * @return >0 if file descriptors are polled, 0 if no file descriptor polled
182      */
183     abstract int poll(int timeout) throws IOException;
184 
185     /**
186      * Wakeup the poller thread if blocked in poll.
187      *
188      * @throws UnsupportedOperationException if not supported
189      */
190     void wakeupPoller() throws IOException {
191         throw new UnsupportedOperationException();
192     }
193 
194     /**
195      * Callback by the poll method when a file descriptor is polled.
196      */
197     final void polled(int fdVal) {
198         Thread t = map.remove(fdVal);
199         if (t != null) {
200             LockSupport.unpark(t);
201         }
202     }
203 
204     /**
205      * Parks the current thread until a file descriptor is ready for the given op.
206      * @param fdVal the file descriptor
207      * @param event POLLIN or POLLOUT
208      * @param nanos the waiting time or 0 to wait indefinitely
209      * @param isOpen supplies a boolean to indicate if the enclosing object is open
210      */
211     static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
212         POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
213     }
214 
215     /**
216      * Parks the current thread until a Selector's file descriptor is ready.
217      * @param fdVal the Selector's file descriptor
218      * @param nanos the waiting time or 0 to wait indefinitely
219      */
220     static void pollSelector(int fdVal, long nanos) throws IOException {
221         POLLER_GROUP.pollSelector(fdVal, nanos);
222     }
223 
224     /**
225      * Unpark the given thread so that it stops polling.
226      */
227     static void stopPoll(Thread thread) {
228         LockSupport.unpark(thread);
229     }
230 
231     /**
232      * Parks the current thread until a file descriptor is ready.
233      */
234     private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
235         register(fdVal);
236         try {
237             if (isOpen.getAsBoolean() && !isShutdown()) {
238                 if (nanos > 0) {
239                     LockSupport.parkNanos(nanos);
240                 } else {
241                     LockSupport.park();
242                 }
243             }
244         } finally {
245             deregister(fdVal);
246         }
247     }
248 
249     /**
250      * Registers the file descriptor to be polled at most once when the file descriptor
251      * is ready for I/O.
252      */
253     private void register(int fdVal) throws IOException {
254         Thread previous = map.put(fdVal, Thread.currentThread());
255         assert previous == null;
256         try {
257             implRegister(fdVal);
258         } catch (Throwable t) {
259             map.remove(fdVal);
260             throw t;
261         } finally {
262             Reference.reachabilityFence(this);
263         }
264     }
265 
266     /**
267      * Deregister the file descriptor so that the file descriptor is not polled.
268      */
269     private void deregister(int fdVal) throws IOException {
270         Thread previous = map.remove(fdVal);
271         boolean polled = (previous == null);
272         assert polled || previous == Thread.currentThread();
273         try {
274             implDeregister(fdVal, polled);
275         } finally {
276             Reference.reachabilityFence(this);
277         }
278     }
279 
280     /**
281      * Master polling loop. The {@link #polled(int)} method is invoked for each file
282      * descriptor that is polled.
283      */
284     private void pollerLoop() {
285         setOwner();
286         try {
287             while (!isShutdown()) {
288                 poll(-1);
289             }
290         } catch (Exception e) {
291             e.printStackTrace();
292         }
293     }
294 
295     /**
296      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
297      * descriptor that is polled.
298      *
299      * The sub-poller registers its file descriptor with the master poller to park until
300      * there are events to poll. When unparked, it does non-blocking polls and parks
301      * again when there are no more events. The sub-poller yields after each poll to help
302      * with fairness and to avoid re-registering with the master poller where possible.
303      */
304     private void subPollerLoop(Poller masterPoller) {
305         assert Thread.currentThread().isVirtual();
306         setOwner();
307         try {
308             int polled = 0;
309             while (!isShutdown()) {
310                 if (polled == 0) {
311                     masterPoller.poll(fdVal(), 0, () -> true);  // park
312                     pollerPolled();
313                 } else {
314                     Thread.yield();
315                 }
316                 polled = poll(0);
317             }
318         } catch (Exception e) {
319             e.printStackTrace();
320         }
321     }
322 
323     /**
324      * Unparks all threads waiting on a file descriptor registered with this poller.
325      */
326     private void wakeupAll() {
327         map.values().forEach(LockSupport::unpark);
328     }
329 
330     @Override
331     public String toString() {
332         return String.format("%s [registered = %d, owner = %s]",
333             Objects.toIdentityString(this), map.size(), owner);
334     }
335 
336     /**
337      * A group of poller threads that support virtual threads polling file descriptors.
338      */
339     private static abstract class PollerGroup {
340         private final PollerProvider provider;
341 
342         PollerGroup(PollerProvider provider) {
343             this.provider = provider;
344         }
345 
346         PollerProvider provider() {
347             return provider;
348         }
349 
350         /**
351          * Starts the poller group and any system-wide poller threads.
352          */
353         abstract void start();
354 
355         /**
356          * Parks the current thread until a file descriptor is ready for the given op.
357          */
358         abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
359 
360        /**
361         * Parks the current thread until a Selector's file descriptor is ready.
362         */
363         void pollSelector(int fdVal, long nanos) throws IOException {
364             poll(fdVal, Net.POLLIN, nanos, () -> true);
365         }
366 
367         /**
368          * Starts a platform thread to run the given task.
369          */
370         protected final void startPlatformThread(String name, Runnable task) {
371             Thread thread = InnocuousThread.newSystemThread(name, task);
372             thread.setDaemon(true);
373             thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
374             thread.start();
375         }
376 
377         /**
378          * Return the master poller for default scheduler, or null if no master poller.
379          */
380         abstract Poller defaultMasterPoller();
381 
382         /**
383          * Return the read pollers for the default scheduler.
384          */
385         abstract List<Poller> defaultReadPollers();
386 
387         /**
388          * Return the write pollers for the default scheduler.
389          */
390         abstract List<Poller> defaultWritePollers();
391     }
392 
393     /**
394      * The poller group for the SYSTEM_THREADS polling mode. The read and write pollers
395      * are system-wide (scheduler agnostic) platform threads.
396      */
397     private static class SystemThreadsPollerGroup extends PollerGroup {
398         // system-wide read and write pollers
399         private final Poller[] readPollers;
400         private final Poller[] writePollers;
401 
402         SystemThreadsPollerGroup(PollerProvider provider) throws IOException {
403             super(provider);
404 
405             int readPollerCount = pollerCount("jdk.readPollers",
406                     provider.defaultReadPollers(Mode.SYSTEM_THREADS));
407             int writePollerCount = pollerCount("jdk.writePollers",
408                     provider.defaultWritePollers(Mode.SYSTEM_THREADS));
409 
410             Poller[] readPollers = new Poller[readPollerCount];
411             Poller[] writePollers = new Poller[writePollerCount];
412             try {
413                 for (int i = 0; i < readPollerCount; i++) {
414                     readPollers[i] = provider.readPoller(false);
415                 }
416                 for (int i = 0; i < writePollerCount; i++) {
417                     writePollers[i] = provider.writePoller(false);
418                 }
419             } catch (Throwable e) {
420                 closeAll(readPollers);
421                 closeAll(writePollers);
422                 throw e;
423             }
424 
425             this.readPollers = readPollers;
426             this.writePollers = writePollers;
427         }
428 
429         /**
430          * Close the given pollers.
431          */
432         private static void closeAll(Poller... pollers) {
433             for (Poller poller : pollers) {
434                 if (poller != null) {
435                     try {
436                         poller.close();
437                     } catch (IOException _) { }
438                 }
439             }
440         }
441 
442         /**
443          * Start poller threads.
444          */
445         @Override
446         void start() {
447             Arrays.stream(readPollers).forEach(p -> {
448                 startPlatformThread("Read-Poller", p::pollerLoop);
449             });
450             Arrays.stream(writePollers).forEach(p -> {
451                 startPlatformThread("Write-Poller", p::pollerLoop);
452             });
453         }
454 
455         private Poller readPoller(int fdVal) {
456             int index = provider().fdValToIndex(fdVal, readPollers.length);
457             return readPollers[index];
458         }
459 
460         private Poller writePoller(int fdVal) {
461             int index = provider().fdValToIndex(fdVal, writePollers.length);
462             return writePollers[index];
463         }
464 
465         @Override
466         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
467             Poller poller = (event == Net.POLLIN)
468                     ? readPoller(fdVal)
469                     : writePoller(fdVal);
470             poller.poll(fdVal, nanos, isOpen);
471         }
472 
473         @Override
474         Poller defaultMasterPoller() {
475             return null;
476         }
477 
478         @Override
479         List<Poller> defaultReadPollers() {
480             return List.of(readPollers);
481         }
482 
483         @Override
484         List<Poller> defaultWritePollers() {
485             return List.of(writePollers);
486         }
487     }
488 
489     /**
490      * The poller group for the VTHREAD_POLLERS poller mode. The read and write pollers
491      * are scheduler-specific virtual threads. The default scheduler may have many read
492      * and write pollers. Custom schedulers have one read poller and one write poller.
493      * When read and write pollers must block then they register with a system-wide
494      * (scheduler agnostic) "master poller" that runs in a dedicated platform thread.
495      */
496     private static class VirtualThreadsPollerGroup extends PollerGroup {
497         private static final Thread.VirtualThreadScheduler DEFAULT_SCHEDULER = JLA.defaultVirtualThreadScheduler();
498 
499         // system-wide master poller
500         private final Poller masterPoller;
501 
502         // number of read and write pollers for default scheduler
503         private final int defaultReadPollerCount;
504         private final int defaultWritePollerCount;
505 
506         // maps scheduler to a set of read and write pollers
507         private record Pollers(ExecutorService executor, Poller[] readPollers, Poller[] writePollers) { }
508         private final Map<Thread.VirtualThreadScheduler, Pollers> POLLERS = new ConcurrentHashMap<>();
509 
510         VirtualThreadsPollerGroup(PollerProvider provider) throws IOException {
511             super(provider);
512 
513             var mode = Mode.VTHREAD_POLLERS;
514             this.defaultReadPollerCount = pollerCount("jdk.readPollers",
515                     provider.defaultReadPollers(mode));
516             this.defaultWritePollerCount = pollerCount("jdk.writePollers",
517                     provider.defaultWritePollers(mode));
518             this.masterPoller = provider.readPoller(false);
519         }
520 
521         @Override
522         void start() {
523             startPlatformThread("Master-Poller", masterPoller::pollerLoop);
524         }
525 
526         /**
527          * Create the read and write pollers for the given scheduler.
528          */
529         private Pollers createPollers(Thread.VirtualThreadScheduler scheduler) {
530             int readPollerCount;
531             int writePollerCount;
532             if (scheduler == DEFAULT_SCHEDULER) {
533                 readPollerCount = defaultReadPollerCount;
534                 writePollerCount = defaultWritePollerCount;
535             } else {
536                 readPollerCount = 1;
537                 writePollerCount = 1;
538             }
539 
540             Poller[] readPollers = new Poller[readPollerCount];
541             Poller[] writePollers = new Poller[writePollerCount];
542             try {
543                 for (int i = 0; i < readPollerCount; i++) {
544                     readPollers[i] = provider().readPoller(true);
545                 }
546                 for (int i = 0; i < writePollerCount; i++) {
547                     writePollers[i] = provider().writePoller(true);
548                 }
549             } catch (IOException ioe) {
550                 closeAll(readPollers);
551                 closeAll(writePollers);
552                 throw new UncheckedIOException(ioe);
553             }
554 
555             @SuppressWarnings("restricted")
556             ThreadFactory factory = Thread.ofVirtual()
557                     .scheduler(scheduler)
558                     .inheritInheritableThreadLocals(false)
559                     .name("SubPoller-", 0)
560                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
561                     .factory();
562             ExecutorService executor = Executors.newThreadPerTaskExecutor(factory);
563 
564             Arrays.stream(readPollers).forEach(p -> {
565                 executor.execute(() -> p.subPollerLoop(masterPoller));
566             });
567             Arrays.stream(writePollers).forEach(p -> {
568                 executor.execute(() -> p.subPollerLoop(masterPoller));
569             });
570 
571             return new Pollers(executor, readPollers, writePollers);
572         }
573 
574         @Override
575         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
576             Thread.VirtualThreadScheduler scheduler;
577             if (Thread.currentThread().isVirtual()) {
578                 scheduler = JLA.virtualThreadScheduler(Thread.currentThread());
579             } else {
580                 scheduler = DEFAULT_SCHEDULER;
581             }
582             Pollers pollers;
583             try {
584                 pollers = POLLERS.computeIfAbsent(scheduler, _ -> createPollers(scheduler));
585             } catch (UncheckedIOException uioe) {
586                 throw uioe.getCause();
587             }
588 
589             Poller poller;
590             if (event == Net.POLLIN) {
591                 Poller[] readPollers = pollers.readPollers();
592                 int index = provider().fdValToIndex(fdVal, readPollers.length);
593                 poller = readPollers[index];
594             } else {
595                 Poller[] writePollers = pollers.writePollers();
596                 int index = provider().fdValToIndex(fdVal, writePollers.length);
597                 poller = writePollers[index];
598             }
599             poller.poll(fdVal, nanos, isOpen);
600         }
601 
602         @Override
603         void pollSelector(int fdVal, long nanos) throws IOException {
604             masterPoller.poll(fdVal, nanos, () -> true);
605         }
606 
607         /**
608          * Close the given pollers.
609          */
610         private static void closeAll(Poller... pollers) {
611             for (Poller poller : pollers) {
612                 if (poller != null) {
613                     try {
614                         poller.close();
615                     } catch (IOException _) { }
616                 }
617             }
618         }
619 
620         @Override
621         Poller defaultMasterPoller() {
622             return masterPoller;
623         }
624 
625         @Override
626         List<Poller> defaultReadPollers() {
627             Pollers pollers = POLLERS.get(DEFAULT_SCHEDULER);
628             return (pollers != null) ? List.of(pollers.readPollers()) : List.of();
629         }
630 
631         @Override
632         List<Poller> defaultWritePollers() {
633             Pollers pollers = POLLERS.get(DEFAULT_SCHEDULER);
634             return (pollers != null) ? List.of(pollers.writePollers()) : List.of();
635         }
636     }
637 
638     /**
639      * The poller group for the PER_CARRIER polling mode. A dedicated read poller is
640      * created for each carrier thread. When a virtual thread polls a file descriptor
641      * for POLLIN, then it will use (almost always, not guaranteed) to see the read
642      * poller for its carrier. The read poller terminates if carrier thread terminates.
643      * There is one system-wide (carrier agnostic) write poller.
644      */
645     private static class PerCarrierPollerGroup extends PollerGroup {
646         private static final boolean USE_SUBPOLLER;
647         static {
648             String s = System.getProperty("jdk.useSubPoller");
649             USE_SUBPOLLER = (s == null) || s.isEmpty() || Boolean.parseBoolean(s);
650         }
651 
652         private static final TerminatingThreadLocal<PerCarrierPollerGroup> POLLER_GROUP =
653             new TerminatingThreadLocal<>() {
654                 @Override
655                 protected void threadTerminated(PerCarrierPollerGroup pollerGroup) {
656                     pollerGroup.carrierTerminated();
657                 }
658             };
659 
660         // -XX:-VMContinuations or called from platform thread
661         private static final Thread PLACEHOLDER = new Thread();
662 
663         // maps carrier thread to its read poller
664         private final Map<Thread, Poller> READ_POLLERS = new ConcurrentHashMap<>();
665 
666         private final Poller writePoller;
667         private final Poller masterPoller;
668 
669         PerCarrierPollerGroup(PollerProvider provider) throws IOException {
670             super(provider);
671 
672             this.writePoller = provider.writePoller(false);
673             if (USE_SUBPOLLER) {
674                 this.masterPoller = provider.readPoller(false);
675             } else {
676                 this.masterPoller = null;
677             }
678         }
679 
680         @Override
681         void start() {
682             startPlatformThread("Write-Poller", writePoller::pollerLoop);
683             if (masterPoller != null) {
684                 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
685             }
686         }
687 
688         /**
689          * Starts a read poller with the given name.
690          * @throws UncheckedIOException if an I/O error occurs
691          */
692         private Poller startReadPoller(String name) {
693             try {
694                 Poller readPoller = provider().readPoller(USE_SUBPOLLER);
695                 if (USE_SUBPOLLER) {
696                     var scheduler = JLA.virtualThreadScheduler(Thread.currentThread());
697                     @SuppressWarnings("restricted")
698                     var _ = Thread.ofVirtual().scheduler(scheduler)
699                             .inheritInheritableThreadLocals(false)
700                             .name(name)
701                             .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
702                             .start(() -> subPollerLoop(readPoller));
703                 } else {
704                     startPlatformThread(name, () -> pollLoop(readPoller));
705                 }
706                 return readPoller;
707             } catch (IOException ioe) {
708                 throw new UncheckedIOException(ioe);
709             }
710         }
711 
712         @Override
713         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
714             if (event == Net.POLLOUT) {
715                 writePoller.poll(fdVal, nanos, isOpen);
716                 return;
717             }
718 
719             assert event == Net.POLLIN;
720             if (Thread.currentThread().isVirtual() && ContinuationSupport.isSupported()) {
721                 Poller readPoller;
722                 // get read poller for this carrier
723                 Continuation.pin();
724                 try {
725                     Thread carrier = JLA.currentCarrierThread();
726                     try {
727                         readPoller = READ_POLLERS.computeIfAbsent(carrier, _ -> {
728                             String name = carrier.getName() + "-Read-Poller";
729                             return startReadPoller(name);
730                         });
731                     } catch (UncheckedIOException uioe) {
732                         throw uioe.getCause();
733                     }
734                     POLLER_GROUP.set(this);
735                 } finally {
736                     Continuation.unpin();
737                 }
738                 // may execute on a different carrier
739                 readPoller.poll(fdVal, nanos, isOpen);
740                 return;
741             }
742 
743             // -XX:-VMContinuations or called from platform thread
744             if (masterPoller != null) {
745                 masterPoller.poll(fdVal, nanos, isOpen);
746             } else {
747                 Poller readPoller;
748                 try {
749                     readPoller = READ_POLLERS.computeIfAbsent(PLACEHOLDER,
750                             _ -> startReadPoller("Read-Poller"));
751                 } catch (UncheckedIOException uioe) {
752                     throw uioe.getCause();
753                 }
754                 readPoller.poll(fdVal, nanos, isOpen);
755             }
756         }
757 
758         @Override
759         void pollSelector(int fdVal, long nanos) throws IOException {
760             masterPoller.poll(fdVal, nanos, () -> true);
761         }
762 
763         /**
764          * Read-poller polling loop.
765          */
766         private void pollLoop(Poller readPoller) {
767             try {
768                 readPoller.pollerLoop();
769             } finally {
770                 // wakeup all threads waiting on file descriptors registered with the
771                 // read poller, these I/O operation will migrate to another carrier.
772                 readPoller.wakeupAll();
773             }
774         }
775 
776         /**
777          * Read-poll sub-poller polling loop.
778          */
779         private void subPollerLoop(Poller readPoller) {
780             try {
781                 readPoller.subPollerLoop(masterPoller);
782             } finally {
783                 // wakeup all threads waiting on file descriptors registered with the
784                 // read poller, these I/O operation will migrate to another carrier.
785                 readPoller.wakeupAll();
786             }
787         }
788 
789         /**
790          * Invoked by the carrier thread before it terminates.
791          */
792         private void carrierTerminated() {
793             Poller readPoller = READ_POLLERS.remove(Thread.currentThread());
794             if (readPoller != null) {
795                 readPoller.setShutdown();
796                 try {
797                     readPoller.wakeupPoller();
798                 } catch (Throwable e) {
799                     e.printStackTrace();
800                 }
801             }
802         }
803 
804         @Override
805         Poller defaultMasterPoller() {
806             return masterPoller;
807         }
808 
809         @Override
810         List<Poller> defaultReadPollers() {
811             // return all read pollers for now.
812             return READ_POLLERS.values().stream().toList();
813         }
814 
815         @Override
816         List<Poller> defaultWritePollers() {
817             return List.of(writePoller);
818         }
819     }
820 
821     /**
822      * Returns the poller mode.
823      */
824     private static Mode pollerMode(Mode defaultPollerMode) {
825         String s = System.getProperty("jdk.pollerMode");
826         if (s != null) {
827             if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
828                 return Mode.SYSTEM_THREADS;
829             } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
830                 return Mode.VTHREAD_POLLERS;
831             } else if (s.equalsIgnoreCase(Mode.PER_CARRIER.name()) || s.equals("3")) {
832                 return Mode.PER_CARRIER;
833             } else {
834                 throw new RuntimeException("Can't parse '" + s + "' as polling mode");
835             }
836         } else {
837             return defaultPollerMode;
838         }
839     }
840 
841     /**
842      * Reads the given property name to get the poller count. If the property is
843      * set then the value must be a power of 2. Returns 1 if the property is not
844      * set.
845      * @throws IllegalArgumentException if the property is set to a value that
846      * is not a power of 2.
847      */
848     private static int pollerCount(String propName, int defaultCount) {
849         String s = System.getProperty(propName);
850         int count = (s != null) ? Integer.parseInt(s) : defaultCount;
851 
852         // check power of 2
853         if (count != Integer.highestOneBit(count)) {
854             String msg = propName + " is set to a value that is not a power of 2";
855             throw new IllegalArgumentException(msg);
856         }
857         return count;
858     }
859 
860     /**
861      * Return the master poller or null if there is no master poller.
862      */
863     public static Poller masterPoller() {
864         return POLLER_GROUP.defaultMasterPoller();
865     }
866 
867     /**
868      * Return the list of read pollers.
869      */
870     public static List<Poller> readPollers() {
871         return POLLER_GROUP.defaultReadPollers();
872     }
873 
874     /**
875      * Return the list of write pollers.
876      */
877     public static List<Poller> writePollers() {
878         return POLLER_GROUP.defaultWritePollers();
879     }
880 }