< prev index next >

src/java.base/share/classes/sun/nio/ch/Poller.java

Print this page

 31 import java.util.List;
 32 import java.util.Map;
 33 import java.util.Objects;
 34 import java.util.Set;
 35 import java.util.concurrent.ConcurrentHashMap;
 36 import java.util.concurrent.Executor;
 37 import java.util.concurrent.Executors;
 38 import java.util.concurrent.ThreadFactory;
 39 import java.util.concurrent.locks.LockSupport;
 40 import java.util.function.BooleanSupplier;
 41 import jdk.internal.access.JavaLangAccess;
 42 import jdk.internal.access.SharedSecrets;
 43 import jdk.internal.misc.InnocuousThread;
 44 import jdk.internal.misc.TerminatingThreadLocal;
 45 import jdk.internal.vm.Continuation;
 46 import jdk.internal.vm.ContinuationSupport;
 47 import jdk.internal.vm.annotation.Stable;
 48 
 49 /**
 50  * I/O poller to allow virtual threads park until a file descriptor is ready for I/O.


 51  */
 52 public abstract class Poller {
 53     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 54 
 55     // the poller group for the I/O pollers and poller threads
 56     private static final PollerGroup POLLER_GROUP = createPollerGroup();
 57 
 58     // the poller or sub-poller thread (used for observability only)
 59     private @Stable Thread owner;
 60 
 61     // maps file descriptors to parked Thread
 62     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 63 
 64     // shutdown (if supported by poller group)
 65     private volatile boolean shutdown;
 66 
 67     /**
 68      * Poller mode.
 69      */
 70     enum Mode {

152      */
153     boolean isShutdown() {
154         return shutdown;
155     }
156 
157     /**
158      * Marks this poller for shutdown.
159      */
160     private void setShutdown() {
161         shutdown = true;
162     }
163 
164     /**
165      * Returns the poller's file descriptor to use when polling with the master poller.
166      * @throws UnsupportedOperationException if not supported
167      */
168     int fdVal() {
169         throw new UnsupportedOperationException();
170     }
171 






172     /**
173      * Register the file descriptor with the I/O event management facility so that it is
174      * polled when the file descriptor is ready for I/O. The registration is "one shot",
175      * meaning it should be polled at most once.
176      */
177     abstract void implStartPoll(int fdVal) throws IOException;
178 
179     /**
180      * Deregister a file descriptor from the I/O event management facility. This may be
181      * a no-op in some implementations when the file descriptor has already been polled.
182      * @param polled true if the file descriptor has already been polled
183      */
184     abstract void implStopPoll(int fdVal, boolean polled) throws IOException;
185 
186     /**
187      * Poll for events. The {@link #polled(int)} method is invoked for each
188      * polled file descriptor.
189      *
190      * @param timeout if positive then block for up to {@code timeout} milliseconds,
191      *     if zero then don't block, if -1 then block indefinitely

302         }
303     }
304 
305     /**
306      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
307      * descriptor that is polled.
308      *
309      * The sub-poller registers its file descriptor with the master poller to park until
310      * there are events to poll. When unparked, it does non-blocking polls and parks
311      * again when there are no more events. The sub-poller yields after each poll to help
312      * with fairness and to avoid re-registering with the master poller where possible.
313      */
314     private void subPollerLoop(Poller masterPoller) {
315         assert Thread.currentThread().isVirtual();
316         setOwner();
317         try {
318             int polled = 0;
319             while (!isShutdown()) {
320                 if (polled == 0) {
321                     masterPoller.poll(fdVal(), 0, () -> true);  // park

322                 } else {
323                     Thread.yield();
324                 }
325                 polled = poll(0);
326             }
327         } catch (Exception e) {
328             e.printStackTrace();
329         }
330     }
331 
332     /**
333      * Unparks all threads waiting on a file descriptor registered with this poller.
334      */
335     private void wakeupAll() {
336         map.values().forEach(LockSupport::unpark);
337     }
338 
339     @Override
340     public String toString() {
341         return String.format("%s [registered = %d, owner = %s]",

393          */
394         abstract List<Poller> readPollers();
395 
396         /**
397          * Return the write pollers.
398          */
399         abstract List<Poller> writePollers();
400 
401         /**
402          * Close the given pollers.
403          */
404         static void closeAll(Poller... pollers) {
405             for (Poller poller : pollers) {
406                 if (poller != null) {
407                     try {
408                         poller.close();
409                     } catch (IOException _) { }
410                 }
411             }
412         }






























413     }
414 
415     /**
416      * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
417      */
418     private static class SystemThreadsPollerGroup extends PollerGroup {
419         // system-wide read and write pollers
420         private final Poller[] readPollers;
421         private final Poller[] writePollers;
422 
423         SystemThreadsPollerGroup(PollerProvider provider,
424                                  int readPollerCount,
425                                  int writePollerCount) throws IOException {
426             super(provider);
427             Poller[] readPollers = new Poller[readPollerCount];
428             Poller[] writePollers = new Poller[writePollerCount];
429             try {
430                 for (int i = 0; i < readPollerCount; i++) {
431                     readPollers[i] = provider.readPoller(false);
432                 }

454         }
455 
456         private Poller readPoller(int fdVal) {
457             int index = provider().fdValToIndex(fdVal, readPollers.length);
458             return readPollers[index];
459         }
460 
461         private Poller writePoller(int fdVal) {
462             int index = provider().fdValToIndex(fdVal, writePollers.length);
463             return writePollers[index];
464         }
465 
466         @Override
467         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
468             Poller poller = (event == Net.POLLIN)
469                     ? readPoller(fdVal)
470                     : writePoller(fdVal);
471             poller.poll(fdVal, nanos, isOpen);
472         }
473 











474         @Override
475         Poller masterPoller() {
476             return null;
477         }
478 
479         @Override
480         List<Poller> readPollers() {
481             return List.of(readPollers);
482         }
483 
484         @Override
485         List<Poller> writePollers() {
486             return List.of(writePollers);
487         }
488     }
489 
490     /**
491      * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
492      * When read and write pollers need to block then they register with a system-wide
493      * "master poller" that runs in a dedicated platform thread.

546         }
547 
548         private Poller readPoller(int fdVal) {
549             int index = provider().fdValToIndex(fdVal, readPollers.length);
550             return readPollers[index];
551         }
552 
553         private Poller writePoller(int fdVal) {
554             int index = provider().fdValToIndex(fdVal, writePollers.length);
555             return writePollers[index];
556         }
557 
558         @Override
559         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
560             Poller poller = (event == Net.POLLIN)
561                     ? readPoller(fdVal)
562                     : writePoller(fdVal);
563             poller.poll(fdVal, nanos, isOpen);
564         }
565 











566         @Override
567         void pollSelector(int fdVal, long nanos) throws IOException {
568             masterPoller.poll(fdVal, nanos, () -> true);
569         }
570 
571         @Override
572         Poller masterPoller() {
573             return masterPoller;
574         }
575 
576         @Override
577         List<Poller> readPollers() {
578             return List.of(readPollers);
579         }
580 
581         @Override
582         List<Poller> writePollers() {
583             return List.of(writePollers);
584         }
585     }

637             });
638         }
639 
640         private Poller writePoller(int fdVal) {
641             int index = provider().fdValToIndex(fdVal, writePollers.length);
642             return writePollers[index];
643         }
644 
645         /**
646          * Starts a read sub-poller in a virtual thread.
647          */
648         private Poller startReadPoller() throws IOException {
649             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
650 
651             // create read sub-poller
652             Poller readPoller = provider().readPoller(true);
653             readPollers.add(readPoller);
654 
655             // start virtual thread to execute sub-polling loop
656             Thread carrier = JLA.currentCarrierThread();
657             Thread.ofVirtual()
658                     .inheritInheritableThreadLocals(false)
659                     .name(carrier.getName() + "-Read-Poller")
660                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
661                     .start(() -> subPollerLoop(readPoller));



662             return readPoller;
663         }
664 
665         /**
666          * Returns the read poller for the current carrier, starting it if required.
667          */
668         private Poller readPoller() throws IOException {
669             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
670             Continuation.pin();
671             try {
672                 CarrierPoller carrierPoller = CARRIER_POLLER.get();
673                 if (carrierPoller != null) {
674                     return carrierPoller.readPoller();
675                 } else {
676                     // first poll on this carrier will start poller
677                     Poller readPoller = startReadPoller();
678                     CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
679                     return readPoller;
680                 }
681             } finally {

689             if (event == Net.POLLIN
690                     && Thread.currentThread().isVirtual()
691                     && ContinuationSupport.isSupported()) {
692                 readPoller().poll(fdVal, nanos, isOpen);
693                 return;
694             }
695 
696             // -XX:-VMContinuations or POLLIN from platform thread does master poller
697             if (event == Net.POLLIN) {
698                 masterPoller.poll(fdVal, nanos, isOpen);
699             } else {
700                 writePoller(fdVal).poll(fdVal, nanos, isOpen);
701             }
702         }
703 
704         @Override
705         void pollSelector(int fdVal, long nanos) throws IOException {
706             masterPoller.poll(fdVal, nanos, () -> true);
707         }
708 











709         /**
710          * Sub-poller polling loop.
711          */
712         private void subPollerLoop(Poller readPoller) {
713             try {
714                 readPoller.subPollerLoop(masterPoller);
715             } finally {
716                 // wakeup all threads waiting on file descriptors registered with the
717                 // read poller, these I/O operation will migrate to another carrier.
718                 readPoller.wakeupAll();
719 
720                 // remove from serviceability view
721                 readPollers.remove(readPoller);
722             }
723         }
724 
725         /**
726          * Invoked by the carrier thread before it terminates.
727          */
728         private void carrierTerminated(Poller readPoller) {

752 
753     /**
754      * Reads the given property name to get the poller count. If the property is
755      * set then the value must be a power of 2. Returns 1 if the property is not
756      * set.
757      * @throws IllegalArgumentException if the property is set to a value that
758      * is not a power of 2.
759      */
760     private static int pollerCount(String propName, int defaultCount) {
761         String s = System.getProperty(propName);
762         int count = (s != null) ? Integer.parseInt(s) : defaultCount;
763 
764         // check power of 2
765         if (count != Integer.highestOneBit(count)) {
766             String msg = propName + " is set to a value that is not a power of 2";
767             throw new IllegalArgumentException(msg);
768         }
769         return count;
770     }
771 

























































772     /**
773      * Return the master poller or null if there is no master poller.
774      */
775     public static Poller masterPoller() {
776         return POLLER_GROUP.masterPoller();
777     }
778 
779     /**
780      * Return the list of read pollers.
781      */
782     public static List<Poller> readPollers() {
783         return POLLER_GROUP.readPollers();
784     }
785 
786     /**
787      * Return the list of write pollers.
788      */
789     public static List<Poller> writePollers() {
790         return POLLER_GROUP.writePollers();
791     }

 31 import java.util.List;
 32 import java.util.Map;
 33 import java.util.Objects;
 34 import java.util.Set;
 35 import java.util.concurrent.ConcurrentHashMap;
 36 import java.util.concurrent.Executor;
 37 import java.util.concurrent.Executors;
 38 import java.util.concurrent.ThreadFactory;
 39 import java.util.concurrent.locks.LockSupport;
 40 import java.util.function.BooleanSupplier;
 41 import jdk.internal.access.JavaLangAccess;
 42 import jdk.internal.access.SharedSecrets;
 43 import jdk.internal.misc.InnocuousThread;
 44 import jdk.internal.misc.TerminatingThreadLocal;
 45 import jdk.internal.vm.Continuation;
 46 import jdk.internal.vm.ContinuationSupport;
 47 import jdk.internal.vm.annotation.Stable;
 48 
 49 /**
 50  * I/O poller to allow virtual threads park until a file descriptor is ready for I/O.
 51  * Implementations also optionally support read/write operations where virtual threads
 52  * park until bytes are read or written.
 53  */
 54 public abstract class Poller {
 55     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 56 
 57     // the poller group for the I/O pollers and poller threads
 58     private static final PollerGroup POLLER_GROUP = createPollerGroup();
 59 
 60     // the poller or sub-poller thread (used for observability only)
 61     private @Stable Thread owner;
 62 
 63     // maps file descriptors to parked Thread
 64     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 65 
 66     // shutdown (if supported by poller group)
 67     private volatile boolean shutdown;
 68 
 69     /**
 70      * Poller mode.
 71      */
 72     enum Mode {

154      */
155     boolean isShutdown() {
156         return shutdown;
157     }
158 
159     /**
160      * Marks this poller for shutdown.
161      */
162     private void setShutdown() {
163         shutdown = true;
164     }
165 
166     /**
167      * Returns the poller's file descriptor to use when polling with the master poller.
168      * @throws UnsupportedOperationException if not supported
169      */
170     int fdVal() {
171         throw new UnsupportedOperationException();
172     }
173 
174     /**
175      * Invoked if when this poller's file descriptor is polled by the master poller.
176      */
177     void pollerPolled() throws IOException {
178     }
179 
180     /**
181      * Register the file descriptor with the I/O event management facility so that it is
182      * polled when the file descriptor is ready for I/O. The registration is "one shot",
183      * meaning it should be polled at most once.
184      */
185     abstract void implStartPoll(int fdVal) throws IOException;
186 
187     /**
188      * Deregister a file descriptor from the I/O event management facility. This may be
189      * a no-op in some implementations when the file descriptor has already been polled.
190      * @param polled true if the file descriptor has already been polled
191      */
192     abstract void implStopPoll(int fdVal, boolean polled) throws IOException;
193 
194     /**
195      * Poll for events. The {@link #polled(int)} method is invoked for each
196      * polled file descriptor.
197      *
198      * @param timeout if positive then block for up to {@code timeout} milliseconds,
199      *     if zero then don't block, if -1 then block indefinitely

310         }
311     }
312 
313     /**
314      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
315      * descriptor that is polled.
316      *
317      * The sub-poller registers its file descriptor with the master poller to park until
318      * there are events to poll. When unparked, it does non-blocking polls and parks
319      * again when there are no more events. The sub-poller yields after each poll to help
320      * with fairness and to avoid re-registering with the master poller where possible.
321      */
322     private void subPollerLoop(Poller masterPoller) {
323         assert Thread.currentThread().isVirtual();
324         setOwner();
325         try {
326             int polled = 0;
327             while (!isShutdown()) {
328                 if (polled == 0) {
329                     masterPoller.poll(fdVal(), 0, () -> true);  // park
330                     pollerPolled();
331                 } else {
332                     Thread.yield();
333                 }
334                 polled = poll(0);
335             }
336         } catch (Exception e) {
337             e.printStackTrace();
338         }
339     }
340 
341     /**
342      * Unparks all threads waiting on a file descriptor registered with this poller.
343      */
344     private void wakeupAll() {
345         map.values().forEach(LockSupport::unpark);
346     }
347 
348     @Override
349     public String toString() {
350         return String.format("%s [registered = %d, owner = %s]",

402          */
403         abstract List<Poller> readPollers();
404 
405         /**
406          * Return the write pollers.
407          */
408         abstract List<Poller> writePollers();
409 
410         /**
411          * Close the given pollers.
412          */
413         static void closeAll(Poller... pollers) {
414             for (Poller poller : pollers) {
415                 if (poller != null) {
416                     try {
417                         poller.close();
418                     } catch (IOException _) { }
419                 }
420             }
421         }
422 
423         /**
424          * Returns true if the read pollers in this poller group support read ops in
425          * addition to POLLIN polling.
426          */
427         boolean supportReadOps() {
428             return provider().supportReadOps();
429         }
430 
431         /**
432          * Reads bytes into a byte array.
433          * @throws UnsupportedOperationException if not supported
434          */
435         abstract int read(int fdVal, byte[] b, int off, int len, long nanos,
436                           BooleanSupplier isOpen) throws IOException;
437 
438         /**
439          * Returns true if the write pollers in this poller group support write ops in
440          * addition to POLLOUT polling.
441          */
442         boolean supportWriteOps() {
443             return provider().supportWriteOps();
444         }
445 
446         /**
447          * Write bytes from a byte array.
448          * @throws UnsupportedOperationException if not supported
449          */
450         abstract int write(int fdVal, byte[] b, int off, int len,
451                            BooleanSupplier isOpen) throws IOException;
452     }
453 
454     /**
455      * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
456      */
457     private static class SystemThreadsPollerGroup extends PollerGroup {
458         // system-wide read and write pollers
459         private final Poller[] readPollers;
460         private final Poller[] writePollers;
461 
462         SystemThreadsPollerGroup(PollerProvider provider,
463                                  int readPollerCount,
464                                  int writePollerCount) throws IOException {
465             super(provider);
466             Poller[] readPollers = new Poller[readPollerCount];
467             Poller[] writePollers = new Poller[writePollerCount];
468             try {
469                 for (int i = 0; i < readPollerCount; i++) {
470                     readPollers[i] = provider.readPoller(false);
471                 }

493         }
494 
495         private Poller readPoller(int fdVal) {
496             int index = provider().fdValToIndex(fdVal, readPollers.length);
497             return readPollers[index];
498         }
499 
500         private Poller writePoller(int fdVal) {
501             int index = provider().fdValToIndex(fdVal, writePollers.length);
502             return writePollers[index];
503         }
504 
505         @Override
506         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
507             Poller poller = (event == Net.POLLIN)
508                     ? readPoller(fdVal)
509                     : writePoller(fdVal);
510             poller.poll(fdVal, nanos, isOpen);
511         }
512 
513         @Override
514         int read(int fdVal, byte[] b, int off, int len, long nanos,
515                  BooleanSupplier isOpen) throws IOException {
516             return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
517         }
518 
519         @Override
520         int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
521             return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
522         }
523 
524         @Override
525         Poller masterPoller() {
526             return null;
527         }
528 
529         @Override
530         List<Poller> readPollers() {
531             return List.of(readPollers);
532         }
533 
534         @Override
535         List<Poller> writePollers() {
536             return List.of(writePollers);
537         }
538     }
539 
540     /**
541      * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
542      * When read and write pollers need to block then they register with a system-wide
543      * "master poller" that runs in a dedicated platform thread.

596         }
597 
598         private Poller readPoller(int fdVal) {
599             int index = provider().fdValToIndex(fdVal, readPollers.length);
600             return readPollers[index];
601         }
602 
603         private Poller writePoller(int fdVal) {
604             int index = provider().fdValToIndex(fdVal, writePollers.length);
605             return writePollers[index];
606         }
607 
608         @Override
609         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
610             Poller poller = (event == Net.POLLIN)
611                     ? readPoller(fdVal)
612                     : writePoller(fdVal);
613             poller.poll(fdVal, nanos, isOpen);
614         }
615 
616         @Override
617         int read(int fdVal, byte[] b, int off, int len, long nanos,
618                  BooleanSupplier isOpen) throws IOException {
619             return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
620         }
621 
622         @Override
623         int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
624             return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
625         }
626 
627         @Override
628         void pollSelector(int fdVal, long nanos) throws IOException {
629             masterPoller.poll(fdVal, nanos, () -> true);
630         }
631 
632         @Override
633         Poller masterPoller() {
634             return masterPoller;
635         }
636 
637         @Override
638         List<Poller> readPollers() {
639             return List.of(readPollers);
640         }
641 
642         @Override
643         List<Poller> writePollers() {
644             return List.of(writePollers);
645         }
646     }

698             });
699         }
700 
701         private Poller writePoller(int fdVal) {
702             int index = provider().fdValToIndex(fdVal, writePollers.length);
703             return writePollers[index];
704         }
705 
706         /**
707          * Starts a read sub-poller in a virtual thread.
708          */
709         private Poller startReadPoller() throws IOException {
710             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
711 
712             // create read sub-poller
713             Poller readPoller = provider().readPoller(true);
714             readPollers.add(readPoller);
715 
716             // start virtual thread to execute sub-polling loop
717             Thread carrier = JLA.currentCarrierThread();
718             Thread.Builder.OfVirtual builder = Thread.ofVirtual()
719                     .inheritInheritableThreadLocals(false)
720                     .name(carrier.getName() + "-Read-Poller")
721                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace());
722             Thread thread = JLA.defaultVirtualThreadScheduler()
723                     .newThread(builder, carrier, () -> subPollerLoop(readPoller))
724                     .thread();
725             thread.start();
726             return readPoller;
727         }
728 
729         /**
730          * Returns the read poller for the current carrier, starting it if required.
731          */
732         private Poller readPoller() throws IOException {
733             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
734             Continuation.pin();
735             try {
736                 CarrierPoller carrierPoller = CARRIER_POLLER.get();
737                 if (carrierPoller != null) {
738                     return carrierPoller.readPoller();
739                 } else {
740                     // first poll on this carrier will start poller
741                     Poller readPoller = startReadPoller();
742                     CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
743                     return readPoller;
744                 }
745             } finally {

753             if (event == Net.POLLIN
754                     && Thread.currentThread().isVirtual()
755                     && ContinuationSupport.isSupported()) {
756                 readPoller().poll(fdVal, nanos, isOpen);
757                 return;
758             }
759 
760             // -XX:-VMContinuations or POLLIN from platform thread does master poller
761             if (event == Net.POLLIN) {
762                 masterPoller.poll(fdVal, nanos, isOpen);
763             } else {
764                 writePoller(fdVal).poll(fdVal, nanos, isOpen);
765             }
766         }
767 
768         @Override
769         void pollSelector(int fdVal, long nanos) throws IOException {
770             masterPoller.poll(fdVal, nanos, () -> true);
771         }
772 
773         @Override
774         int read(int fdVal, byte[] b, int off, int len, long nanos,
775                  BooleanSupplier isOpen) throws IOException {
776             return readPoller().implRead(fdVal, b, off, len, nanos, isOpen);
777         }
778 
779         @Override
780         int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
781             return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
782         }
783 
784         /**
785          * Sub-poller polling loop.
786          */
787         private void subPollerLoop(Poller readPoller) {
788             try {
789                 readPoller.subPollerLoop(masterPoller);
790             } finally {
791                 // wakeup all threads waiting on file descriptors registered with the
792                 // read poller, these I/O operation will migrate to another carrier.
793                 readPoller.wakeupAll();
794 
795                 // remove from serviceability view
796                 readPollers.remove(readPoller);
797             }
798         }
799 
800         /**
801          * Invoked by the carrier thread before it terminates.
802          */
803         private void carrierTerminated(Poller readPoller) {

827 
828     /**
829      * Reads the given property name to get the poller count. If the property is
830      * set then the value must be a power of 2. Returns 1 if the property is not
831      * set.
832      * @throws IllegalArgumentException if the property is set to a value that
833      * is not a power of 2.
834      */
835     private static int pollerCount(String propName, int defaultCount) {
836         String s = System.getProperty(propName);
837         int count = (s != null) ? Integer.parseInt(s) : defaultCount;
838 
839         // check power of 2
840         if (count != Integer.highestOneBit(count)) {
841             String msg = propName + " is set to a value that is not a power of 2";
842             throw new IllegalArgumentException(msg);
843         }
844         return count;
845     }
846 
847 
848     /**
849      * Returns true if read ops are supported in addition to POLLIN polling.
850      */
851     public static boolean supportReadOps() {
852         return POLLER_GROUP.supportReadOps();
853     }
854 
855     /**
856      * Returns true if write ops are supported in addition to POLLOUT polling.
857      */
858     public static boolean supportWriteOps() {
859         return POLLER_GROUP.supportWriteOps();
860     }
861 
862     /**
863      * Parks the current thread until bytes are read into a byte array.
864      * @param isOpen supplies a boolean to indicate if the enclosing object is open
865      * @return the number of bytes read (>0), EOF (-1), or UNAVAILABLE (-2) if unparked
866      * or the timeout expires while waiting for bytes to be read
867      * @throws UnsupportedOperationException if not supported
868      */
869     public static int read(int fdVal, byte[] b, int off, int len, long nanos,
870                            BooleanSupplier isOpen) throws IOException {
871         return POLLER_GROUP.read(fdVal, b, off, len, nanos, isOpen);
872     }
873 
874     /**
875      * Parks the current thread until bytes are written from a byte array.
876      * @param isOpen supplies a boolean to indicate if the enclosing object is open
877      * @return the number of bytes read (>0), EOF (-1), or UNAVAILABLE (-2) if unparked
878      * or the timeout expires while waiting for bytes to be read
879      * @throws UnsupportedOperationException if not supported
880      */
881     public static int write(int fdVal, byte[] b, int off, int len,
882                             BooleanSupplier isOpen) throws IOException {
883         return POLLER_GROUP.write(fdVal, b, off, len, isOpen);
884     }
885 
886     /**
887      * Parks the current thread until bytes are read a byte array. This method is
888      * overridden by poller implementations that support this operation.
889      */
890     int implRead(int fdVal, byte[] b, int off, int len, long nanos,
891                  BooleanSupplier isOpen) throws IOException {
892         throw new UnsupportedOperationException();
893     }
894 
895     /**
896      * Parks the current thread until bytes are written from a byte array. This
897      * method is overridden by poller implementations that support this operation.
898      */
899     int implWrite(int fdVal, byte[] b, int off, int len,
900                  BooleanSupplier isOpen) throws IOException {
901         throw new UnsupportedOperationException();
902     }
903 
904     /**
905      * Return the master poller or null if there is no master poller.
906      */
907     public static Poller masterPoller() {
908         return POLLER_GROUP.masterPoller();
909     }
910 
911     /**
912      * Return the list of read pollers.
913      */
914     public static List<Poller> readPollers() {
915         return POLLER_GROUP.readPollers();
916     }
917 
918     /**
919      * Return the list of write pollers.
920      */
921     public static List<Poller> writePollers() {
922         return POLLER_GROUP.writePollers();
923     }
< prev index next >