1 /*
  2  * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;
 28 import java.io.UncheckedIOException;
 29 import java.lang.ref.Reference;
 30 import java.util.Arrays;
 31 import java.util.List;
 32 import java.util.Map;
 33 import java.util.Objects;
 34 import java.util.Set;
 35 import java.util.concurrent.ConcurrentHashMap;
 36 import java.util.concurrent.Executor;
 37 import java.util.concurrent.Executors;
 38 import java.util.concurrent.ThreadFactory;
 39 import java.util.concurrent.locks.LockSupport;
 40 import java.util.function.BooleanSupplier;
 41 import jdk.internal.access.JavaLangAccess;
 42 import jdk.internal.access.SharedSecrets;
 43 import jdk.internal.misc.InnocuousThread;
 44 import jdk.internal.misc.TerminatingThreadLocal;
 45 import jdk.internal.vm.Continuation;
 46 import jdk.internal.vm.ContinuationSupport;
 47 import jdk.internal.vm.annotation.Stable;
 48 
 49 /**
 50  * I/O poller to allow virtual threads park until a file descriptor is ready for I/O.
 51  * Implementations also optionally support read/write operations where virtual threads
 52  * park until bytes are read or written.
 53  */
 54 public abstract class Poller {
 55     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 56 
 57     // the poller group for the I/O pollers and poller threads
 58     private static final PollerGroup POLLER_GROUP = createPollerGroup();
 59 
 60     // the poller or sub-poller thread (used for observability only)
 61     private @Stable Thread owner;
 62 
 63     // maps file descriptors to parked Thread
 64     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 65 
 66     // shutdown (if supported by poller group)
 67     private volatile boolean shutdown;
 68 
 69     /**
 70      * Poller mode.
 71      */
 72     enum Mode {
 73         /**
 74          * Read and write pollers are platform threads that block waiting for events and
 75          * unpark virtual threads when file descriptors are ready for I/O.
 76          */
 77         SYSTEM_THREADS,
 78 
 79         /**
 80          * Read and write pollers are virtual threads that poll for events, yielding
 81          * between polls and unparking virtual threads when file descriptors are
 82          * ready for I/O. If there are no events then the poller threads park until there
 83          * are I/O events to poll. This mode helps to integrate polling with virtual
 84          * thread scheduling. The approach is similar to the default scheme in "User-level
 85          * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
 86          * (https://dl.acm.org/doi/10.1145/3379483).
 87          */
 88         VTHREAD_POLLERS,
 89 
 90         /**
 91          * Read pollers are per-carrier virtual threads that poll for events, yielding
 92          * between polls and unparking virtual threads when file descriptors are ready
 93          * for I/O. If there are no events then the poller threads park until there
 94          * are I/O events to poll. The write poller is a system-wide platform thread.
 95          */
 96         POLLER_PER_CARRIER
 97     }
 98 
 99     /**
100      * Create and return the PollerGroup.
101      */
102     private static PollerGroup createPollerGroup() {
103         try {
104             PollerProvider provider;
105             if (System.getProperty("jdk.pollerMode") instanceof String s) {
106                 Mode mode = switch (s) {
107                     case "1" -> Mode.SYSTEM_THREADS;
108                     case "2" -> Mode.VTHREAD_POLLERS;
109                     case "3" -> Mode.POLLER_PER_CARRIER;
110                     default -> {
111                         throw new RuntimeException(s + " is not a valid polling mode");
112                     }
113                 };
114                 provider = PollerProvider.createProvider(mode);
115             } else {
116                 provider = PollerProvider.createProvider();
117             }
118 
119             int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers());
120             int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers());
121             PollerGroup group = switch (provider.pollerMode()) {
122                 case SYSTEM_THREADS     -> new SystemThreadsPollerGroup(provider, readPollers, writePollers);
123                 case VTHREAD_POLLERS    -> new VThreadsPollerGroup(provider, readPollers, writePollers);
124                 case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers);
125             };
126             group.start();
127             return group;
128         } catch (IOException ioe) {
129             throw new UncheckedIOException(ioe);
130         }
131     }
132 
133     /**
134      * Initialize a Poller.
135      */
136     protected Poller() {
137     }
138 
139     /**
140      * Closes the poller and release resources. This method can only be used to cleanup
141      * when creating a poller group fails.
142      */
143     abstract void close() throws IOException;
144 
145     /**
146      * Sets the poller's thread owner.
147      */
148     private void setOwner() {
149         owner = Thread.currentThread();
150     }
151 
152     /**
153      * Returns true if this poller is marked for shutdown.
154      */
155     boolean isShutdown() {
156         return shutdown;
157     }
158 
159     /**
160      * Marks this poller for shutdown.
161      */
162     private void setShutdown() {
163         shutdown = true;
164     }
165 
166     /**
167      * Returns the poller's file descriptor to use when polling with the master poller.
168      * @throws UnsupportedOperationException if not supported
169      */
170     int fdVal() {
171         throw new UnsupportedOperationException();
172     }
173 
174     /**
175      * Invoked if when this poller's file descriptor is polled by the master poller.
176      */
177     void pollerPolled() throws IOException {
178     }
179 
180     /**
181      * Register the file descriptor with the I/O event management facility so that it is
182      * polled when the file descriptor is ready for I/O. The registration is "one shot",
183      * meaning it should be polled at most once.
184      */
185     abstract void implStartPoll(int fdVal) throws IOException;
186 
187     /**
188      * Deregister a file descriptor from the I/O event management facility. This may be
189      * a no-op in some implementations when the file descriptor has already been polled.
190      * @param polled true if the file descriptor has already been polled
191      */
192     abstract void implStopPoll(int fdVal, boolean polled) throws IOException;
193 
194     /**
195      * Poll for events. The {@link #polled(int)} method is invoked for each
196      * polled file descriptor.
197      *
198      * @param timeout if positive then block for up to {@code timeout} milliseconds,
199      *     if zero then don't block, if -1 then block indefinitely
200      * @return >0 if file descriptors are polled, 0 if no file descriptor polled
201      */
202     abstract int poll(int timeout) throws IOException;
203 
204     /**
205      * Wakeup the poller thread if blocked in poll so it can shutdown.
206      * @throws UnsupportedOperationException if not supported
207      */
208     void wakeupPoller() throws IOException {
209         throw new UnsupportedOperationException();
210     }
211 
212     /**
213      * Callback by the poll method when a file descriptor is polled.
214      */
215     final void polled(int fdVal) {
216         Thread t = map.remove(fdVal);
217         if (t != null) {
218             LockSupport.unpark(t);
219         }
220     }
221 
222     /**
223      * Parks the current thread until a file descriptor is ready for the given op.
224      * @param fdVal the file descriptor
225      * @param event POLLIN or POLLOUT
226      * @param nanos the waiting time or 0 to wait indefinitely
227      * @param isOpen supplies a boolean to indicate if the enclosing object is open
228      */
229     public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
230         POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
231     }
232 
233     /**
234      * Parks the current thread until a Selector's file descriptor is ready.
235      * @param fdVal the Selector's file descriptor
236      * @param nanos the waiting time or 0 to wait indefinitely
237      */
238     public static void pollSelector(int fdVal, long nanos) throws IOException {
239         POLLER_GROUP.pollSelector(fdVal, nanos);
240     }
241 
242     /**
243      * Unpark the given thread so that it stops polling.
244      */
245     public static void stopPoll(Thread thread) {
246         LockSupport.unpark(thread);
247     }
248 
249     /**
250      * Parks the current thread until a file descriptor is ready.
251      */
252     private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
253         startPoll(fdVal);
254         try {
255             if (isOpen.getAsBoolean() && !isShutdown()) {
256                 if (nanos > 0) {
257                     LockSupport.parkNanos(nanos);
258                 } else {
259                     LockSupport.park();
260                 }
261             }
262         } finally {
263             stopPoll(fdVal);
264         }
265     }
266 
267     /**
268      * Register a file descriptor with the I/O event management facility so that it is
269      * polled when the file descriptor is ready for I/O.
270      */
271     private void startPoll(int fdVal) throws IOException {
272         Thread previous = map.put(fdVal, Thread.currentThread());
273         assert previous == null;
274         try {
275             implStartPoll(fdVal);
276         } catch (Throwable t) {
277             map.remove(fdVal);
278             throw t;
279         } finally {
280             Reference.reachabilityFence(this);
281         }
282     }
283 
284     /**
285      * Deregister a file descriptor from the I/O event management facility.
286      */
287     private void stopPoll(int fdVal) throws IOException {
288         Thread previous = map.remove(fdVal);
289         boolean polled = (previous == null);
290         assert polled || previous == Thread.currentThread();
291         try {
292             implStopPoll(fdVal, polled);
293         } finally {
294             Reference.reachabilityFence(this);
295         }
296     }
297 
298     /**
299      * Master polling loop. The {@link #polled(int)} method is invoked for each file
300      * descriptor that is polled.
301      */
302     private void pollerLoop() {
303         setOwner();
304         try {
305             while (!isShutdown()) {
306                 poll(-1);
307             }
308         } catch (Exception e) {
309             e.printStackTrace();
310         }
311     }
312 
313     /**
314      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
315      * descriptor that is polled.
316      *
317      * The sub-poller registers its file descriptor with the master poller to park until
318      * there are events to poll. When unparked, it does non-blocking polls and parks
319      * again when there are no more events. The sub-poller yields after each poll to help
320      * with fairness and to avoid re-registering with the master poller where possible.
321      */
322     private void subPollerLoop(Poller masterPoller) {
323         assert Thread.currentThread().isVirtual();
324         setOwner();
325         try {
326             int polled = 0;
327             while (!isShutdown()) {
328                 if (polled == 0) {
329                     masterPoller.poll(fdVal(), 0, () -> true);  // park
330                     pollerPolled();
331                 } else {
332                     Thread.yield();
333                 }
334                 polled = poll(0);
335             }
336         } catch (Exception e) {
337             e.printStackTrace();
338         }
339     }
340 
341     /**
342      * Unparks all threads waiting on a file descriptor registered with this poller.
343      */
344     private void wakeupAll() {
345         map.values().forEach(LockSupport::unpark);
346     }
347 
348     @Override
349     public String toString() {
350         return String.format("%s [registered = %d, owner = %s]",
351             Objects.toIdentityString(this), map.size(), owner);
352     }
353 
354     /**
355      * A group of poller threads that support virtual threads polling file descriptors.
356      */
357     private static abstract class PollerGroup {
358         private final PollerProvider provider;
359 
360         PollerGroup(PollerProvider provider) {
361             this.provider = provider;
362         }
363 
364         final PollerProvider provider() {
365             return provider;
366         }
367 
368         /**
369          * Starts the poller group and any system-wide poller threads.
370          */
371         abstract void start();
372 
373         /**
374          * Parks the current thread until a file descriptor is ready for the given op.
375          */
376         abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
377 
378         /**
379          * Parks the current thread until a Selector's file descriptor is ready.
380          */
381         void pollSelector(int fdVal, long nanos) throws IOException {
382             poll(fdVal, Net.POLLIN, nanos, () -> true);
383         }
384 
385         /**
386          * Starts a platform thread to run the given task.
387          */
388         protected final void startPlatformThread(String name, Runnable task) {
389             Thread thread = InnocuousThread.newSystemThread(name, task);
390             thread.setDaemon(true);
391             thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
392             thread.start();
393         }
394 
395         /**
396          * Return the master poller, or null if no master poller.
397          */
398         abstract Poller masterPoller();
399 
400         /**
401          * Return the read pollers.
402          */
403         abstract List<Poller> readPollers();
404 
405         /**
406          * Return the write pollers.
407          */
408         abstract List<Poller> writePollers();
409 
410         /**
411          * Close the given pollers.
412          */
413         static void closeAll(Poller... pollers) {
414             for (Poller poller : pollers) {
415                 if (poller != null) {
416                     try {
417                         poller.close();
418                     } catch (IOException _) { }
419                 }
420             }
421         }
422 
423         /**
424          * Returns true if the read pollers in this poller group support read ops in
425          * addition to POLLIN polling.
426          */
427         boolean supportReadOps() {
428             return provider().supportReadOps();
429         }
430 
431         /**
432          * Reads bytes into a byte array.
433          * @throws UnsupportedOperationException if not supported
434          */
435         abstract int read(int fdVal, byte[] b, int off, int len, long nanos,
436                           BooleanSupplier isOpen) throws IOException;
437 
438         /**
439          * Returns true if the write pollers in this poller group support write ops in
440          * addition to POLLOUT polling.
441          */
442         boolean supportWriteOps() {
443             return provider().supportWriteOps();
444         }
445 
446         /**
447          * Write bytes from a byte array.
448          * @throws UnsupportedOperationException if not supported
449          */
450         abstract int write(int fdVal, byte[] b, int off, int len,
451                            BooleanSupplier isOpen) throws IOException;
452     }
453 
454     /**
455      * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
456      */
457     private static class SystemThreadsPollerGroup extends PollerGroup {
458         // system-wide read and write pollers
459         private final Poller[] readPollers;
460         private final Poller[] writePollers;
461 
462         SystemThreadsPollerGroup(PollerProvider provider,
463                                  int readPollerCount,
464                                  int writePollerCount) throws IOException {
465             super(provider);
466             Poller[] readPollers = new Poller[readPollerCount];
467             Poller[] writePollers = new Poller[writePollerCount];
468             try {
469                 for (int i = 0; i < readPollerCount; i++) {
470                     readPollers[i] = provider.readPoller(false);
471                 }
472                 for (int i = 0; i < writePollerCount; i++) {
473                     writePollers[i] = provider.writePoller(false);
474                 }
475             } catch (Throwable e) {
476                 closeAll(readPollers);
477                 closeAll(writePollers);
478                 throw e;
479             }
480 
481             this.readPollers = readPollers;
482             this.writePollers = writePollers;
483         }
484 
485         @Override
486         void start() {
487             Arrays.stream(readPollers).forEach(p -> {
488                 startPlatformThread("Read-Poller", p::pollerLoop);
489             });
490             Arrays.stream(writePollers).forEach(p -> {
491                 startPlatformThread("Write-Poller", p::pollerLoop);
492             });
493         }
494 
495         private Poller readPoller(int fdVal) {
496             int index = provider().fdValToIndex(fdVal, readPollers.length);
497             return readPollers[index];
498         }
499 
500         private Poller writePoller(int fdVal) {
501             int index = provider().fdValToIndex(fdVal, writePollers.length);
502             return writePollers[index];
503         }
504 
505         @Override
506         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
507             Poller poller = (event == Net.POLLIN)
508                     ? readPoller(fdVal)
509                     : writePoller(fdVal);
510             poller.poll(fdVal, nanos, isOpen);
511         }
512 
513         @Override
514         int read(int fdVal, byte[] b, int off, int len, long nanos,
515                  BooleanSupplier isOpen) throws IOException {
516             return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
517         }
518 
519         @Override
520         int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
521             return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
522         }
523 
524         @Override
525         Poller masterPoller() {
526             return null;
527         }
528 
529         @Override
530         List<Poller> readPollers() {
531             return List.of(readPollers);
532         }
533 
534         @Override
535         List<Poller> writePollers() {
536             return List.of(writePollers);
537         }
538     }
539 
540     /**
541      * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
542      * When read and write pollers need to block then they register with a system-wide
543      * "master poller" that runs in a dedicated platform thread.
544      */
545     private static class VThreadsPollerGroup extends PollerGroup {
546         private final Poller masterPoller;
547         private final Poller[] readPollers;
548         private final Poller[] writePollers;
549 
550         // keep virtual thread pollers alive
551         private final Executor executor;
552 
553         VThreadsPollerGroup(PollerProvider provider,
554                             int readPollerCount,
555                             int writePollerCount) throws IOException {
556             super(provider);
557             Poller masterPoller = provider.readPoller(false);
558             Poller[] readPollers = new Poller[readPollerCount];
559             Poller[] writePollers = new Poller[writePollerCount];
560 
561             try {
562                 for (int i = 0; i < readPollerCount; i++) {
563                     readPollers[i] = provider.readPoller(true);
564                 }
565                 for (int i = 0; i < writePollerCount; i++) {
566                     writePollers[i] = provider.writePoller(true);
567                 }
568             } catch (Throwable e) {
569                 masterPoller.close();
570                 closeAll(readPollers);
571                 closeAll(writePollers);
572                 throw e;
573             }
574 
575             this.masterPoller = masterPoller;
576             this.readPollers = readPollers;
577             this.writePollers = writePollers;
578 
579             ThreadFactory factory = Thread.ofVirtual()
580                     .inheritInheritableThreadLocals(false)
581                     .name("SubPoller-", 0)
582                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
583                     .factory();
584             this.executor = Executors.newThreadPerTaskExecutor(factory);
585         }
586 
587         @Override
588         void start() {
589             startPlatformThread("Master-Poller", masterPoller::pollerLoop);
590             Arrays.stream(readPollers).forEach(p -> {
591                 executor.execute(() -> p.subPollerLoop(masterPoller));
592             });
593             Arrays.stream(writePollers).forEach(p -> {
594                 executor.execute(() -> p.subPollerLoop(masterPoller));
595             });
596         }
597 
598         private Poller readPoller(int fdVal) {
599             int index = provider().fdValToIndex(fdVal, readPollers.length);
600             return readPollers[index];
601         }
602 
603         private Poller writePoller(int fdVal) {
604             int index = provider().fdValToIndex(fdVal, writePollers.length);
605             return writePollers[index];
606         }
607 
608         @Override
609         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
610             Poller poller = (event == Net.POLLIN)
611                     ? readPoller(fdVal)
612                     : writePoller(fdVal);
613             poller.poll(fdVal, nanos, isOpen);
614         }
615 
616         @Override
617         int read(int fdVal, byte[] b, int off, int len, long nanos,
618                  BooleanSupplier isOpen) throws IOException {
619             return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
620         }
621 
622         @Override
623         int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
624             return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
625         }
626 
627         @Override
628         void pollSelector(int fdVal, long nanos) throws IOException {
629             masterPoller.poll(fdVal, nanos, () -> true);
630         }
631 
632         @Override
633         Poller masterPoller() {
634             return masterPoller;
635         }
636 
637         @Override
638         List<Poller> readPollers() {
639             return List.of(readPollers);
640         }
641 
642         @Override
643         List<Poller> writePollers() {
644             return List.of(writePollers);
645         }
646     }
647 
648     /**
649      * POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread.
650      * When a virtual thread polls a file descriptor for POLLIN, then it will use (almost
651      * always, not guaranteed) the read poller for its carrier. When a read poller needs
652      * to block then it registers with a system-wide "master poller" that runs in a
653      * dedicated platform thread. The read poller terminates if the carrier terminates.
654      * The write pollers are system-wide platform threads (usually one).
655      */
656     private static class PollerPerCarrierPollerGroup extends PollerGroup {
657         private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { }
658         private static final TerminatingThreadLocal<CarrierPoller> CARRIER_POLLER =
659             new TerminatingThreadLocal<>() {
660                 @Override
661                 protected void threadTerminated(CarrierPoller carrierPoller) {
662                     Poller readPoller = carrierPoller.readPoller();
663                     carrierPoller.group().carrierTerminated(readPoller);
664                 }
665             };
666 
667         private final Poller masterPoller;
668         private final Set<Poller> readPollers;
669         private final Poller[] writePollers;
670 
671         /**
672          * Create a PollerPerCarrierPollerGroup with the given number of write pollers.
673          */
674         PollerPerCarrierPollerGroup(PollerProvider provider,
675                                     int writePollerCount) throws IOException {
676             super(provider);
677             Poller masterPoller = provider.readPoller(false);
678             Poller[] writePollers = new Poller[writePollerCount];
679             try {
680                 for (int i = 0; i < writePollerCount; i++) {
681                     writePollers[i] = provider.writePoller(false);
682                 }
683             } catch (Throwable e) {
684                 masterPoller.close();
685                 closeAll(writePollers);
686                 throw e;
687             }
688             this.masterPoller = masterPoller;
689             this.readPollers = ConcurrentHashMap.newKeySet();;
690             this.writePollers = writePollers;
691         }
692 
693         @Override
694         void start() {
695             startPlatformThread("Master-Poller", masterPoller::pollerLoop);
696             Arrays.stream(writePollers).forEach(p -> {
697                 startPlatformThread("Write-Poller", p::pollerLoop);
698             });
699         }
700 
701         private Poller writePoller(int fdVal) {
702             int index = provider().fdValToIndex(fdVal, writePollers.length);
703             return writePollers[index];
704         }
705 
706         /**
707          * Starts a read sub-poller in a virtual thread.
708          */
709         private Poller startReadPoller() throws IOException {
710             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
711 
712             // create read sub-poller
713             Poller readPoller = provider().readPoller(true);
714             readPollers.add(readPoller);
715 
716             // start virtual thread to execute sub-polling loop
717             Thread carrier = JLA.currentCarrierThread();
718             var scheduler = JLA.virtualThreadScheduler(Thread.currentThread());
719             @SuppressWarnings("restricted")
720             var _ = Thread.ofVirtual()
721                     .scheduler(scheduler)
722                     .inheritInheritableThreadLocals(false)
723                     .name(carrier.getName() + "-Read-Poller")
724                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
725                     .start(() -> subPollerLoop(readPoller));
726             return readPoller;
727         }
728 
729         /**
730          * Returns the read poller for the current carrier, starting it if required.
731          */
732         private Poller readPoller() throws IOException {
733             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
734             Continuation.pin();
735             try {
736                 CarrierPoller carrierPoller = CARRIER_POLLER.get();
737                 if (carrierPoller != null) {
738                     return carrierPoller.readPoller();
739                 } else {
740                     // first poll on this carrier will start poller
741                     Poller readPoller = startReadPoller();
742                     CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
743                     return readPoller;
744                 }
745             } finally {
746                 Continuation.unpin();
747             }
748         }
749 
750         @Override
751         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
752             // for POLLIN, get the read poller for this carrier
753             if (event == Net.POLLIN
754                     && Thread.currentThread().isVirtual()
755                     && ContinuationSupport.isSupported()) {
756                 readPoller().poll(fdVal, nanos, isOpen);
757                 return;
758             }
759 
760             // -XX:-VMContinuations or POLLIN from platform thread does master poller
761             if (event == Net.POLLIN) {
762                 masterPoller.poll(fdVal, nanos, isOpen);
763             } else {
764                 writePoller(fdVal).poll(fdVal, nanos, isOpen);
765             }
766         }
767 
768         @Override
769         void pollSelector(int fdVal, long nanos) throws IOException {
770             masterPoller.poll(fdVal, nanos, () -> true);
771         }
772 
773         @Override
774         int read(int fdVal, byte[] b, int off, int len, long nanos,
775                  BooleanSupplier isOpen) throws IOException {
776             return readPoller().implRead(fdVal, b, off, len, nanos, isOpen);
777         }
778 
779         @Override
780         int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
781             return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
782         }
783 
784         /**
785          * Sub-poller polling loop.
786          */
787         private void subPollerLoop(Poller readPoller) {
788             try {
789                 readPoller.subPollerLoop(masterPoller);
790             } finally {
791                 // wakeup all threads waiting on file descriptors registered with the
792                 // read poller, these I/O operation will migrate to another carrier.
793                 readPoller.wakeupAll();
794 
795                 // remove from serviceability view
796                 readPollers.remove(readPoller);
797             }
798         }
799 
800         /**
801          * Invoked by the carrier thread before it terminates.
802          */
803         private void carrierTerminated(Poller readPoller) {
804             readPoller.setShutdown();
805             try {
806                 readPoller.wakeupPoller();
807             } catch (Throwable e) {
808                 e.printStackTrace();
809             }
810         }
811 
812         @Override
813         Poller masterPoller() {
814             return masterPoller;
815         }
816 
817         @Override
818         List<Poller> readPollers() {
819             return readPollers.stream().toList();
820         }
821 
822         @Override
823         List<Poller> writePollers() {
824             return List.of(writePollers);
825         }
826     }
827 
828     /**
829      * Reads the given property name to get the poller count. If the property is
830      * set then the value must be a power of 2. Returns 1 if the property is not
831      * set.
832      * @throws IllegalArgumentException if the property is set to a value that
833      * is not a power of 2.
834      */
835     private static int pollerCount(String propName, int defaultCount) {
836         String s = System.getProperty(propName);
837         int count = (s != null) ? Integer.parseInt(s) : defaultCount;
838 
839         // check power of 2
840         if (count != Integer.highestOneBit(count)) {
841             String msg = propName + " is set to a value that is not a power of 2";
842             throw new IllegalArgumentException(msg);
843         }
844         return count;
845     }
846 
847 
848     /**
849      * Returns true if read ops are supported in addition to POLLIN polling.
850      */
851     public static boolean supportReadOps() {
852         return POLLER_GROUP.supportReadOps();
853     }
854 
855     /**
856      * Returns true if write ops are supported in addition to POLLOUT polling.
857      */
858     public static boolean supportWriteOps() {
859         return POLLER_GROUP.supportWriteOps();
860     }
861 
862     /**
863      * Parks the current thread until bytes are read into a byte array.
864      * @param isOpen supplies a boolean to indicate if the enclosing object is open
865      * @return the number of bytes read (>0), EOF (-1), or UNAVAILABLE (-2) if unparked
866      * or the timeout expires while waiting for bytes to be read
867      * @throws UnsupportedOperationException if not supported
868      */
869     public static int read(int fdVal, byte[] b, int off, int len, long nanos,
870                            BooleanSupplier isOpen) throws IOException {
871         return POLLER_GROUP.read(fdVal, b, off, len, nanos, isOpen);
872     }
873 
874     /**
875      * Parks the current thread until bytes are written from a byte array.
876      * @param isOpen supplies a boolean to indicate if the enclosing object is open
877      * @return the number of bytes read (>0), EOF (-1), or UNAVAILABLE (-2) if unparked
878      * or the timeout expires while waiting for bytes to be read
879      * @throws UnsupportedOperationException if not supported
880      */
881     public static int write(int fdVal, byte[] b, int off, int len,
882                             BooleanSupplier isOpen) throws IOException {
883         return POLLER_GROUP.write(fdVal, b, off, len, isOpen);
884     }
885 
886     /**
887      * Parks the current thread until bytes are read a byte array. This method is
888      * overridden by poller implementations that support this operation.
889      */
890     int implRead(int fdVal, byte[] b, int off, int len, long nanos,
891                  BooleanSupplier isOpen) throws IOException {
892         throw new UnsupportedOperationException();
893     }
894 
895     /**
896      * Parks the current thread until bytes are written from a byte array. This
897      * method is overridden by poller implementations that support this operation.
898      */
899     int implWrite(int fdVal, byte[] b, int off, int len,
900                  BooleanSupplier isOpen) throws IOException {
901         throw new UnsupportedOperationException();
902     }
903 
904     /**
905      * Return the master poller or null if there is no master poller.
906      */
907     public static Poller masterPoller() {
908         return POLLER_GROUP.masterPoller();
909     }
910 
911     /**
912      * Return the list of read pollers.
913      */
914     public static List<Poller> readPollers() {
915         return POLLER_GROUP.readPollers();
916     }
917 
918     /**
919      * Return the list of write pollers.
920      */
921     public static List<Poller> writePollers() {
922         return POLLER_GROUP.writePollers();
923     }
924 }