1 /*
2 * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.io.UncheckedIOException;
29 import java.lang.ref.Reference;
30 import java.util.Arrays;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.locks.LockSupport;
40 import java.util.function.BooleanSupplier;
41 import jdk.internal.access.JavaLangAccess;
42 import jdk.internal.access.SharedSecrets;
43 import jdk.internal.misc.InnocuousThread;
44 import jdk.internal.misc.TerminatingThreadLocal;
45 import jdk.internal.vm.Continuation;
46 import jdk.internal.vm.ContinuationSupport;
47 import jdk.internal.vm.annotation.Stable;
48
/**
 * I/O poller that allows virtual threads to park until a file descriptor is ready
 * for I/O. Implementations also optionally support read/write operations where
 * virtual threads park until bytes are read or written.
 */
54 public abstract class Poller {
// access to java.lang internals; used below to look up the current carrier thread
// and a virtual thread's scheduler when starting a per-carrier read poller
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();

// the poller group for the I/O pollers and poller threads; createPollerGroup()
// both creates and starts the group
private static final PollerGroup POLLER_GROUP = createPollerGroup();

// the poller or sub-poller thread (used for observability only); assigned by the
// thread that runs the polling loop
private @Stable Thread owner;

// maps file descriptors to the thread parked waiting on that file descriptor
private final Map<Integer, Thread> map = new ConcurrentHashMap<>();

// set when this poller is marked for shutdown (if supported by poller group)
private volatile boolean shutdown;
68
/**
 * Poller mode. Selects which kind of threads run the read and write polling loops.
 * The mode can be chosen with the {@code jdk.pollerMode} system property, where the
 * values "1", "2" and "3" map to the constants below in declaration order.
 */
enum Mode {
    /**
     * Read and write pollers are platform threads that block waiting for events and
     * unpark virtual threads when file descriptors are ready for I/O.
     */
    SYSTEM_THREADS,

    /**
     * Read and write pollers are virtual threads that poll for events, yielding
     * between polls and unparking virtual threads when file descriptors are
     * ready for I/O. If there are no events then the poller threads park until there
     * are I/O events to poll. This mode helps to integrate polling with virtual
     * thread scheduling. The approach is similar to the default scheme in "User-level
     * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
     * (https://dl.acm.org/doi/10.1145/3379483).
     */
    VTHREAD_POLLERS,

    /**
     * Read pollers are per-carrier virtual threads that poll for events, yielding
     * between polls and unparking virtual threads when file descriptors are ready
     * for I/O. If there are no events then the poller threads park until there
     * are I/O events to poll. The write pollers are system-wide platform threads.
     */
    POLLER_PER_CARRIER
}
98
99 /**
100 * Create and return the PollerGroup.
101 */
102 private static PollerGroup createPollerGroup() {
103 try {
104 PollerProvider provider;
105 if (System.getProperty("jdk.pollerMode") instanceof String s) {
106 Mode mode = switch (s) {
107 case "1" -> Mode.SYSTEM_THREADS;
108 case "2" -> Mode.VTHREAD_POLLERS;
109 case "3" -> Mode.POLLER_PER_CARRIER;
110 default -> {
111 throw new RuntimeException(s + " is not a valid polling mode");
112 }
113 };
114 provider = PollerProvider.createProvider(mode);
115 } else {
116 provider = PollerProvider.createProvider();
117 }
118
119 int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers());
120 int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers());
121 PollerGroup group = switch (provider.pollerMode()) {
122 case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider, readPollers, writePollers);
123 case VTHREAD_POLLERS -> new VThreadsPollerGroup(provider, readPollers, writePollers);
124 case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers);
125 };
126 group.start();
127 return group;
128 } catch (IOException ioe) {
129 throw new UncheckedIOException(ioe);
130 }
131 }
132
/**
 * Initialize a Poller. There is no state to set up beyond the field initializers;
 * the constructor is protected so that only subclasses can create pollers.
 */
protected Poller() {
}
138
/**
 * Closes the poller and releases its resources. This method can only be used to
 * clean up when creating a poller group fails.
 *
 * @throws IOException if an I/O error occurs
 */
