< prev index next >

src/java.base/share/classes/sun/nio/ch/Poller.java

Print this page

190      * @param timeout if positive then block for up to {@code timeout} milliseconds,
191      *     if zero then don't block, if -1 then block indefinitely
192      * @return >0 if file descriptors are polled, 0 if no file descriptor polled
193      */
194     abstract int poll(int timeout) throws IOException;
195 
196     /**
197      * Wakeup the poller thread if blocked in poll so it can shutdown.
198      * @throws UnsupportedOperationException if not supported
199      */
200     void wakeupPoller() throws IOException {
201         throw new UnsupportedOperationException();
202     }
203 
204     /**
205      * Callback by the poll method when a file descriptor is polled.
206      */
207     final void polled(int fdVal) {
208         Thread t = map.remove(fdVal);
209         if (t != null) {
210             LockSupport.unpark(t);




211         }
212     }
213 
214     /**
215      * Parks the current thread until a file descriptor is ready for the given op.
216      * @param fdVal the file descriptor
217      * @param event POLLIN or POLLOUT
218      * @param nanos the waiting time or 0 to wait indefinitely
219      * @param isOpen supplies a boolean to indicate if the enclosing object is open
220      */
221     public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
222         POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
223     }
224 
225     /**
226      * Parks the current thread until a Selector's file descriptor is ready.
227      * @param fdVal the Selector's file descriptor
228      * @param nanos the waiting time or 0 to wait indefinitely
229      */
230     public static void pollSelector(int fdVal, long nanos) throws IOException {

381             thread.setDaemon(true);
382             thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
383             thread.start();
384         }
385 
386         /**
387          * Return the master poller, or null if no master poller.
388          */
389         abstract Poller masterPoller();
390 
391         /**
392          * Return the read pollers.
393          */
394         abstract List<Poller> readPollers();
395 
396         /**
397          * Return the write pollers.
398          */
399         abstract List<Poller> writePollers();
400 







401         /**
402          * Close the given pollers.
403          */
404         static void closeAll(Poller... pollers) {
405             for (Poller poller : pollers) {
406                 if (poller != null) {
407                     try {
408                         poller.close();
409                     } catch (IOException _) { }
410                 }
411             }
412         }
413     }
414 
415     /**
416      * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
417      */
418     private static class SystemThreadsPollerGroup extends PollerGroup {
419         // system-wide read and write pollers
420         private final Poller[] readPollers;

637             });
638         }
639 
640         private Poller writePoller(int fdVal) {
641             int index = provider().fdValToIndex(fdVal, writePollers.length);
642             return writePollers[index];
643         }
644 
645         /**
646          * Starts a read sub-poller in a virtual thread.
647          */
648         private Poller startReadPoller() throws IOException {
649             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
650 
651             // create read sub-poller
652             Poller readPoller = provider().readPoller(true);
653             readPollers.add(readPoller);
654 
655             // start virtual thread to execute sub-polling loop
656             Thread carrier = JLA.currentCarrierThread();
657             Thread.ofVirtual()
658                     .inheritInheritableThreadLocals(false)
659                     .name(carrier.getName() + "-Read-Poller")
660                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
661                     .start(() -> subPollerLoop(readPoller));



662             return readPoller;
663         }
664 
665         /**
666          * Returns the read poller for the current carrier, starting it if required.
667          */
668         private Poller readPoller() throws IOException {
669             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
670             Continuation.pin();
671             try {
672                 CarrierPoller carrierPoller = CARRIER_POLLER.get();
673                 if (carrierPoller != null) {
674                     return carrierPoller.readPoller();
675                 } else {
676                     // first poll on this carrier will start poller
677                     Poller readPoller = startReadPoller();
678                     CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
679                     return readPoller;
680                 }
681             } finally {

731                 readPoller.wakeupPoller();
732             } catch (Throwable e) {
733                 e.printStackTrace();
734             }
735         }
736 
737         @Override
738         Poller masterPoller() {
739             return masterPoller;
740         }
741 
742         @Override
743         List<Poller> readPollers() {
744             return readPollers.stream().toList();
745         }
746 
747         @Override
748         List<Poller> writePollers() {
749             return List.of(writePollers);
750         }





751     }
752 
753     /**
754      * Reads the given property name to get the poller count. If the property is
755      * set then the value must be a power of 2. Returns 1 if the property is not
756      * set.
757      * @throws IllegalArgumentException if the property is set to a value that
758      * is not a power of 2.
759      */
760     private static int pollerCount(String propName, int defaultCount) {
761         String s = System.getProperty(propName);
762         int count = (s != null) ? Integer.parseInt(s) : defaultCount;
763 
764         // check power of 2
765         if (count != Integer.highestOneBit(count)) {
766             String msg = propName + " is set to a value that is not a power of 2";
767             throw new IllegalArgumentException(msg);
768         }
769         return count;
770     }

