190 * @param timeout if positive then block for up to {@code timeout} milliseconds,
191 * if zero then don't block, if -1 then block indefinitely
192 * @return >0 if file descriptors are polled, 0 if no file descriptor polled
193 */
194 abstract int poll(int timeout) throws IOException;
195
196 /**
197 * Wakeup the poller thread if blocked in poll so it can shutdown.
198 * @throws UnsupportedOperationException if not supported
199 */
200 void wakeupPoller() throws IOException {
201 throw new UnsupportedOperationException();
202 }
203
204 /**
205 * Callback by the poll method when a file descriptor is polled.
206 */
207 final void polled(int fdVal) {
208 Thread t = map.remove(fdVal);
209 if (t != null) {
210 LockSupport.unpark(t);
211 }
212 }
213
214 /**
215 * Parks the current thread until a file descriptor is ready for the given op.
216 * @param fdVal the file descriptor
217 * @param event POLLIN or POLLOUT
218 * @param nanos the waiting time or 0 to wait indefinitely
219 * @param isOpen supplies a boolean to indicate if the enclosing object is open
220 */
221 public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
222 POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
223 }
224
225 /**
226 * Parks the current thread until a Selector's file descriptor is ready.
227 * @param fdVal the Selector's file descriptor
228 * @param nanos the waiting time or 0 to wait indefinitely
229 */
230 public static void pollSelector(int fdVal, long nanos) throws IOException {
381 thread.setDaemon(true);
382 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
383 thread.start();
384 }
385
386 /**
387 * Return the master poller, or null if no master poller.
388 */
389 abstract Poller masterPoller();
390
391 /**
392 * Return the read pollers.
393 */
394 abstract List<Poller> readPollers();
395
396 /**
397 * Return the write pollers.
398 */
399 abstract List<Poller> writePollers();
400
401 /**
402 * Close the given pollers.
403 */
404 static void closeAll(Poller... pollers) {
405 for (Poller poller : pollers) {
406 if (poller != null) {
407 try {
408 poller.close();
409 } catch (IOException _) { }
410 }
411 }
412 }
413 }
414
415 /**
416 * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
417 */
418 private static class SystemThreadsPollerGroup extends PollerGroup {
419 // system-wide read and write pollers
420 private final Poller[] readPollers;
637 });
638 }
639
640 private Poller writePoller(int fdVal) {
641 int index = provider().fdValToIndex(fdVal, writePollers.length);
642 return writePollers[index];
643 }
644
645 /**
646 * Starts a read sub-poller in a virtual thread.
647 */
648 private Poller startReadPoller() throws IOException {
649 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
650
651 // create read sub-poller
652 Poller readPoller = provider().readPoller(true);
653 readPollers.add(readPoller);
654
655 // start virtual thread to execute sub-polling loop
656 Thread carrier = JLA.currentCarrierThread();
657 Thread.ofVirtual()
658 .inheritInheritableThreadLocals(false)
659 .name(carrier.getName() + "-Read-Poller")
660 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
661 .start(() -> subPollerLoop(readPoller));
662 return readPoller;
663 }
664
665 /**
666 * Returns the read poller for the current carrier, starting it if required.
667 */
668 private Poller readPoller() throws IOException {
669 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
670 Continuation.pin();
671 try {
672 CarrierPoller carrierPoller = CARRIER_POLLER.get();
673 if (carrierPoller != null) {
674 return carrierPoller.readPoller();
675 } else {
676 // first poll on this carrier will start poller
677 Poller readPoller = startReadPoller();
678 CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
679 return readPoller;
680 }
681 } finally {
731 readPoller.wakeupPoller();
732 } catch (Throwable e) {
733 e.printStackTrace();
734 }
735 }
736
737 @Override
738 Poller masterPoller() {
739 return masterPoller;
740 }
741
742 @Override
743 List<Poller> readPollers() {
744 return readPollers.stream().toList();
745 }
746
747 @Override
748 List<Poller> writePollers() {
749 return List.of(writePollers);
750 }
751 }
752
753 /**
754 * Reads the given property name to get the poller count. If the property is
755 * set then the value must be a power of 2. Returns 1 if the property is not
756 * set.
757 * @throws IllegalArgumentException if the property is set to a value that
758 * is not a power of 2.
759 */
760 private static int pollerCount(String propName, int defaultCount) {
761 String s = System.getProperty(propName);
762 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
763
764 // check power of 2
765 if (count != Integer.highestOneBit(count)) {
766 String msg = propName + " is set to a value that is not a power of 2";
767 throw new IllegalArgumentException(msg);
768 }
769 return count;
770 }
|
190 * @param timeout if positive then block for up to {@code timeout} milliseconds,
191 * if zero then don't block, if -1 then block indefinitely
192 * @return >0 if file descriptors are polled, 0 if no file descriptor polled
193 */
194 abstract int poll(int timeout) throws IOException;
195
196 /**
197 * Wakeup the poller thread if blocked in poll so it can shutdown.
198 * @throws UnsupportedOperationException if not supported
199 */
200 void wakeupPoller() throws IOException {
201 throw new UnsupportedOperationException();
202 }
203
204 /**
205 * Callback by the poll method when a file descriptor is polled.
206 */
207 final void polled(int fdVal) {
208 Thread t = map.remove(fdVal);
209 if (t != null) {
210 if (POLLER_GROUP.useLazyUnpark() && Thread.currentThread().isVirtual()) {
211 JLA.lazyUnparkVirtualThread(t);
212 } else {
213 LockSupport.unpark(t);
214 }
215 }
216 }
217
218 /**
219 * Parks the current thread until a file descriptor is ready for the given op.
220 * @param fdVal the file descriptor
221 * @param event POLLIN or POLLOUT
222 * @param nanos the waiting time or 0 to wait indefinitely
223 * @param isOpen supplies a boolean to indicate if the enclosing object is open
224 */
225 public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
226 POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
227 }
228
229 /**
230 * Parks the current thread until a Selector's file descriptor is ready.
231 * @param fdVal the Selector's file descriptor
232 * @param nanos the waiting time or 0 to wait indefinitely
233 */
234 public static void pollSelector(int fdVal, long nanos) throws IOException {
385 thread.setDaemon(true);
386 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
387 thread.start();
388 }
389
390 /**
391 * Return the master poller, or null if no master poller.
392 */
393 abstract Poller masterPoller();
394
395 /**
396 * Return the read pollers.
397 */
398 abstract List<Poller> readPollers();
399
400 /**
401 * Return the write pollers.
402 */
403 abstract List<Poller> writePollers();
404
405 /**
406 * Return true if the unparking threads should use lazyUnpark.
407 */
408 boolean useLazyUnpark() {
409 return false;
410 }
411
412 /**
413 * Close the given pollers.
414 */
415 static void closeAll(Poller... pollers) {
416 for (Poller poller : pollers) {
417 if (poller != null) {
418 try {
419 poller.close();
420 } catch (IOException _) { }
421 }
422 }
423 }
424 }
425
426 /**
427 * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
428 */
429 private static class SystemThreadsPollerGroup extends PollerGroup {
430 // system-wide read and write pollers
431 private final Poller[] readPollers;
648 });
649 }
650
651 private Poller writePoller(int fdVal) {
652 int index = provider().fdValToIndex(fdVal, writePollers.length);
653 return writePollers[index];
654 }
655
656 /**
657 * Starts a read sub-poller in a virtual thread.
658 */
659 private Poller startReadPoller() throws IOException {
660 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
661
662 // create read sub-poller
663 Poller readPoller = provider().readPoller(true);
664 readPollers.add(readPoller);
665
666 // start virtual thread to execute sub-polling loop
667 Thread carrier = JLA.currentCarrierThread();
668 Thread.Builder.OfVirtual builder = Thread.ofVirtual()
669 .inheritInheritableThreadLocals(false)
670 .name(carrier.getName() + "-Read-Poller")
671 .uncaughtExceptionHandler((_, e) -> e.printStackTrace());
672 Thread thread = JLA.defaultVirtualThreadScheduler()
673 .newThread(builder, carrier, () -> subPollerLoop(readPoller))
674 .thread();
675 thread.start();
676 return readPoller;
677 }
678
679 /**
680 * Returns the read poller for the current carrier, starting it if required.
681 */
682 private Poller readPoller() throws IOException {
683 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
684 Continuation.pin();
685 try {
686 CarrierPoller carrierPoller = CARRIER_POLLER.get();
687 if (carrierPoller != null) {
688 return carrierPoller.readPoller();
689 } else {
690 // first poll on this carrier will start poller
691 Poller readPoller = startReadPoller();
692 CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
693 return readPoller;
694 }
695 } finally {
745 readPoller.wakeupPoller();
746 } catch (Throwable e) {
747 e.printStackTrace();
748 }
749 }
750
751 @Override
752 Poller masterPoller() {
753 return masterPoller;
754 }
755
756 @Override
757 List<Poller> readPollers() {
758 return readPollers.stream().toList();
759 }
760
761 @Override
762 List<Poller> writePollers() {
763 return List.of(writePollers);
764 }
765
766 @Override
767 boolean useLazyUnpark() {
768 return true;
769 }
770 }
771
772 /**
773 * Reads the given property name to get the poller count. If the property is
774 * set then the value must be a power of 2. Returns 1 if the property is not
775 * set.
776 * @throws IllegalArgumentException if the property is set to a value that
777 * is not a power of 2.
778 */
779 private static int pollerCount(String propName, int defaultCount) {
780 String s = System.getProperty(propName);
781 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
782
783 // check power of 2
784 if (count != Integer.highestOneBit(count)) {
785 String msg = propName + " is set to a value that is not a power of 2";
786 throw new IllegalArgumentException(msg);
787 }
788 return count;
789 }
|