abstract void close() throws IOException;
144
/**
 * Records the current thread as this poller's owner. Invoked at the start of the
 * polling loops by the thread that runs the loop.
 */
private void setOwner() {
    owner = Thread.currentThread();
}
151
/**
 * Returns true if this poller is marked for shutdown. The polling loops test this
 * before each poll and exit once it is set.
 */
boolean isShutdown() {
    return shutdown;
}
158
/**
 * Marks this poller for shutdown. This only sets the flag; a poller thread that is
 * blocked in poll has to be woken separately (see {@link #wakeupPoller()}).
 */
private void setShutdown() {
    shutdown = true;
}
165
/**
 * Returns the poller's file descriptor to use when polling with the master poller.
 * This base implementation always throws; pollers that can act as sub-pollers
 * are expected to override it.
 *
 * @throws UnsupportedOperationException if not supported
 */
int fdVal() {
    throw new UnsupportedOperationException();
}
173
/**
 * Invoked when this poller's file descriptor is polled by the master poller.
 * The sub-poller loop calls this after it is unparked and before it polls for
 * events. This base implementation does nothing.
 *
 * @throws IOException if an I/O error occurs
 */
void pollerPolled() throws IOException {
}
179
/**
 * Register the file descriptor with the I/O event management facility so that it is
 * polled when the file descriptor is ready for I/O. The registration is "one shot",
 * meaning it should be polled at most once.
 *
 * @param fdVal the file descriptor
 * @throws IOException if an I/O error occurs
 */
abstract void implStartPoll(int fdVal) throws IOException;
186
/**
 * Deregister a file descriptor from the I/O event management facility. This may be
 * a no-op in some implementations when the file descriptor has already been polled.
 *
 * @param fdVal the file descriptor
 * @param polled true if the file descriptor has already been polled
 * @throws IOException if an I/O error occurs
 */
abstract void implStopPoll(int fdVal, boolean polled) throws IOException;
193
/**
 * Poll for events. The {@link #polled(int)} method is invoked for each
 * polled file descriptor.
 *
 * @param timeout if positive then block for up to {@code timeout} milliseconds,
 *     if zero then don't block, if -1 then block indefinitely
 * @return >0 if file descriptors are polled, 0 if no file descriptor polled
 * @throws IOException if an I/O error occurs
 */
abstract int poll(int timeout) throws IOException;
203
/**
 * Wakeup the poller thread if blocked in poll so it can shutdown. This base
 * implementation always throws; pollers that support shutdown are expected to
 * override it.
 *
 * @throws UnsupportedOperationException if not supported
 * @throws IOException if an I/O error occurs
 */
void wakeupPoller() throws IOException {
    throw new UnsupportedOperationException();
}
211
212 /**
213 * Callback by the poll method when a file descriptor is polled.
214 */
215 final void polled(int fdVal) {
216 Thread t = map.remove(fdVal);
217 if (t != null) {
218 LockSupport.unpark(t);
219 }
220 }
221
/**
 * Parks the current thread until a file descriptor is ready for the given op.
 * Delegates to the poller group, which selects the poller to use.
 *
 * @param fdVal the file descriptor
 * @param event POLLIN or POLLOUT
 * @param nanos the waiting time or 0 to wait indefinitely
 * @param isOpen supplies a boolean to indicate if the enclosing object is open
 * @throws IOException if an I/O error occurs
 */
public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
    POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
}
232
/**
 * Parks the current thread until a Selector's file descriptor is ready.
 * Delegates to the poller group.
 *
 * @param fdVal the Selector's file descriptor
 * @param nanos the waiting time or 0 to wait indefinitely
 * @throws IOException if an I/O error occurs
 */
public static void pollSelector(int fdVal, long nanos) throws IOException {
    POLLER_GROUP.pollSelector(fdVal, nanos);
}
241
/**
 * Unpark the given thread so that it stops polling. The unparked thread
 * deregisters its file descriptor itself when it returns from parking.
 *
 * @param thread the thread to unpark
 */
