1 /*
2 * Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.io.UncheckedIOException;
29 import java.lang.ref.Reference;
30 import java.util.Arrays;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.locks.LockSupport;
40 import java.util.function.BooleanSupplier;
41 import jdk.internal.access.JavaLangAccess;
42 import jdk.internal.access.SharedSecrets;
43 import jdk.internal.misc.InnocuousThread;
44 import jdk.internal.misc.TerminatingThreadLocal;
45 import jdk.internal.vm.Continuation;
46 import jdk.internal.vm.ContinuationSupport;
47 import jdk.internal.vm.annotation.Stable;
48
49 /**
50 * I/O poller to allow virtual threads park until a file descriptor is ready for I/O.
51 */
52 public abstract class Poller {
53 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
54
55 // the poller group for the I/O pollers and poller threads
56 private static final PollerGroup POLLER_GROUP = createPollerGroup();
57
58 // the poller or sub-poller thread (used for observability only)
59 private @Stable Thread owner;
60
61 // maps file descriptors to parked Thread
62 private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
63
64 // shutdown (if supported by poller group)
65 private volatile boolean shutdown;
66
67 /**
68 * Poller mode.
69 */
70 enum Mode {
71 /**
72 * Read and write pollers are platform threads that block waiting for events and
73 * unpark virtual threads when file descriptors are ready for I/O.
74 */
75 SYSTEM_THREADS,
76
77 /**
78 * Read and write pollers are virtual threads that poll for events, yielding
79 * between polls and unparking virtual threads when file descriptors are
80 * ready for I/O. If there are no events then the poller threads park until there
81 * are I/O events to poll. This mode helps to integrate polling with virtual
82 * thread scheduling. The approach is similar to the default scheme in "User-level
83 * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
84 * (https://dl.acm.org/doi/10.1145/3379483).
85 */
86 VTHREAD_POLLERS,
87
88 /**
89 * Read pollers are per-carrier virtual threads that poll for events, yielding
90 * between polls and unparking virtual threads when file descriptors are ready
91 * for I/O. If there are no events then the poller threads park until there
92 * are I/O events to poll. The write poller is a system-wide platform thread.
93 */
94 POLLER_PER_CARRIER
95 }
96
97 /**
98 * Create and return the PollerGroup.
99 */
100 private static PollerGroup createPollerGroup() {
101 try {
102 PollerProvider provider;
103 if (System.getProperty("jdk.pollerMode") instanceof String s) {
104 Mode mode = switch (s) {
105 case "1" -> Mode.SYSTEM_THREADS;
106 case "2" -> Mode.VTHREAD_POLLERS;
107 case "3" -> Mode.POLLER_PER_CARRIER;
108 default -> {
109 throw new RuntimeException(s + " is not a valid polling mode");
110 }
111 };
112 provider = PollerProvider.createProvider(mode);
113 } else {
114 provider = PollerProvider.createProvider();
115 }
116
117 int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers());
118 int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers());
119 PollerGroup group = switch (provider.pollerMode()) {
120 case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider, readPollers, writePollers);
121 case VTHREAD_POLLERS -> new VThreadsPollerGroup(provider, readPollers, writePollers);
122 case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers);
123 };
124 group.start();
125 return group;
126 } catch (IOException ioe) {
127 throw new UncheckedIOException(ioe);
128 }
129 }
130
131 /**
132 * Initialize a Poller.
133 */
134 protected Poller() {
135 }
136
137 /**
138 * Closes the poller and release resources. This method can only be used to cleanup
139 * when creating a poller group fails.
140 */
141 abstract void close() throws IOException;
142
143 /**
144 * Sets the poller's thread owner.
145 */
146 private void setOwner() {
147 owner = Thread.currentThread();
148 }
149
150 /**
151 * Returns true if this poller is marked for shutdown.
152 */
153 boolean isShutdown() {
154 return shutdown;
155 }
156
157 /**
158 * Marks this poller for shutdown.
159 */
160 private void setShutdown() {
161 shutdown = true;
162 }
163
164 /**
165 * Returns the poller's file descriptor to use when polling with the master poller.
166 * @throws UnsupportedOperationException if not supported
167 */
168 int fdVal() {
169 throw new UnsupportedOperationException();
170 }
171
172 /**
173 * Register the file descriptor with the I/O event management facility so that it is
174 * polled when the file descriptor is ready for I/O. The registration is "one shot",
175 * meaning it should be polled at most once.
176 */
177 abstract void implStartPoll(int fdVal) throws IOException;
178
179 /**
180 * Deregister a file descriptor from the I/O event management facility. This may be
181 * a no-op in some implementations when the file descriptor has already been polled.
182 * @param polled true if the file descriptor has already been polled
183 */
184 abstract void implStopPoll(int fdVal, boolean polled) throws IOException;
185
186 /**
187 * Poll for events. The {@link #polled(int)} method is invoked for each
188 * polled file descriptor.
189 *
190 * @param timeout if positive then block for up to {@code timeout} milliseconds,
191 * if zero then don't block, if -1 then block indefinitely
192 * @return >0 if file descriptors are polled, 0 if no file descriptor polled
193 */
194 abstract int poll(int timeout) throws IOException;
195
196 /**
197 * Wakeup the poller thread if blocked in poll so it can shutdown.
198 * @throws UnsupportedOperationException if not supported
199 */
200 void wakeupPoller() throws IOException {
201 throw new UnsupportedOperationException();
202 }
203
204 /**
205 * Callback by the poll method when a file descriptor is polled.
206 */
207 final void polled(int fdVal) {
208 Thread t = map.remove(fdVal);
209 if (t != null) {
210 if (POLLER_GROUP.useLazyUnpark() && Thread.currentThread().isVirtual()) {
211 JLA.lazyUnparkVirtualThread(t);
212 } else {
213 LockSupport.unpark(t);
214 }
215 }
216 }
217
218 /**
219 * Parks the current thread until a file descriptor is ready for the given op.
220 * @param fdVal the file descriptor
221 * @param event POLLIN or POLLOUT
222 * @param nanos the waiting time or 0 to wait indefinitely
223 * @param isOpen supplies a boolean to indicate if the enclosing object is open
224 */
225 public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
226 POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
227 }
228
229 /**
230 * Parks the current thread until a Selector's file descriptor is ready.
231 * @param fdVal the Selector's file descriptor
232 * @param nanos the waiting time or 0 to wait indefinitely
233 */
234 public static void pollSelector(int fdVal, long nanos) throws IOException {
235 POLLER_GROUP.pollSelector(fdVal, nanos);
236 }
237
238 /**
239 * Unpark the given thread so that it stops polling.
240 */
241 public static void stopPoll(Thread thread) {
242 LockSupport.unpark(thread);
243 }
244
245 /**
246 * Parks the current thread until a file descriptor is ready.
247 */
248 private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
249 startPoll(fdVal);
250 try {
251 if (isOpen.getAsBoolean() && !isShutdown()) {
252 if (nanos > 0) {
253 LockSupport.parkNanos(nanos);
254 } else {
255 LockSupport.park();
256 }
257 }
258 } finally {
259 stopPoll(fdVal);
260 }
261 }
262
263 /**
264 * Register a file descriptor with the I/O event management facility so that it is
265 * polled when the file descriptor is ready for I/O.
266 */
267 private void startPoll(int fdVal) throws IOException {
268 Thread previous = map.put(fdVal, Thread.currentThread());
269 assert previous == null;
270 try {
271 implStartPoll(fdVal);
272 } catch (Throwable t) {
273 map.remove(fdVal);
274 throw t;
275 } finally {
276 Reference.reachabilityFence(this);
277 }
278 }
279
280 /**
281 * Deregister a file descriptor from the I/O event management facility.
282 */
283 private void stopPoll(int fdVal) throws IOException {
284 Thread previous = map.remove(fdVal);
285 boolean polled = (previous == null);
286 assert polled || previous == Thread.currentThread();
287 try {
288 implStopPoll(fdVal, polled);
289 } finally {
290 Reference.reachabilityFence(this);
291 }
292 }
293
294 /**
295 * Master polling loop. The {@link #polled(int)} method is invoked for each file
296 * descriptor that is polled.
297 */
298 private void pollerLoop() {
299 setOwner();
300 try {
301 while (!isShutdown()) {
302 poll(-1);
303 }
304 } catch (Exception e) {
305 e.printStackTrace();
306 }
307 }
308
309 /**
310 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
311 * descriptor that is polled.
312 *
313 * The sub-poller registers its file descriptor with the master poller to park until
314 * there are events to poll. When unparked, it does non-blocking polls and parks
315 * again when there are no more events. The sub-poller yields after each poll to help
316 * with fairness and to avoid re-registering with the master poller where possible.
317 */
318 private void subPollerLoop(Poller masterPoller) {
319 assert Thread.currentThread().isVirtual();
320 setOwner();
321 try {
322 int polled = 0;
323 while (!isShutdown()) {
324 if (polled == 0) {
325 masterPoller.poll(fdVal(), 0, () -> true); // park
326 } else {
327 Thread.yield();
328 }
329 polled = poll(0);
330 }
331 } catch (Exception e) {
332 e.printStackTrace();
333 }
334 }
335
336 /**
337 * Unparks all threads waiting on a file descriptor registered with this poller.
338 */
339 private void wakeupAll() {
340 map.values().forEach(LockSupport::unpark);
341 }
342
343 @Override
344 public String toString() {
345 return String.format("%s [registered = %d, owner = %s]",
346 Objects.toIdentityString(this), map.size(), owner);
347 }
348
349 /**
350 * A group of poller threads that support virtual threads polling file descriptors.
351 */
352 private static abstract class PollerGroup {
353 private final PollerProvider provider;
354
355 PollerGroup(PollerProvider provider) {
356 this.provider = provider;
357 }
358
359 final PollerProvider provider() {
360 return provider;
361 }
362
363 /**
364 * Starts the poller group and any system-wide poller threads.
365 */
366 abstract void start();
367
368 /**
369 * Parks the current thread until a file descriptor is ready for the given op.
370 */
371 abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
372
373 /**
374 * Parks the current thread until a Selector's file descriptor is ready.
375 */
376 void pollSelector(int fdVal, long nanos) throws IOException {
377 poll(fdVal, Net.POLLIN, nanos, () -> true);
378 }
379
380 /**
381 * Starts a platform thread to run the given task.
382 */
383 protected final void startPlatformThread(String name, Runnable task) {
384 Thread thread = InnocuousThread.newSystemThread(name, task);
385 thread.setDaemon(true);
386 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
387 thread.start();
388 }
389
390 /**
391 * Return the master poller, or null if no master poller.
392 */
393 abstract Poller masterPoller();
394
395 /**
396 * Return the read pollers.
397 */
398 abstract List<Poller> readPollers();
399
400 /**
401 * Return the write pollers.
402 */
403 abstract List<Poller> writePollers();
404
405 /**
406 * Return true if the unparking threads should use lazyUnpark.
407 */
408 boolean useLazyUnpark() {
409 return false;
410 }
411
412 /**
413 * Close the given pollers.
414 */
415 static void closeAll(Poller... pollers) {
416 for (Poller poller : pollers) {
417 if (poller != null) {
418 try {
419 poller.close();
420 } catch (IOException _) { }
421 }
422 }
423 }
424 }
425
426 /**
427 * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
428 */
429 private static class SystemThreadsPollerGroup extends PollerGroup {
430 // system-wide read and write pollers
431 private final Poller[] readPollers;
432 private final Poller[] writePollers;
433
434 SystemThreadsPollerGroup(PollerProvider provider,
435 int readPollerCount,
436 int writePollerCount) throws IOException {
437 super(provider);
438 Poller[] readPollers = new Poller[readPollerCount];
439 Poller[] writePollers = new Poller[writePollerCount];
440 try {
441 for (int i = 0; i < readPollerCount; i++) {
442 readPollers[i] = provider.readPoller(false);
443 }
444 for (int i = 0; i < writePollerCount; i++) {
445 writePollers[i] = provider.writePoller(false);
446 }
447 } catch (Throwable e) {
448 closeAll(readPollers);
449 closeAll(writePollers);
450 throw e;
451 }
452
453 this.readPollers = readPollers;
454 this.writePollers = writePollers;
455 }
456
457 @Override
458 void start() {
459 Arrays.stream(readPollers).forEach(p -> {
460 startPlatformThread("Read-Poller", p::pollerLoop);
461 });
462 Arrays.stream(writePollers).forEach(p -> {
463 startPlatformThread("Write-Poller", p::pollerLoop);
464 });
465 }
466
467 private Poller readPoller(int fdVal) {
468 int index = provider().fdValToIndex(fdVal, readPollers.length);
469 return readPollers[index];
470 }
471
472 private Poller writePoller(int fdVal) {
473 int index = provider().fdValToIndex(fdVal, writePollers.length);
474 return writePollers[index];
475 }
476
477 @Override
478 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
479 Poller poller = (event == Net.POLLIN)
480 ? readPoller(fdVal)
481 : writePoller(fdVal);
482 poller.poll(fdVal, nanos, isOpen);
483 }
484
485 @Override
486 Poller masterPoller() {
487 return null;
488 }
489
490 @Override
491 List<Poller> readPollers() {
492 return List.of(readPollers);
493 }
494
495 @Override
496 List<Poller> writePollers() {
497 return List.of(writePollers);
498 }
499 }
500
501 /**
502 * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
503 * When read and write pollers need to block then they register with a system-wide
504 * "master poller" that runs in a dedicated platform thread.
505 */
506 private static class VThreadsPollerGroup extends PollerGroup {
507 private final Poller masterPoller;
508 private final Poller[] readPollers;
509 private final Poller[] writePollers;
510
511 // keep virtual thread pollers alive
512 private final Executor executor;
513
514 VThreadsPollerGroup(PollerProvider provider,
515 int readPollerCount,
516 int writePollerCount) throws IOException {
517 super(provider);
518 Poller masterPoller = provider.readPoller(false);
519 Poller[] readPollers = new Poller[readPollerCount];
520 Poller[] writePollers = new Poller[writePollerCount];
521
522 try {
523 for (int i = 0; i < readPollerCount; i++) {
524 readPollers[i] = provider.readPoller(true);
525 }
526 for (int i = 0; i < writePollerCount; i++) {
527 writePollers[i] = provider.writePoller(true);
528 }
529 } catch (Throwable e) {
530 masterPoller.close();
531 closeAll(readPollers);
532 closeAll(writePollers);
533 throw e;
534 }
535
536 this.masterPoller = masterPoller;
537 this.readPollers = readPollers;
538 this.writePollers = writePollers;
539
540 ThreadFactory factory = Thread.ofVirtual()
541 .inheritInheritableThreadLocals(false)
542 .name("SubPoller-", 0)
543 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
544 .factory();
545 this.executor = Executors.newThreadPerTaskExecutor(factory);
546 }
547
548 @Override
549 void start() {
550 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
551 Arrays.stream(readPollers).forEach(p -> {
552 executor.execute(() -> p.subPollerLoop(masterPoller));
553 });
554 Arrays.stream(writePollers).forEach(p -> {
555 executor.execute(() -> p.subPollerLoop(masterPoller));
556 });
557 }
558
559 private Poller readPoller(int fdVal) {
560 int index = provider().fdValToIndex(fdVal, readPollers.length);
561 return readPollers[index];
562 }
563
564 private Poller writePoller(int fdVal) {
565 int index = provider().fdValToIndex(fdVal, writePollers.length);
566 return writePollers[index];
567 }
568
569 @Override
570 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
571 Poller poller = (event == Net.POLLIN)
572 ? readPoller(fdVal)
573 : writePoller(fdVal);
574 poller.poll(fdVal, nanos, isOpen);
575 }
576
577 @Override
578 void pollSelector(int fdVal, long nanos) throws IOException {
579 masterPoller.poll(fdVal, nanos, () -> true);
580 }
581
582 @Override
583 Poller masterPoller() {
584 return masterPoller;
585 }
586
587 @Override
588 List<Poller> readPollers() {
589 return List.of(readPollers);
590 }
591
592 @Override
593 List<Poller> writePollers() {
594 return List.of(writePollers);
595 }
596 }
597
598 /**
599 * POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread.
600 * When a virtual thread polls a file descriptor for POLLIN, then it will use (almost
601 * always, not guaranteed) the read poller for its carrier. When a read poller needs
602 * to block then it registers with a system-wide "master poller" that runs in a
603 * dedicated platform thread. The read poller terminates if the carrier terminates.
604 * The write pollers are system-wide platform threads (usually one).
605 */
606 private static class PollerPerCarrierPollerGroup extends PollerGroup {
607 private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { }
608 private static final TerminatingThreadLocal<CarrierPoller> CARRIER_POLLER =
609 new TerminatingThreadLocal<>() {
610 @Override
611 protected void threadTerminated(CarrierPoller carrierPoller) {
612 Poller readPoller = carrierPoller.readPoller();
613 carrierPoller.group().carrierTerminated(readPoller);
614 }
615 };
616
617 private final Poller masterPoller;
618 private final Set<Poller> readPollers;
619 private final Poller[] writePollers;
620
621 /**
622 * Create a PollerPerCarrierPollerGroup with the given number of write pollers.
623 */
624 PollerPerCarrierPollerGroup(PollerProvider provider,
625 int writePollerCount) throws IOException {
626 super(provider);
627 Poller masterPoller = provider.readPoller(false);
628 Poller[] writePollers = new Poller[writePollerCount];
629 try {
630 for (int i = 0; i < writePollerCount; i++) {
631 writePollers[i] = provider.writePoller(false);
632 }
633 } catch (Throwable e) {
634 masterPoller.close();
635 closeAll(writePollers);
636 throw e;
637 }
638 this.masterPoller = masterPoller;
639 this.readPollers = ConcurrentHashMap.newKeySet();;
640 this.writePollers = writePollers;
641 }
642
643 @Override
644 void start() {
645 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
646 Arrays.stream(writePollers).forEach(p -> {
647 startPlatformThread("Write-Poller", p::pollerLoop);
648 });
649 }
650
651 private Poller writePoller(int fdVal) {
652 int index = provider().fdValToIndex(fdVal, writePollers.length);
653 return writePollers[index];
654 }
655
656 /**
657 * Starts a read sub-poller in a virtual thread.
658 */
659 private Poller startReadPoller() throws IOException {
660 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
661
662 // create read sub-poller
663 Poller readPoller = provider().readPoller(true);
664 readPollers.add(readPoller);
665
666 // start virtual thread to execute sub-polling loop
667 Thread carrier = JLA.currentCarrierThread();
668 Thread.Builder.OfVirtual builder = Thread.ofVirtual()
669 .inheritInheritableThreadLocals(false)
670 .name(carrier.getName() + "-Read-Poller")
671 .uncaughtExceptionHandler((_, e) -> e.printStackTrace());
672 Thread thread = JLA.defaultVirtualThreadScheduler()
673 .newThread(builder, carrier, () -> subPollerLoop(readPoller))
674 .thread();
675 thread.start();
676 return readPoller;
677 }
678
679 /**
680 * Returns the read poller for the current carrier, starting it if required.
681 */
682 private Poller readPoller() throws IOException {
683 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
684 Continuation.pin();
685 try {
686 CarrierPoller carrierPoller = CARRIER_POLLER.get();
687 if (carrierPoller != null) {
688 return carrierPoller.readPoller();
689 } else {
690 // first poll on this carrier will start poller
691 Poller readPoller = startReadPoller();
692 CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
693 return readPoller;
694 }
695 } finally {
696 Continuation.unpin();
697 }
698 }
699
700 @Override
701 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
702 // for POLLIN, get the read poller for this carrier
703 if (event == Net.POLLIN
704 && Thread.currentThread().isVirtual()
705 && ContinuationSupport.isSupported()) {
706 readPoller().poll(fdVal, nanos, isOpen);
707 return;
708 }
709
710 // -XX:-VMContinuations or POLLIN from platform thread does master poller
711 if (event == Net.POLLIN) {
712 masterPoller.poll(fdVal, nanos, isOpen);
713 } else {
714 writePoller(fdVal).poll(fdVal, nanos, isOpen);
715 }
716 }
717
718 @Override
719 void pollSelector(int fdVal, long nanos) throws IOException {
720 masterPoller.poll(fdVal, nanos, () -> true);
721 }
722
723 /**
724 * Sub-poller polling loop.
725 */
726 private void subPollerLoop(Poller readPoller) {
727 try {
728 readPoller.subPollerLoop(masterPoller);
729 } finally {
730 // wakeup all threads waiting on file descriptors registered with the
731 // read poller, these I/O operation will migrate to another carrier.
732 readPoller.wakeupAll();
733
734 // remove from serviceability view
735 readPollers.remove(readPoller);
736 }
737 }
738
739 /**
740 * Invoked by the carrier thread before it terminates.
741 */
742 private void carrierTerminated(Poller readPoller) {
743 readPoller.setShutdown();
744 try {
745 readPoller.wakeupPoller();
746 } catch (Throwable e) {
747 e.printStackTrace();
748 }
749 }
750
751 @Override
752 Poller masterPoller() {
753 return masterPoller;
754 }
755
756 @Override
757 List<Poller> readPollers() {
758 return readPollers.stream().toList();
759 }
760
761 @Override
762 List<Poller> writePollers() {
763 return List.of(writePollers);
764 }
765
766 @Override
767 boolean useLazyUnpark() {
768 return true;
769 }
770 }
771
772 /**
773 * Reads the given property name to get the poller count. If the property is
774 * set then the value must be a power of 2. Returns 1 if the property is not
775 * set.
776 * @throws IllegalArgumentException if the property is set to a value that
777 * is not a power of 2.
778 */
779 private static int pollerCount(String propName, int defaultCount) {
780 String s = System.getProperty(propName);
781 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
782
783 // check power of 2
784 if (count != Integer.highestOneBit(count)) {
785 String msg = propName + " is set to a value that is not a power of 2";
786 throw new IllegalArgumentException(msg);
787 }
788 return count;
789 }
790
791 /**
792 * Return the master poller or null if there is no master poller.
793 */
794 public static Poller masterPoller() {
795 return POLLER_GROUP.masterPoller();
796 }
797
798 /**
799 * Return the list of read pollers.
800 */
801 public static List<Poller> readPollers() {
802 return POLLER_GROUP.readPollers();
803 }
804
805 /**
806 * Return the list of write pollers.
807 */
808 public static List<Poller> writePollers() {
809 return POLLER_GROUP.writePollers();
810 }
811 }