190      * @param timeout if positive then block for up to {@code timeout} milliseconds,
191      *     if zero then don't block, if -1 then block indefinitely
192      * @return >0 if file descriptors are polled, 0 if no file descriptor polled
193      */
194     abstract int poll(int timeout) throws IOException;
195 
196     /**
197      * Wakeup the poller thread if blocked in poll so it can shutdown.
198      * @throws UnsupportedOperationException if not supported
199      */
200     void wakeupPoller() throws IOException {
201         throw new UnsupportedOperationException();
202     }
203 
204     /**
205      * Callback by the poll method when a file descriptor is polled.
206      */
207     final void polled(int fdVal) {
208         Thread t = map.remove(fdVal);
209         if (t != null) {
210             if (POLLER_GROUP.useLazyUnpark() && Thread.currentThread().isVirtual()) {
211                 JLA.lazyUnparkVirtualThread(t);
212             } else {
213                 LockSupport.unpark(t);
214             }
215         }
216     }
217 
218     /**
219      * Parks the current thread until a file descriptor is ready for the given op.
220      * @param fdVal the file descriptor
221      * @param event POLLIN or POLLOUT
222      * @param nanos the waiting time or 0 to wait indefinitely
223      * @param isOpen supplies a boolean to indicate if the enclosing object is open
224      */
225     public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
226         POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
227     }
228 
229     /**
230      * Parks the current thread until a Selector's file descriptor is ready.
231      * @param fdVal the Selector's file descriptor
232      * @param nanos the waiting time or 0 to wait indefinitely
233      */
234     public static void pollSelector(int fdVal, long nanos) throws IOException {

385             thread.setDaemon(true);
386             thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
387             thread.start();
388         }
389 
390         /**
391          * Return the master poller, or null if no master poller.
392          */
393         abstract Poller masterPoller();
394 
395         /**
396          * Return the read pollers.
397          */
398         abstract List<Poller> readPollers();
399 
400         /**
401          * Return the write pollers.
402          */
403         abstract List<Poller> writePollers();
404 
405         /**
406          * Return true if the unparking threads should use lazyUnpark.
407          */
408         boolean useLazyUnpark() {
409             return false;
410         }
411 
412         /**
413          * Close the given pollers.
414          */
415         static void closeAll(Poller... pollers) {
416             for (Poller poller : pollers) {
417                 if (poller != null) {
418                     try {
419                         poller.close();
420                     } catch (IOException _) { }
421                 }
422             }
423         }
424     }
425 
426     /**
427      * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
428      */
429     private static class SystemThreadsPollerGroup extends PollerGroup {
430         // system-wide read and write pollers
431         private final Poller[] readPollers;

648             });
649         }
650 
651         private Poller writePoller(int fdVal) {
652             int index = provider().fdValToIndex(fdVal, writePollers.length);
653             return writePollers[index];
654         }
655 
656         /**
657          * Starts a read sub-poller in a virtual thread.
658          */
659         private Poller startReadPoller() throws IOException {
660             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
661 
662             // create read sub-poller
663             Poller readPoller = provider().readPoller(true);
664             readPollers.add(readPoller);
665 
666             // start virtual thread to execute sub-polling loop
667             Thread carrier = JLA.currentCarrierThread();
668             Thread.Builder.OfVirtual builder = Thread.ofVirtual()
669                     .inheritInheritableThreadLocals(false)
670                     .name(carrier.getName() + "-Read-Poller")
671                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace());
672             Thread thread = JLA.defaultVirtualThreadScheduler()
673                     .newThread(builder, carrier, () -> subPollerLoop(readPoller))
674                     .thread();
675             thread.start();
676             return readPoller;
677         }
678 
679         /**
680          * Returns the read poller for the current carrier, starting it if required.
681          */
682         private Poller readPoller() throws IOException {
683             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
684             Continuation.pin();
685             try {
686                 CarrierPoller carrierPoller = CARRIER_POLLER.get();
687                 if (carrierPoller != null) {
688                     return carrierPoller.readPoller();
689                 } else {
690                     // first poll on this carrier will start poller
691                     Poller readPoller = startReadPoller();
692                     CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
693                     return readPoller;
694                 }
695             } finally {

745                 readPoller.wakeupPoller();
746             } catch (Throwable e) {
747                 e.printStackTrace();
748             }
749         }
750 
751         @Override
752         Poller masterPoller() {
753             return masterPoller;
754         }
755 
756         @Override
757         List<Poller> readPollers() {
758             return readPollers.stream().toList();
759         }
760 
761         @Override
762         List<Poller> writePollers() {
763             return List.of(writePollers);
764         }
765 
766         @Override
767         boolean useLazyUnpark() {
768             return true;
769         }
770     }
771 
772     /**
773      * Reads the given property name to get the poller count. If the property is
774      * set then the value must be a power of 2. Returns 1 if the property is not
775      * set.
776      * @throws IllegalArgumentException if the property is set to a value that
777      * is not a power of 2.
778      */
779     private static int pollerCount(String propName, int defaultCount) {
780         String s = System.getProperty(propName);
781         int count = (s != null) ? Integer.parseInt(s) : defaultCount;
782 
783         // check power of 2
784         if (count != Integer.highestOneBit(count)) {
785             String msg = propName + " is set to a value that is not a power of 2";
786             throw new IllegalArgumentException(msg);
787         }
788         return count;
789     }
< prev index next >