public static void stopPoll(Thread thread) {
    LockSupport.unpark(thread);
}
248
249 /**
250 * Parks the current thread until a file descriptor is ready.
251 */
252 private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
253 startPoll(fdVal);
254 try {
255 if (isOpen.getAsBoolean() && !isShutdown()) {
256 if (nanos > 0) {
257 LockSupport.parkNanos(nanos);
258 } else {
259 LockSupport.park();
260 }
261 }
262 } finally {
263 stopPoll(fdVal);
264 }
265 }
266
267 /**
268 * Register a file descriptor with the I/O event management facility so that it is
269 * polled when the file descriptor is ready for I/O.
270 */
271 private void startPoll(int fdVal) throws IOException {
272 Thread previous = map.put(fdVal, Thread.currentThread());
273 assert previous == null;
274 try {
275 implStartPoll(fdVal);
276 } catch (Throwable t) {
277 map.remove(fdVal);
278 throw t;
279 } finally {
280 Reference.reachabilityFence(this);
281 }
282 }
283
284 /**
285 * Deregister a file descriptor from the I/O event management facility.
286 */
287 private void stopPoll(int fdVal) throws IOException {
288 Thread previous = map.remove(fdVal);
289 boolean polled = (previous == null);
290 assert polled || previous == Thread.currentThread();
291 try {
292 implStopPoll(fdVal, polled);
293 } finally {
294 Reference.reachabilityFence(this);
295 }
296 }
297
298 /**
299 * Master polling loop. The {@link #polled(int)} method is invoked for each file
300 * descriptor that is polled.
301 */
302 private void pollerLoop() {
303 setOwner();
304 try {
305 while (!isShutdown()) {
306 poll(-1);
307 }
308 } catch (Exception e) {
309 e.printStackTrace();
310 }
311 }
312
313 /**
314 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
315 * descriptor that is polled.
316 *
317 * The sub-poller registers its file descriptor with the master poller to park until
318 * there are events to poll. When unparked, it does non-blocking polls and parks
319 * again when there are no more events. The sub-poller yields after each poll to help
320 * with fairness and to avoid re-registering with the master poller where possible.
321 */
322 private void subPollerLoop(Poller masterPoller) {
323 assert Thread.currentThread().isVirtual();
324 setOwner();
325 try {
326 int polled = 0;
327 while (!isShutdown()) {
328 if (polled == 0) {
329 masterPoller.poll(fdVal(), 0, () -> true); // park
330 pollerPolled();
331 } else {
332 Thread.yield();
333 }
334 polled = poll(0);
335 }
336 } catch (Exception e) {
337 e.printStackTrace();
338 }
339 }
340
341 /**
342 * Unparks all threads waiting on a file descriptor registered with this poller.
343 */
344 private void wakeupAll() {
345 map.values().forEach(LockSupport::unpark);
346 }
347
348 @Override
349 public String toString() {
350 return String.format("%s [registered = %d, owner = %s]",
351 Objects.toIdentityString(this), map.size(), owner);
352 }
353
354 /**
355 * A group of poller threads that support virtual threads polling file descriptors.
356 */
357 private static abstract class PollerGroup {
358 private final PollerProvider provider;
359
360 PollerGroup(PollerProvider provider) {
361 this.provider = provider;
362 }
363
364 final PollerProvider provider() {
365 return provider;
366 }
367
368 /**
369 * Starts the poller group and any system-wide poller threads.
370 */
371 abstract void start();
372
373 /**
374 * Parks the current thread until a file descriptor is ready for the given op.
375 */
376 abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
377
378 /**
379 * Parks the current thread until a Selector's file descriptor is ready.
380 */
381 void pollSelector(int fdVal, long nanos) throws IOException {
382 poll(fdVal, Net.POLLIN, nanos, () -> true);
383 }
384
385 /**
386 * Starts a platform thread to run the given task.
387 */
388 protected final void startPlatformThread(String name, Runnable task) {
389 Thread thread = InnocuousThread.newSystemThread(name, task);
390 thread.setDaemon(true);
391 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
392 thread.start();
393 }
394
395 /**
396 * Return the master poller, or null if no master poller.
397 */
398 abstract Poller masterPoller();
399
400 /**
401 * Return the read pollers.
402 */
403 abstract List<Poller> readPollers();
404
405 /**
406 * Return the write pollers.
407 */
408 abstract List<Poller> writePollers();
409
410 /**
411 * Close the given pollers.
412 */
413 static void closeAll(Poller... pollers) {
414 for (Poller poller : pollers) {
415 if (poller != null) {
416 try {
417 poller.close();
418 } catch (IOException _) { }
419 }
420 }
421 }
422
423 /**
424 * Returns true if the read pollers in this poller group support read ops in
425 * addition to POLLIN polling.
426 */
427 boolean supportReadOps() {
428 return provider().supportReadOps();
429 }
430
431 /**
432 * Reads bytes into a byte array.
433 * @throws UnsupportedOperationException if not supported
434 */
435 abstract int read(int fdVal, byte[] b, int off, int len, long nanos,
436 BooleanSupplier isOpen) throws IOException;
437
438 /**
439 * Returns true if the write pollers in this poller group support write ops in
440 * addition to POLLOUT polling.
441 */
442 boolean supportWriteOps() {
443 return provider().supportWriteOps();
444 }
445
446 /**
447 * Write bytes from a byte array.
448 * @throws UnsupportedOperationException if not supported
449 */
450 abstract int write(int fdVal, byte[] b, int off, int len,
451 BooleanSupplier isOpen) throws IOException;
452 }
453
454 /**
455 * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
456 */
457 private static class SystemThreadsPollerGroup extends PollerGroup {
458 // system-wide read and write pollers
459 private final Poller[] readPollers;
460 private final Poller[] writePollers;
461
462 SystemThreadsPollerGroup(PollerProvider provider,
463 int readPollerCount,
464 int writePollerCount) throws IOException {
465 super(provider);
466 Poller[] readPollers = new Poller[readPollerCount];
467 Poller[] writePollers = new Poller[writePollerCount];
468 try {
469 for (int i = 0; i < readPollerCount; i++) {
470 readPollers[i] = provider.readPoller(false);
471 }
472 for (int i = 0; i < writePollerCount; i++) {
473 writePollers[i] = provider.writePoller(false);
474 }
475 } catch (Throwable e) {
476 closeAll(readPollers);
477 closeAll(writePollers);
478 throw e;
479 }
480
481 this.readPollers = readPollers;
482 this.writePollers = writePollers;
483 }
484
485 @Override
486 void start() {
487 Arrays.stream(readPollers).forEach(p -> {
488 startPlatformThread("Read-Poller", p::pollerLoop);
489 });
490 Arrays.stream(writePollers).forEach(p -> {
491 startPlatformThread("Write-Poller", p::pollerLoop);
492 });
493 }
494
495 private Poller readPoller(int fdVal) {
496 int index = provider().fdValToIndex(fdVal, readPollers.length);
497 return readPollers[index];
498 }
499
500 private Poller writePoller(int fdVal) {
501 int index = provider().fdValToIndex(fdVal, writePollers.length);
502 return writePollers[index];
503 }
504
505 @Override
506 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
507 Poller poller = (event == Net.POLLIN)
508 ? readPoller(fdVal)
509 : writePoller(fdVal);
510 poller.poll(fdVal, nanos, isOpen);
511 }
512
513 @Override
514 int read(int fdVal, byte[] b, int off, int len, long nanos,
515 BooleanSupplier isOpen) throws IOException {
516 return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
517 }
518
519 @Override
520 int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
521 return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
522 }
523
524 @Override
525 Poller masterPoller() {
526 return null;
527 }
528
529 @Override
530 List<Poller> readPollers() {
531 return List.of(readPollers);
532 }
533
534 @Override
535 List<Poller> writePollers() {
536 return List.of(writePollers);
537 }
538 }
539
540 /**
541 * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
542 * When read and write pollers need to block then they register with a system-wide
543 * "master poller" that runs in a dedicated platform thread.
544 */
545 private static class VThreadsPollerGroup extends PollerGroup {
546 private final Poller masterPoller;
547 private final Poller[] readPollers;
548 private final Poller[] writePollers;
549
550 // keep virtual thread pollers alive
551 private final Executor executor;
552
553 VThreadsPollerGroup(PollerProvider provider,
554 int readPollerCount,
555 int writePollerCount) throws IOException {
556 super(provider);
557 Poller masterPoller = provider.readPoller(false);
558 Poller[] readPollers = new Poller[readPollerCount];
559 Poller[] writePollers = new Poller[writePollerCount];
560
561 try {
562 for (int i = 0; i < readPollerCount; i++) {
563 readPollers[i] = provider.readPoller(true);
564 }
565 for (int i = 0; i < writePollerCount; i++) {
566 writePollers[i] = provider.writePoller(true);
567 }
568 } catch (Throwable e) {
569 masterPoller.close();
570 closeAll(readPollers);
571 closeAll(writePollers);
572 throw e;
573 }
574
575 this.masterPoller = masterPoller;
576 this.readPollers = readPollers;
577 this.writePollers = writePollers;
578
579 ThreadFactory factory = Thread.ofVirtual()
580 .inheritInheritableThreadLocals(false)
581 .name("SubPoller-", 0)
582 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
583 .factory();
584 this.executor = Executors.newThreadPerTaskExecutor(factory);
585 }
586
587 @Override
588 void start() {
589 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
590 Arrays.stream(readPollers).forEach(p -> {
591 executor.execute(() -> p.subPollerLoop(masterPoller));
592 });
593 Arrays.stream(writePollers).forEach(p -> {
594 executor.execute(() -> p.subPollerLoop(masterPoller));
595 });
596 }
597
598 private Poller readPoller(int fdVal) {
599 int index = provider().fdValToIndex(fdVal, readPollers.length);
600 return readPollers[index];
601 }
602
603 private Poller writePoller(int fdVal) {
604 int index = provider().fdValToIndex(fdVal, writePollers.length);
605 return writePollers[index];
606 }
607
608 @Override
609 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
610 Poller poller = (event == Net.POLLIN)
611 ? readPoller(fdVal)
612 : writePoller(fdVal);
613 poller.poll(fdVal, nanos, isOpen);
614 }
615
616 @Override
617 int read(int fdVal, byte[] b, int off, int len, long nanos,
618 BooleanSupplier isOpen) throws IOException {
619 return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
620 }
621
622 @Override
623 int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
624 return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
625 }
626
627 @Override
628 void pollSelector(int fdVal, long nanos) throws IOException {
629 masterPoller.poll(fdVal, nanos, () -> true);
630 }
631
632 @Override
633 Poller masterPoller() {
634 return masterPoller;
635 }
636
637 @Override
638 List<Poller> readPollers() {
639 return List.of(readPollers);
640 }
641
642 @Override
643 List<Poller> writePollers() {
644 return List.of(writePollers);
645 }
646 }
647
648 /**
649 * POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread.
650 * When a virtual thread polls a file descriptor for POLLIN, then it will use (almost
651 * always, not guaranteed) the read poller for its carrier. When a read poller needs
652 * to block then it registers with a system-wide "master poller" that runs in a
653 * dedicated platform thread. The read poller terminates if the carrier terminates.
654 * The write pollers are system-wide platform threads (usually one).
655 */
656 private static class PollerPerCarrierPollerGroup extends PollerGroup {
657 private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { }
658 private static final TerminatingThreadLocal<CarrierPoller> CARRIER_POLLER =
659 new TerminatingThreadLocal<>() {
660 @Override
661 protected void threadTerminated(CarrierPoller carrierPoller) {
662 Poller readPoller = carrierPoller.readPoller();
663 carrierPoller.group().carrierTerminated(readPoller);
664 }
665 };
666
667 private final Poller masterPoller;
668 private final Set<Poller> readPollers;
669 private final Poller[] writePollers;
670
671 /**
672 * Create a PollerPerCarrierPollerGroup with the given number of write pollers.
673 */
674 PollerPerCarrierPollerGroup(PollerProvider provider,
675 int writePollerCount) throws IOException {
676 super(provider);
677 Poller masterPoller = provider.readPoller(false);
678 Poller[] writePollers = new Poller[writePollerCount];
679 try {
680 for (int i = 0; i < writePollerCount; i++) {
681 writePollers[i] = provider.writePoller(false);
682 }
683 } catch (Throwable e) {
684 masterPoller.close();
685 closeAll(writePollers);
686 throw e;
687 }
688 this.masterPoller = masterPoller;
689 this.readPollers = ConcurrentHashMap.newKeySet();;
690 this.writePollers = writePollers;
691 }
692
693 @Override
694 void start() {
695 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
696 Arrays.stream(writePollers).forEach(p -> {
697 startPlatformThread("Write-Poller", p::pollerLoop);
698 });
699 }
700
701 private Poller writePoller(int fdVal) {
702 int index = provider().fdValToIndex(fdVal, writePollers.length);
703 return writePollers[index];
704 }
705
706 /**
707 * Starts a read sub-poller in a virtual thread.
708 */
709 private Poller startReadPoller() throws IOException {
710 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
711
712 // create read sub-poller
713 Poller readPoller = provider().readPoller(true);
714 readPollers.add(readPoller);
715
716 // start virtual thread to execute sub-polling loop
717 Thread carrier = JLA.currentCarrierThread();
718 var scheduler = JLA.virtualThreadScheduler(Thread.currentThread());
719 @SuppressWarnings("restricted")
720 var _ = Thread.ofVirtual()
721 .scheduler(scheduler)
722 .inheritInheritableThreadLocals(false)
723 .name(carrier.getName() + "-Read-Poller")
724 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
725 .start(() -> subPollerLoop(readPoller));
726 return readPoller;
727 }
728
729 /**
730 * Returns the read poller for the current carrier, starting it if required.
731 */
732 private Poller readPoller() throws IOException {
733 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
734 Continuation.pin();
735 try {
736 CarrierPoller carrierPoller = CARRIER_POLLER.get();
737 if (carrierPoller != null) {
738 return carrierPoller.readPoller();
739 } else {
740 // first poll on this carrier will start poller
741 Poller readPoller = startReadPoller();
742 CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
743 return readPoller;
744 }
745 } finally {
746 Continuation.unpin();
747 }
748 }
749
750 @Override
751 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
752 // for POLLIN, get the read poller for this carrier
753 if (event == Net.POLLIN
754 && Thread.currentThread().isVirtual()
755 && ContinuationSupport.isSupported()) {
756 readPoller().poll(fdVal, nanos, isOpen);
757 return;
758 }
759
760 // -XX:-VMContinuations or POLLIN from platform thread does master poller
761 if (event == Net.POLLIN) {
762 masterPoller.poll(fdVal, nanos, isOpen);
763 } else {
764 writePoller(fdVal).poll(fdVal, nanos, isOpen);
765 }
766 }
767
768 @Override
769 void pollSelector(int fdVal, long nanos) throws IOException {
770 masterPoller.poll(fdVal, nanos, () -> true);
771 }
772
773 @Override
774 int read(int fdVal, byte[] b, int off, int len, long nanos,
775 BooleanSupplier isOpen) throws IOException {
776 return readPoller().implRead(fdVal, b, off, len, nanos, isOpen);
777 }
778
779 @Override
780 int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
781 return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
782 }
783
784 /**
785 * Sub-poller polling loop.
786 */
787 private void subPollerLoop(Poller readPoller) {
788 try {
789 readPoller.subPollerLoop(masterPoller);
790 } finally {
791 // wakeup all threads waiting on file descriptors registered with the
792 // read poller, these I/O operation will migrate to another carrier.
793 readPoller.wakeupAll();
794
795 // remove from serviceability view
796 readPollers.remove(readPoller);
797 }
798 }
799
800 /**
801 * Invoked by the carrier thread before it terminates.
802 */
803 private void carrierTerminated(Poller readPoller) {
804 readPoller.setShutdown();
805 try {
806 readPoller.wakeupPoller();
807 } catch (Throwable e) {
808 e.printStackTrace();
809 }
810 }
811
812 @Override
813 Poller masterPoller() {
814 return masterPoller;
815 }
816
817 @Override
818 List<Poller> readPollers() {
819 return readPollers.stream().toList();
820 }
821
822 @Override
823 List<Poller> writePollers() {
824 return List.of(writePollers);
825 }
826 }
827
828 /**
829 * Reads the given property name to get the poller count. If the property is
830 * set then the value must be a power of 2. Returns 1 if the property is not
831 * set.
832 * @throws IllegalArgumentException if the property is set to a value that
833 * is not a power of 2.
834 */
835 private static int pollerCount(String propName, int defaultCount) {
836 String s = System.getProperty(propName);
837 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
838
839 // check power of 2
840 if (count != Integer.highestOneBit(count)) {
841 String msg = propName + " is set to a value that is not a power of 2";
842 throw new IllegalArgumentException(msg);
843 }
844 return count;
845 }
846
847
/**
 * Returns true if read ops are supported in addition to POLLIN polling.
 * Answered by the poller group.
 */
