31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.locks.LockSupport;
40 import java.util.function.BooleanSupplier;
41 import jdk.internal.access.JavaLangAccess;
42 import jdk.internal.access.SharedSecrets;
43 import jdk.internal.misc.InnocuousThread;
44 import jdk.internal.misc.TerminatingThreadLocal;
45 import jdk.internal.vm.Continuation;
46 import jdk.internal.vm.ContinuationSupport;
47 import jdk.internal.vm.annotation.Stable;
48
49 /**
50 * I/O poller to allow virtual threads park until a file descriptor is ready for I/O.
51 */
52 public abstract class Poller {
53 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
54
55 // the poller group for the I/O pollers and poller threads
56 private static final PollerGroup POLLER_GROUP = createPollerGroup();
57
58 // the poller or sub-poller thread (used for observability only)
59 private @Stable Thread owner;
60
61 // maps file descriptors to parked Thread
62 private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
63
64 // shutdown (if supported by poller group)
65 private volatile boolean shutdown;
66
67 /**
68 * Poller mode.
69 */
70 enum Mode {
152 */
153 boolean isShutdown() {
154 return shutdown;
155 }
156
157 /**
158 * Marks this poller for shutdown.
159 */
160 private void setShutdown() {
161 shutdown = true;
162 }
163
164 /**
165 * Returns the poller's file descriptor to use when polling with the master poller.
166 * @throws UnsupportedOperationException if not supported
167 */
168 int fdVal() {
169 throw new UnsupportedOperationException();
170 }
171
172 /**
173 * Register the file descriptor with the I/O event management facility so that it is
174 * polled when the file descriptor is ready for I/O. The registration is "one shot",
175 * meaning it should be polled at most once.
176 */
177 abstract void implStartPoll(int fdVal) throws IOException;
178
179 /**
180 * Deregister a file descriptor from the I/O event management facility. This may be
181 * a no-op in some implementations when the file descriptor has already been polled.
182 * @param polled true if the file descriptor has already been polled
183 */
184 abstract void implStopPoll(int fdVal, boolean polled) throws IOException;
185
186 /**
187 * Poll for events. The {@link #polled(int)} method is invoked for each
188 * polled file descriptor.
189 *
190 * @param timeout if positive then block for up to {@code timeout} milliseconds,
191 * if zero then don't block, if -1 then block indefinitely
302 }
303 }
304
305 /**
306 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
307 * descriptor that is polled.
308 *
309 * The sub-poller registers its file descriptor with the master poller to park until
310 * there are events to poll. When unparked, it does non-blocking polls and parks
311 * again when there are no more events. The sub-poller yields after each poll to help
312 * with fairness and to avoid re-registering with the master poller where possible.
313 */
314 private void subPollerLoop(Poller masterPoller) {
315 assert Thread.currentThread().isVirtual();
316 setOwner();
317 try {
318 int polled = 0;
319 while (!isShutdown()) {
320 if (polled == 0) {
321 masterPoller.poll(fdVal(), 0, () -> true); // park
322 } else {
323 Thread.yield();
324 }
325 polled = poll(0);
326 }
327 } catch (Exception e) {
328 e.printStackTrace();
329 }
330 }
331
332 /**
333 * Unparks all threads waiting on a file descriptor registered with this poller.
334 */
335 private void wakeupAll() {
336 map.values().forEach(LockSupport::unpark);
337 }
338
339 @Override
340 public String toString() {
341 return String.format("%s [registered = %d, owner = %s]",
393 */
394 abstract List<Poller> readPollers();
395
396 /**
397 * Return the write pollers.
398 */
399 abstract List<Poller> writePollers();
400
401 /**
402 * Close the given pollers.
403 */
404 static void closeAll(Poller... pollers) {
405 for (Poller poller : pollers) {
406 if (poller != null) {
407 try {
408 poller.close();
409 } catch (IOException _) { }
410 }
411 }
412 }
413 }
414
415 /**
416 * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
417 */
418 private static class SystemThreadsPollerGroup extends PollerGroup {
419 // system-wide read and write pollers
420 private final Poller[] readPollers;
421 private final Poller[] writePollers;
422
423 SystemThreadsPollerGroup(PollerProvider provider,
424 int readPollerCount,
425 int writePollerCount) throws IOException {
426 super(provider);
427 Poller[] readPollers = new Poller[readPollerCount];
428 Poller[] writePollers = new Poller[writePollerCount];
429 try {
430 for (int i = 0; i < readPollerCount; i++) {
431 readPollers[i] = provider.readPoller(false);
432 }
454 }
455
456 private Poller readPoller(int fdVal) {
457 int index = provider().fdValToIndex(fdVal, readPollers.length);
458 return readPollers[index];
459 }
460
461 private Poller writePoller(int fdVal) {
462 int index = provider().fdValToIndex(fdVal, writePollers.length);
463 return writePollers[index];
464 }
465
466 @Override
467 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
468 Poller poller = (event == Net.POLLIN)
469 ? readPoller(fdVal)
470 : writePoller(fdVal);
471 poller.poll(fdVal, nanos, isOpen);
472 }
473
474 @Override
475 Poller masterPoller() {
476 return null;
477 }
478
479 @Override
480 List<Poller> readPollers() {
481 return List.of(readPollers);
482 }
483
484 @Override
485 List<Poller> writePollers() {
486 return List.of(writePollers);
487 }
488 }
489
490 /**
491 * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
492 * When read and write pollers need to block then they register with a system-wide
493 * "master poller" that runs in a dedicated platform thread.
546 }
547
548 private Poller readPoller(int fdVal) {
549 int index = provider().fdValToIndex(fdVal, readPollers.length);
550 return readPollers[index];
551 }
552
553 private Poller writePoller(int fdVal) {
554 int index = provider().fdValToIndex(fdVal, writePollers.length);
555 return writePollers[index];
556 }
557
558 @Override
559 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
560 Poller poller = (event == Net.POLLIN)
561 ? readPoller(fdVal)
562 : writePoller(fdVal);
563 poller.poll(fdVal, nanos, isOpen);
564 }
565
566 @Override
567 void pollSelector(int fdVal, long nanos) throws IOException {
568 masterPoller.poll(fdVal, nanos, () -> true);
569 }
570
571 @Override
572 Poller masterPoller() {
573 return masterPoller;
574 }
575
576 @Override
577 List<Poller> readPollers() {
578 return List.of(readPollers);
579 }
580
581 @Override
582 List<Poller> writePollers() {
583 return List.of(writePollers);
584 }
585 }
637 });
638 }
639
640 private Poller writePoller(int fdVal) {
641 int index = provider().fdValToIndex(fdVal, writePollers.length);
642 return writePollers[index];
643 }
644
645 /**
646 * Starts a read sub-poller in a virtual thread.
647 */
648 private Poller startReadPoller() throws IOException {
649 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
650
651 // create read sub-poller
652 Poller readPoller = provider().readPoller(true);
653 readPollers.add(readPoller);
654
655 // start virtual thread to execute sub-polling loop
656 Thread carrier = JLA.currentCarrierThread();
657 Thread.ofVirtual()
658 .inheritInheritableThreadLocals(false)
659 .name(carrier.getName() + "-Read-Poller")
660 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
661 .start(() -> subPollerLoop(readPoller));
662 return readPoller;
663 }
664
665 /**
666 * Returns the read poller for the current carrier, starting it if required.
667 */
668 private Poller readPoller() throws IOException {
669 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
670 Continuation.pin();
671 try {
672 CarrierPoller carrierPoller = CARRIER_POLLER.get();
673 if (carrierPoller != null) {
674 return carrierPoller.readPoller();
675 } else {
676 // first poll on this carrier will start poller
677 Poller readPoller = startReadPoller();
678 CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
679 return readPoller;
680 }
681 } finally {
689 if (event == Net.POLLIN
690 && Thread.currentThread().isVirtual()
691 && ContinuationSupport.isSupported()) {
692 readPoller().poll(fdVal, nanos, isOpen);
693 return;
694 }
695
696 // -XX:-VMContinuations or POLLIN from platform thread does master poller
697 if (event == Net.POLLIN) {
698 masterPoller.poll(fdVal, nanos, isOpen);
699 } else {
700 writePoller(fdVal).poll(fdVal, nanos, isOpen);
701 }
702 }
703
704 @Override
705 void pollSelector(int fdVal, long nanos) throws IOException {
706 masterPoller.poll(fdVal, nanos, () -> true);
707 }
708
709 /**
710 * Sub-poller polling loop.
711 */
712 private void subPollerLoop(Poller readPoller) {
713 try {
714 readPoller.subPollerLoop(masterPoller);
715 } finally {
716 // wakeup all threads waiting on file descriptors registered with the
717 // read poller, these I/O operation will migrate to another carrier.
718 readPoller.wakeupAll();
719
720 // remove from serviceability view
721 readPollers.remove(readPoller);
722 }
723 }
724
725 /**
726 * Invoked by the carrier thread before it terminates.
727 */
728 private void carrierTerminated(Poller readPoller) {
752
753 /**
754 * Reads the given property name to get the poller count. If the property is
755 * set then the value must be a power of 2. Returns 1 if the property is not
756 * set.
757 * @throws IllegalArgumentException if the property is set to a value that
758 * is not a power of 2.
759 */
760 private static int pollerCount(String propName, int defaultCount) {
761 String s = System.getProperty(propName);
762 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
763
764 // check power of 2
765 if (count != Integer.highestOneBit(count)) {
766 String msg = propName + " is set to a value that is not a power of 2";
767 throw new IllegalArgumentException(msg);
768 }
769 return count;
770 }
771
772 /**
773 * Return the master poller or null if there is no master poller.
774 */
775 public static Poller masterPoller() {
776 return POLLER_GROUP.masterPoller();
777 }
778
779 /**
780 * Return the list of read pollers.
781 */
782 public static List<Poller> readPollers() {
783 return POLLER_GROUP.readPollers();
784 }
785
786 /**
787 * Return the list of write pollers.
788 */
789 public static List<Poller> writePollers() {
790 return POLLER_GROUP.writePollers();
791 }
|
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.locks.LockSupport;
40 import java.util.function.BooleanSupplier;
41 import jdk.internal.access.JavaLangAccess;
42 import jdk.internal.access.SharedSecrets;
43 import jdk.internal.misc.InnocuousThread;
44 import jdk.internal.misc.TerminatingThreadLocal;
45 import jdk.internal.vm.Continuation;
46 import jdk.internal.vm.ContinuationSupport;
47 import jdk.internal.vm.annotation.Stable;
48
49 /**
50 * I/O poller to allow virtual threads park until a file descriptor is ready for I/O.
51 * Implementations also optionally support read/write operations where virtual threads
52 * park until bytes are read or written.
53 */
54 public abstract class Poller {
55 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
56
57 // the poller group for the I/O pollers and poller threads
58 private static final PollerGroup POLLER_GROUP = createPollerGroup();
59
60 // the poller or sub-poller thread (used for observability only)
61 private @Stable Thread owner;
62
63 // maps file descriptors to parked Thread
64 private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
65
66 // shutdown (if supported by poller group)
67 private volatile boolean shutdown;
68
69 /**
70 * Poller mode.
71 */
72 enum Mode {
154 */
155 boolean isShutdown() {
156 return shutdown;
157 }
158
159 /**
160 * Marks this poller for shutdown.
161 */
162 private void setShutdown() {
163 shutdown = true;
164 }
165
166 /**
167 * Returns the poller's file descriptor to use when polling with the master poller.
168 * @throws UnsupportedOperationException if not supported
169 */
170 int fdVal() {
171 throw new UnsupportedOperationException();
172 }
173
174 /**
175 * Invoked if when this poller's file descriptor is polled by the master poller.
176 */
177 void pollerPolled() throws IOException {
178 }
179
180 /**
181 * Register the file descriptor with the I/O event management facility so that it is
182 * polled when the file descriptor is ready for I/O. The registration is "one shot",
183 * meaning it should be polled at most once.
184 */
185 abstract void implStartPoll(int fdVal) throws IOException;
186
187 /**
188 * Deregister a file descriptor from the I/O event management facility. This may be
189 * a no-op in some implementations when the file descriptor has already been polled.
190 * @param polled true if the file descriptor has already been polled
191 */
192 abstract void implStopPoll(int fdVal, boolean polled) throws IOException;
193
194 /**
195 * Poll for events. The {@link #polled(int)} method is invoked for each
196 * polled file descriptor.
197 *
198 * @param timeout if positive then block for up to {@code timeout} milliseconds,
199 * if zero then don't block, if -1 then block indefinitely
310 }
311 }
312
313 /**
314 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
315 * descriptor that is polled.
316 *
317 * The sub-poller registers its file descriptor with the master poller to park until
318 * there are events to poll. When unparked, it does non-blocking polls and parks
319 * again when there are no more events. The sub-poller yields after each poll to help
320 * with fairness and to avoid re-registering with the master poller where possible.
321 */
322 private void subPollerLoop(Poller masterPoller) {
323 assert Thread.currentThread().isVirtual();
324 setOwner();
325 try {
326 int polled = 0;
327 while (!isShutdown()) {
328 if (polled == 0) {
329 masterPoller.poll(fdVal(), 0, () -> true); // park
330 pollerPolled();
331 } else {
332 Thread.yield();
333 }
334 polled = poll(0);
335 }
336 } catch (Exception e) {
337 e.printStackTrace();
338 }
339 }
340
341 /**
342 * Unparks all threads waiting on a file descriptor registered with this poller.
343 */
344 private void wakeupAll() {
345 map.values().forEach(LockSupport::unpark);
346 }
347
348 @Override
349 public String toString() {
350 return String.format("%s [registered = %d, owner = %s]",
402 */
403 abstract List<Poller> readPollers();
404
405 /**
406 * Return the write pollers.
407 */
408 abstract List<Poller> writePollers();
409
410 /**
411 * Close the given pollers.
412 */
413 static void closeAll(Poller... pollers) {
414 for (Poller poller : pollers) {
415 if (poller != null) {
416 try {
417 poller.close();
418 } catch (IOException _) { }
419 }
420 }
421 }
422
423 /**
424 * Returns true if the read pollers in this poller group support read ops in
425 * addition to POLLIN polling.
426 */
427 boolean supportReadOps() {
428 return provider().supportReadOps();
429 }
430
431 /**
432 * Reads bytes into a byte array.
433 * @throws UnsupportedOperationException if not supported
434 */
435 abstract int read(int fdVal, byte[] b, int off, int len, long nanos,
436 BooleanSupplier isOpen) throws IOException;
437
438 /**
439 * Returns true if the write pollers in this poller group support write ops in
440 * addition to POLLOUT polling.
441 */
442 boolean supportWriteOps() {
443 return provider().supportWriteOps();
444 }
445
446 /**
447 * Write bytes from a byte array.
448 * @throws UnsupportedOperationException if not supported
449 */
450 abstract int write(int fdVal, byte[] b, int off, int len,
451 BooleanSupplier isOpen) throws IOException;
452 }
453
454 /**
455 * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
456 */
457 private static class SystemThreadsPollerGroup extends PollerGroup {
458 // system-wide read and write pollers
459 private final Poller[] readPollers;
460 private final Poller[] writePollers;
461
462 SystemThreadsPollerGroup(PollerProvider provider,
463 int readPollerCount,
464 int writePollerCount) throws IOException {
465 super(provider);
466 Poller[] readPollers = new Poller[readPollerCount];
467 Poller[] writePollers = new Poller[writePollerCount];
468 try {
469 for (int i = 0; i < readPollerCount; i++) {
470 readPollers[i] = provider.readPoller(false);
471 }
493 }
494
495 private Poller readPoller(int fdVal) {
496 int index = provider().fdValToIndex(fdVal, readPollers.length);
497 return readPollers[index];
498 }
499
500 private Poller writePoller(int fdVal) {
501 int index = provider().fdValToIndex(fdVal, writePollers.length);
502 return writePollers[index];
503 }
504
505 @Override
506 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
507 Poller poller = (event == Net.POLLIN)
508 ? readPoller(fdVal)
509 : writePoller(fdVal);
510 poller.poll(fdVal, nanos, isOpen);
511 }
512
513 @Override
514 int read(int fdVal, byte[] b, int off, int len, long nanos,
515 BooleanSupplier isOpen) throws IOException {
516 return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
517 }
518
519 @Override
520 int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
521 return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
522 }
523
524 @Override
525 Poller masterPoller() {
526 return null;
527 }
528
529 @Override
530 List<Poller> readPollers() {
531 return List.of(readPollers);
532 }
533
534 @Override
535 List<Poller> writePollers() {
536 return List.of(writePollers);
537 }
538 }
539
540 /**
541 * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
542 * When read and write pollers need to block then they register with a system-wide
543 * "master poller" that runs in a dedicated platform thread.
596 }
597
598 private Poller readPoller(int fdVal) {
599 int index = provider().fdValToIndex(fdVal, readPollers.length);
600 return readPollers[index];
601 }
602
603 private Poller writePoller(int fdVal) {
604 int index = provider().fdValToIndex(fdVal, writePollers.length);
605 return writePollers[index];
606 }
607
608 @Override
609 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
610 Poller poller = (event == Net.POLLIN)
611 ? readPoller(fdVal)
612 : writePoller(fdVal);
613 poller.poll(fdVal, nanos, isOpen);
614 }
615
616 @Override
617 int read(int fdVal, byte[] b, int off, int len, long nanos,
618 BooleanSupplier isOpen) throws IOException {
619 return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
620 }
621
622 @Override
623 int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
624 return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
625 }
626
627 @Override
628 void pollSelector(int fdVal, long nanos) throws IOException {
629 masterPoller.poll(fdVal, nanos, () -> true);
630 }
631
632 @Override
633 Poller masterPoller() {
634 return masterPoller;
635 }
636
637 @Override
638 List<Poller> readPollers() {
639 return List.of(readPollers);
640 }
641
642 @Override
643 List<Poller> writePollers() {
644 return List.of(writePollers);
645 }
646 }
698 });
699 }
700
701 private Poller writePoller(int fdVal) {
702 int index = provider().fdValToIndex(fdVal, writePollers.length);
703 return writePollers[index];
704 }
705
706 /**
707 * Starts a read sub-poller in a virtual thread.
708 */
709 private Poller startReadPoller() throws IOException {
710 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
711
712 // create read sub-poller
713 Poller readPoller = provider().readPoller(true);
714 readPollers.add(readPoller);
715
716 // start virtual thread to execute sub-polling loop
717 Thread carrier = JLA.currentCarrierThread();
718 Thread.Builder.OfVirtual builder = Thread.ofVirtual()
719 .inheritInheritableThreadLocals(false)
720 .name(carrier.getName() + "-Read-Poller")
721 .uncaughtExceptionHandler((_, e) -> e.printStackTrace());
722 Thread thread = JLA.defaultVirtualThreadScheduler()
723 .newThread(builder, carrier, () -> subPollerLoop(readPoller))
724 .thread();
725 thread.start();
726 return readPoller;
727 }
728
729 /**
730 * Returns the read poller for the current carrier, starting it if required.
731 */
732 private Poller readPoller() throws IOException {
733 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
734 Continuation.pin();
735 try {
736 CarrierPoller carrierPoller = CARRIER_POLLER.get();
737 if (carrierPoller != null) {
738 return carrierPoller.readPoller();
739 } else {
740 // first poll on this carrier will start poller
741 Poller readPoller = startReadPoller();
742 CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
743 return readPoller;
744 }
745 } finally {
753 if (event == Net.POLLIN
754 && Thread.currentThread().isVirtual()
755 && ContinuationSupport.isSupported()) {
756 readPoller().poll(fdVal, nanos, isOpen);
757 return;
758 }
759
760 // -XX:-VMContinuations or POLLIN from platform thread does master poller
761 if (event == Net.POLLIN) {
762 masterPoller.poll(fdVal, nanos, isOpen);
763 } else {
764 writePoller(fdVal).poll(fdVal, nanos, isOpen);
765 }
766 }
767
768 @Override
769 void pollSelector(int fdVal, long nanos) throws IOException {
770 masterPoller.poll(fdVal, nanos, () -> true);
771 }
772
773 @Override
774 int read(int fdVal, byte[] b, int off, int len, long nanos,
775 BooleanSupplier isOpen) throws IOException {
776 return readPoller().implRead(fdVal, b, off, len, nanos, isOpen);
777 }
778
779 @Override
780 int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
781 return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
782 }
783
784 /**
785 * Sub-poller polling loop.
786 */
787 private void subPollerLoop(Poller readPoller) {
788 try {
789 readPoller.subPollerLoop(masterPoller);
790 } finally {
791 // wakeup all threads waiting on file descriptors registered with the
792 // read poller, these I/O operation will migrate to another carrier.
793 readPoller.wakeupAll();
794
795 // remove from serviceability view
796 readPollers.remove(readPoller);
797 }
798 }
799
800 /**
801 * Invoked by the carrier thread before it terminates.
802 */
803 private void carrierTerminated(Poller readPoller) {
827
828 /**
829 * Reads the given property name to get the poller count. If the property is
830 * set then the value must be a power of 2. Returns 1 if the property is not
831 * set.
832 * @throws IllegalArgumentException if the property is set to a value that
833 * is not a power of 2.
834 */
835 private static int pollerCount(String propName, int defaultCount) {
836 String s = System.getProperty(propName);
837 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
838
839 // check power of 2
840 if (count != Integer.highestOneBit(count)) {
841 String msg = propName + " is set to a value that is not a power of 2";
842 throw new IllegalArgumentException(msg);
843 }
844 return count;
845 }
846
847
848 /**
849 * Returns true if read ops are supported in addition to POLLIN polling.
850 */
851 public static boolean supportReadOps() {
852 return POLLER_GROUP.supportReadOps();
853 }
854
855 /**
856 * Returns true if write ops are supported in addition to POLLOUT polling.
857 */
858 public static boolean supportWriteOps() {
859 return POLLER_GROUP.supportWriteOps();
860 }
861
862 /**
863 * Parks the current thread until bytes are read into a byte array.
864 * @param isOpen supplies a boolean to indicate if the enclosing object is open
865 * @return the number of bytes read (>0), EOF (-1), or UNAVAILABLE (-2) if unparked
866 * or the timeout expires while waiting for bytes to be read
867 * @throws UnsupportedOperationException if not supported
868 */
869 public static int read(int fdVal, byte[] b, int off, int len, long nanos,
870 BooleanSupplier isOpen) throws IOException {
871 return POLLER_GROUP.read(fdVal, b, off, len, nanos, isOpen);
872 }
873
874 /**
875 * Parks the current thread until bytes are written from a byte array.
876 * @param isOpen supplies a boolean to indicate if the enclosing object is open
877 * @return the number of bytes read (>0), EOF (-1), or UNAVAILABLE (-2) if unparked
878 * or the timeout expires while waiting for bytes to be read
879 * @throws UnsupportedOperationException if not supported
880 */
881 public static int write(int fdVal, byte[] b, int off, int len,
882 BooleanSupplier isOpen) throws IOException {
883 return POLLER_GROUP.write(fdVal, b, off, len, isOpen);
884 }
885
886 /**
887 * Parks the current thread until bytes are read a byte array. This method is
888 * overridden by poller implementations that support this operation.
889 */
890 int implRead(int fdVal, byte[] b, int off, int len, long nanos,
891 BooleanSupplier isOpen) throws IOException {
892 throw new UnsupportedOperationException();
893 }
894
895 /**
896 * Parks the current thread until bytes are written from a byte array. This
897 * method is overridden by poller implementations that support this operation.
898 */
899 int implWrite(int fdVal, byte[] b, int off, int len,
900 BooleanSupplier isOpen) throws IOException {
901 throw new UnsupportedOperationException();
902 }
903
904 /**
905 * Return the master poller or null if there is no master poller.
906 */
907 public static Poller masterPoller() {
908 return POLLER_GROUP.masterPoller();
909 }
910
911 /**
912 * Return the list of read pollers.
913 */
914 public static List<Poller> readPollers() {
915 return POLLER_GROUP.readPollers();
916 }
917
918 /**
919 * Return the list of write pollers.
920 */
921 public static List<Poller> writePollers() {
922 return POLLER_GROUP.writePollers();
923 }
|