public static boolean supportReadOps() {
    return POLLER_GROUP.supportReadOps();
}
854
/**
 * Returns true if write ops are supported in addition to POLLOUT polling.
 * Answered by the poller group.
 */
public static boolean supportWriteOps() {
    return POLLER_GROUP.supportWriteOps();
}
861
/**
 * Parks the current thread until bytes are read into a byte array. Delegates to
 * the poller group.
 *
 * @param fdVal the file descriptor
 * @param nanos the waiting time (presumably 0 waits indefinitely, as for poll;
 *     confirm against the implRead implementations)
 * @param isOpen supplies a boolean to indicate if the enclosing object is open
 * @return the number of bytes read (>0), EOF (-1), or UNAVAILABLE (-2) if unparked
 *     or the timeout expires while waiting for bytes to be read
 * @throws UnsupportedOperationException if not supported
 */
public static int read(int fdVal, byte[] b, int off, int len, long nanos,
                       BooleanSupplier isOpen) throws IOException {
    return POLLER_GROUP.read(fdVal, b, off, len, nanos, isOpen);
}
873
/**
 * Parks the current thread until bytes are written from a byte array. Delegates
 * to the poller group.
 *
 * @param fdVal the file descriptor
 * @param isOpen supplies a boolean to indicate if the enclosing object is open
 * @return NOTE(review): the previous text here was copied from read and described
 *     bytes read, EOF and a timeout, none of which fit a write with no timeout
 *     parameter. Presumably this is the number of bytes written (>0), or
 *     UNAVAILABLE (-2) if unparked while waiting for bytes to be written;
 *     confirm against the implWrite implementations.
 * @throws UnsupportedOperationException if not supported
 */
public static int write(int fdVal, byte[] b, int off, int len,
                        BooleanSupplier isOpen) throws IOException {
    return POLLER_GROUP.write(fdVal, b, off, len, isOpen);
}
885
/**
 * Parks the current thread until bytes are read into a byte array. This method is
 * overridden by poller implementations that support this operation; this base
 * implementation always throws.
 *
 * @throws UnsupportedOperationException if not supported
 */
int implRead(int fdVal, byte[] b, int off, int len, long nanos,
             BooleanSupplier isOpen) throws IOException {
    throw new UnsupportedOperationException();
}
894
/**
 * Parks the current thread until bytes are written from a byte array. This
 * method is overridden by poller implementations that support this operation;
 * this base implementation always throws.
 *
 * @throws UnsupportedOperationException if not supported
 */
int implWrite(int fdVal, byte[] b, int off, int len,
              BooleanSupplier isOpen) throws IOException {
    throw new UnsupportedOperationException();
}
903
/**
 * Return the master poller or null if there is no master poller (the
 * SYSTEM_THREADS poller group has none).
 */
public static Poller masterPoller() {
    return POLLER_GROUP.masterPoller();
}
910
/**
 * Return the list of read pollers. With the POLLER_PER_CARRIER poller group this
 * is the set of read pollers running at the time of the call.
 */
public static List<Poller> readPollers() {
    return POLLER_GROUP.readPollers();
}
917
/**
 * Return the list of write pollers, as reported by the poller group.
 */
public static List<Poller> writePollers() {
    return POLLER_GROUP.writePollers();
}
924 }