1 /* 2 * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.io.IOException; 28 import java.io.UncheckedIOException; 29 import java.lang.ref.Reference; 30 import java.util.Arrays; 31 import java.util.List; 32 import java.util.Map; 33 import java.util.Objects; 34 import java.util.Set; 35 import java.util.concurrent.ConcurrentHashMap; 36 import java.util.concurrent.Executor; 37 import java.util.concurrent.Executors; 38 import java.util.concurrent.ThreadFactory; 39 import java.util.concurrent.locks.LockSupport; 40 import java.util.function.BooleanSupplier; 41 import jdk.internal.access.JavaLangAccess; 42 import jdk.internal.access.SharedSecrets; 43 import jdk.internal.misc.InnocuousThread; 44 import jdk.internal.misc.TerminatingThreadLocal; 45 import jdk.internal.vm.Continuation; 46 import jdk.internal.vm.ContinuationSupport; 47 import jdk.internal.vm.annotation.Stable; 48 49 /** 50 * I/O poller to allow virtual threads park until a file descriptor is ready for I/O. 51 * Implementations also optionally support read/write operations where virtual threads 52 * park until bytes are read or written. 53 */ 54 public abstract class Poller { 55 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); 56 57 // the poller group for the I/O pollers and poller threads 58 private static final PollerGroup POLLER_GROUP = createPollerGroup(); 59 60 // the poller or sub-poller thread (used for observability only) 61 private @Stable Thread owner; 62 63 // maps file descriptors to parked Thread 64 private final Map<Integer, Thread> map = new ConcurrentHashMap<>(); 65 66 // shutdown (if supported by poller group) 67 private volatile boolean shutdown; 68 69 /** 70 * Poller mode. 71 */ 72 enum Mode { 73 /** 74 * Read and write pollers are platform threads that block waiting for events and 75 * unpark virtual threads when file descriptors are ready for I/O. 76 */ 77 SYSTEM_THREADS, 78 79 /** 80 * Read and write pollers are virtual threads that poll for events, yielding 81 * between polls and unparking virtual threads when file descriptors are 82 * ready for I/O. If there are no events then the poller threads park until there 83 * are I/O events to poll. This mode helps to integrate polling with virtual 84 * thread scheduling. The approach is similar to the default scheme in "User-level 85 * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020 86 * (https://dl.acm.org/doi/10.1145/3379483). 87 */ 88 VTHREAD_POLLERS, 89 90 /** 91 * Read pollers are per-carrier virtual threads that poll for events, yielding 92 * between polls and unparking virtual threads when file descriptors are ready 93 * for I/O. If there are no events then the poller threads park until there 94 * are I/O events to poll. The write poller is a system-wide platform thread. 95 */ 96 POLLER_PER_CARRIER 97 } 98 99 /** 100 * Create and return the PollerGroup. 101 */ 102 private static PollerGroup createPollerGroup() { 103 try { 104 PollerProvider provider; 105 if (System.getProperty("jdk.pollerMode") instanceof String s) { 106 Mode mode = switch (s) { 107 case "1" -> Mode.SYSTEM_THREADS; 108 case "2" -> Mode.VTHREAD_POLLERS; 109 case "3" -> Mode.POLLER_PER_CARRIER; 110 default -> { 111 throw new RuntimeException(s + " is not a valid polling mode"); 112 } 113 }; 114 provider = PollerProvider.createProvider(mode); 115 } else { 116 provider = PollerProvider.createProvider(); 117 } 118 119 int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers()); 120 int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers()); 121 PollerGroup group = switch (provider.pollerMode()) { 122 case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider, readPollers, writePollers); 123 case VTHREAD_POLLERS -> new VThreadsPollerGroup(provider, readPollers, writePollers); 124 case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers); 125 }; 126 group.start(); 127 return group; 128 } catch (IOException ioe) { 129 throw new UncheckedIOException(ioe); 130 } 131 } 132 133 /** 134 * Initialize a Poller. 135 */ 136 protected Poller() { 137 } 138 139 /** 140 * Closes the poller and release resources. This method can only be used to cleanup 141 * when creating a poller group fails. 142 */ 143 abstract void close() throws IOException; 144 145 /** 146 * Sets the poller's thread owner. 147 */ 148 private void setOwner() { 149 owner = Thread.currentThread(); 150 } 151 152 /** 153 * Returns true if this poller is marked for shutdown. 154 */ 155 boolean isShutdown() { 156 return shutdown; 157 } 158 159 /** 160 * Marks this poller for shutdown. 161 */ 162 private void setShutdown() { 163 shutdown = true; 164 } 165 166 /** 167 * Returns the poller's file descriptor to use when polling with the master poller. 168 * @throws UnsupportedOperationException if not supported 169 */ 170 int fdVal() { 171 throw new UnsupportedOperationException(); 172 } 173 174 /** 175 * Invoked if when this poller's file descriptor is polled by the master poller. 176 */ 177 void pollerPolled() throws IOException { 178 } 179 180 /** 181 * Register the file descriptor with the I/O event management facility so that it is 182 * polled when the file descriptor is ready for I/O. The registration is "one shot", 183 * meaning it should be polled at most once. 184 */ 185 abstract void implStartPoll(int fdVal) throws IOException; 186 187 /** 188 * Deregister a file descriptor from the I/O event management facility. This may be 189 * a no-op in some implementations when the file descriptor has already been polled. 190 * @param polled true if the file descriptor has already been polled 191 */ 192 abstract void implStopPoll(int fdVal, boolean polled) throws IOException; 193 194 /** 195 * Poll for events. The {@link #polled(int)} method is invoked for each 196 * polled file descriptor. 197 * 198 * @param timeout if positive then block for up to {@code timeout} milliseconds, 199 * if zero then don't block, if -1 then block indefinitely 200 * @return >0 if file descriptors are polled, 0 if no file descriptor polled 201 */ 202 abstract int poll(int timeout) throws IOException; 203 204 /** 205 * Wakeup the poller thread if blocked in poll so it can shutdown. 206 * @throws UnsupportedOperationException if not supported 207 */ 208 void wakeupPoller() throws IOException { 209 throw new UnsupportedOperationException(); 210 } 211 212 /** 213 * Callback by the poll method when a file descriptor is polled. 214 */ 215 final void polled(int fdVal) { 216 Thread t = map.remove(fdVal); 217 if (t != null) { 218 LockSupport.unpark(t); 219 } 220 } 221 222 /** 223 * Parks the current thread until a file descriptor is ready for the given op. 224 * @param fdVal the file descriptor 225 * @param event POLLIN or POLLOUT 226 * @param nanos the waiting time or 0 to wait indefinitely 227 * @param isOpen supplies a boolean to indicate if the enclosing object is open 228 */ 229 public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { 230 POLLER_GROUP.poll(fdVal, event, nanos, isOpen); 231 } 232 233 /** 234 * Parks the current thread until a Selector's file descriptor is ready. 235 * @param fdVal the Selector's file descriptor 236 * @param nanos the waiting time or 0 to wait indefinitely 237 */ 238 public static void pollSelector(int fdVal, long nanos) throws IOException { 239 POLLER_GROUP.pollSelector(fdVal, nanos); 240 } 241 242 /** 243 * Unpark the given thread so that it stops polling. 244 */ 245 public static void stopPoll(Thread thread) { 246 LockSupport.unpark(thread); 247 } 248 249 /** 250 * Parks the current thread until a file descriptor is ready. 251 */ 252 private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException { 253 startPoll(fdVal); 254 try { 255 if (isOpen.getAsBoolean() && !isShutdown()) { 256 if (nanos > 0) { 257 LockSupport.parkNanos(nanos); 258 } else { 259 LockSupport.park(); 260 } 261 } 262 } finally { 263 stopPoll(fdVal); 264 } 265 } 266 267 /** 268 * Register a file descriptor with the I/O event management facility so that it is 269 * polled when the file descriptor is ready for I/O. 270 */ 271 private void startPoll(int fdVal) throws IOException { 272 Thread previous = map.put(fdVal, Thread.currentThread()); 273 assert previous == null; 274 try { 275 implStartPoll(fdVal); 276 } catch (Throwable t) { 277 map.remove(fdVal); 278 throw t; 279 } finally { 280 Reference.reachabilityFence(this); 281 } 282 } 283 284 /** 285 * Deregister a file descriptor from the I/O event management facility. 286 */ 287 private void stopPoll(int fdVal) throws IOException { 288 Thread previous = map.remove(fdVal); 289 boolean polled = (previous == null); 290 assert polled || previous == Thread.currentThread(); 291 try { 292 implStopPoll(fdVal, polled); 293 } finally { 294 Reference.reachabilityFence(this); 295 } 296 } 297 298 /** 299 * Master polling loop. The {@link #polled(int)} method is invoked for each file 300 * descriptor that is polled. 301 */ 302 private void pollerLoop() { 303 setOwner(); 304 try { 305 while (!isShutdown()) { 306 poll(-1); 307 } 308 } catch (Exception e) { 309 e.printStackTrace(); 310 } 311 } 312 313 /** 314 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file 315 * descriptor that is polled. 316 * 317 * The sub-poller registers its file descriptor with the master poller to park until 318 * there are events to poll. When unparked, it does non-blocking polls and parks 319 * again when there are no more events. The sub-poller yields after each poll to help 320 * with fairness and to avoid re-registering with the master poller where possible. 321 */ 322 private void subPollerLoop(Poller masterPoller) { 323 assert Thread.currentThread().isVirtual(); 324 setOwner(); 325 try { 326 int polled = 0; 327 while (!isShutdown()) { 328 if (polled == 0) { 329 masterPoller.poll(fdVal(), 0, () -> true); // park 330 pollerPolled(); 331 } else { 332 Thread.yield(); 333 } 334 polled = poll(0); 335 } 336 } catch (Exception e) { 337 e.printStackTrace(); 338 } 339 } 340 341 /** 342 * Unparks all threads waiting on a file descriptor registered with this poller. 343 */ 344 private void wakeupAll() { 345 map.values().forEach(LockSupport::unpark); 346 } 347 348 @Override 349 public String toString() { 350 return String.format("%s [registered = %d, owner = %s]", 351 Objects.toIdentityString(this), map.size(), owner); 352 } 353 354 /** 355 * A group of poller threads that support virtual threads polling file descriptors. 356 */ 357 private static abstract class PollerGroup { 358 private final PollerProvider provider; 359 360 PollerGroup(PollerProvider provider) { 361 this.provider = provider; 362 } 363 364 final PollerProvider provider() { 365 return provider; 366 } 367 368 /** 369 * Starts the poller group and any system-wide poller threads. 370 */ 371 abstract void start(); 372 373 /** 374 * Parks the current thread until a file descriptor is ready for the given op. 375 */ 376 abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException; 377 378 /** 379 * Parks the current thread until a Selector's file descriptor is ready. 380 */ 381 void pollSelector(int fdVal, long nanos) throws IOException { 382 poll(fdVal, Net.POLLIN, nanos, () -> true); 383 } 384 385 /** 386 * Starts a platform thread to run the given task. 387 */ 388 protected final void startPlatformThread(String name, Runnable task) { 389 Thread thread = InnocuousThread.newSystemThread(name, task); 390 thread.setDaemon(true); 391 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace()); 392 thread.start(); 393 } 394 395 /** 396 * Return the master poller, or null if no master poller. 397 */ 398 abstract Poller masterPoller(); 399 400 /** 401 * Return the read pollers. 402 */ 403 abstract List<Poller> readPollers(); 404 405 /** 406 * Return the write pollers. 407 */ 408 abstract List<Poller> writePollers(); 409 410 /** 411 * Close the given pollers. 412 */ 413 static void closeAll(Poller... pollers) { 414 for (Poller poller : pollers) { 415 if (poller != null) { 416 try { 417 poller.close(); 418 } catch (IOException _) { } 419 } 420 } 421 } 422 423 /** 424 * Returns true if the read pollers in this poller group support read ops in 425 * addition to POLLIN polling. 426 */ 427 boolean supportReadOps() { 428 return provider().supportReadOps(); 429 } 430 431 /** 432 * Reads bytes into a byte array. 433 * @throws UnsupportedOperationException if not supported 434 */ 435 abstract int read(int fdVal, byte[] b, int off, int len, long nanos, 436 BooleanSupplier isOpen) throws IOException; 437 438 /** 439 * Returns true if the write pollers in this poller group support write ops in 440 * addition to POLLOUT polling. 441 */ 442 boolean supportWriteOps() { 443 return provider().supportWriteOps(); 444 } 445 446 /** 447 * Write bytes from a byte array. 448 * @throws UnsupportedOperationException if not supported 449 */ 450 abstract int write(int fdVal, byte[] b, int off, int len, 451 BooleanSupplier isOpen) throws IOException; 452 } 453 454 /** 455 * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads. 456 */ 457 private static class SystemThreadsPollerGroup extends PollerGroup { 458 // system-wide read and write pollers 459 private final Poller[] readPollers; 460 private final Poller[] writePollers; 461 462 SystemThreadsPollerGroup(PollerProvider provider, 463 int readPollerCount, 464 int writePollerCount) throws IOException { 465 super(provider); 466 Poller[] readPollers = new Poller[readPollerCount]; 467 Poller[] writePollers = new Poller[writePollerCount]; 468 try { 469 for (int i = 0; i < readPollerCount; i++) { 470 readPollers[i] = provider.readPoller(false); 471 } 472 for (int i = 0; i < writePollerCount; i++) { 473 writePollers[i] = provider.writePoller(false); 474 } 475 } catch (Throwable e) { 476 closeAll(readPollers); 477 closeAll(writePollers); 478 throw e; 479 } 480 481 this.readPollers = readPollers; 482 this.writePollers = writePollers; 483 } 484 485 @Override 486 void start() { 487 Arrays.stream(readPollers).forEach(p -> { 488 startPlatformThread("Read-Poller", p::pollerLoop); 489 }); 490 Arrays.stream(writePollers).forEach(p -> { 491 startPlatformThread("Write-Poller", p::pollerLoop); 492 }); 493 } 494 495 private Poller readPoller(int fdVal) { 496 int index = provider().fdValToIndex(fdVal, readPollers.length); 497 return readPollers[index]; 498 } 499 500 private Poller writePoller(int fdVal) { 501 int index = provider().fdValToIndex(fdVal, writePollers.length); 502 return writePollers[index]; 503 } 504 505 @Override 506 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { 507 Poller poller = (event == Net.POLLIN) 508 ? readPoller(fdVal) 509 : writePoller(fdVal); 510 poller.poll(fdVal, nanos, isOpen); 511 } 512 513 @Override 514 int read(int fdVal, byte[] b, int off, int len, long nanos, 515 BooleanSupplier isOpen) throws IOException { 516 return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen); 517 } 518 519 @Override 520 int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException { 521 return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen); 522 } 523 524 @Override 525 Poller masterPoller() { 526 return null; 527 } 528 529 @Override 530 List<Poller> readPollers() { 531 return List.of(readPollers); 532 } 533 534 @Override 535 List<Poller> writePollers() { 536 return List.of(writePollers); 537 } 538 } 539 540 /** 541 * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads. 542 * When read and write pollers need to block then they register with a system-wide 543 * "master poller" that runs in a dedicated platform thread. 544 */ 545 private static class VThreadsPollerGroup extends PollerGroup { 546 private final Poller masterPoller; 547 private final Poller[] readPollers; 548 private final Poller[] writePollers; 549 550 // keep virtual thread pollers alive 551 private final Executor executor; 552 553 VThreadsPollerGroup(PollerProvider provider, 554 int readPollerCount, 555 int writePollerCount) throws IOException { 556 super(provider); 557 Poller masterPoller = provider.readPoller(false); 558 Poller[] readPollers = new Poller[readPollerCount]; 559 Poller[] writePollers = new Poller[writePollerCount]; 560 561 try { 562 for (int i = 0; i < readPollerCount; i++) { 563 readPollers[i] = provider.readPoller(true); 564 } 565 for (int i = 0; i < writePollerCount; i++) { 566 writePollers[i] = provider.writePoller(true); 567 } 568 } catch (Throwable e) { 569 masterPoller.close(); 570 closeAll(readPollers); 571 closeAll(writePollers); 572 throw e; 573 } 574 575 this.masterPoller = masterPoller; 576 this.readPollers = readPollers; 577 this.writePollers = writePollers; 578 579 ThreadFactory factory = Thread.ofVirtual() 580 .inheritInheritableThreadLocals(false) 581 .name("SubPoller-", 0) 582 .uncaughtExceptionHandler((_, e) -> e.printStackTrace()) 583 .factory(); 584 this.executor = Executors.newThreadPerTaskExecutor(factory); 585 } 586 587 @Override 588 void start() { 589 startPlatformThread("Master-Poller", masterPoller::pollerLoop); 590 Arrays.stream(readPollers).forEach(p -> { 591 executor.execute(() -> p.subPollerLoop(masterPoller)); 592 }); 593 Arrays.stream(writePollers).forEach(p -> { 594 executor.execute(() -> p.subPollerLoop(masterPoller)); 595 }); 596 } 597 598 private Poller readPoller(int fdVal) { 599 int index = provider().fdValToIndex(fdVal, readPollers.length); 600 return readPollers[index]; 601 } 602 603 private Poller writePoller(int fdVal) { 604 int index = provider().fdValToIndex(fdVal, writePollers.length); 605 return writePollers[index]; 606 } 607 608 @Override 609 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { 610 Poller poller = (event == Net.POLLIN) 611 ? readPoller(fdVal) 612 : writePoller(fdVal); 613 poller.poll(fdVal, nanos, isOpen); 614 } 615 616 @Override 617 int read(int fdVal, byte[] b, int off, int len, long nanos, 618 BooleanSupplier isOpen) throws IOException { 619 return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen); 620 } 621 622 @Override 623 int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException { 624 return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen); 625 } 626 627 @Override 628 void pollSelector(int fdVal, long nanos) throws IOException { 629 masterPoller.poll(fdVal, nanos, () -> true); 630 } 631 632 @Override 633 Poller masterPoller() { 634 return masterPoller; 635 } 636 637 @Override 638 List<Poller> readPollers() { 639 return List.of(readPollers); 640 } 641 642 @Override 643 List<Poller> writePollers() { 644 return List.of(writePollers); 645 } 646 } 647 648 /** 649 * POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread. 650 * When a virtual thread polls a file descriptor for POLLIN, then it will use (almost 651 * always, not guaranteed) the read poller for its carrier. When a read poller needs 652 * to block then it registers with a system-wide "master poller" that runs in a 653 * dedicated platform thread. The read poller terminates if the carrier terminates. 654 * The write pollers are system-wide platform threads (usually one). 655 */ 656 private static class PollerPerCarrierPollerGroup extends PollerGroup { 657 private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { } 658 private static final TerminatingThreadLocal<CarrierPoller> CARRIER_POLLER = 659 new TerminatingThreadLocal<>() { 660 @Override 661 protected void threadTerminated(CarrierPoller carrierPoller) { 662 Poller readPoller = carrierPoller.readPoller(); 663 carrierPoller.group().carrierTerminated(readPoller); 664 } 665 }; 666 667 private final Poller masterPoller; 668 private final Set<Poller> readPollers; 669 private final Poller[] writePollers; 670 671 /** 672 * Create a PollerPerCarrierPollerGroup with the given number of write pollers. 673 */ 674 PollerPerCarrierPollerGroup(PollerProvider provider, 675 int writePollerCount) throws IOException { 676 super(provider); 677 Poller masterPoller = provider.readPoller(false); 678 Poller[] writePollers = new Poller[writePollerCount]; 679 try { 680 for (int i = 0; i < writePollerCount; i++) { 681 writePollers[i] = provider.writePoller(false); 682 } 683 } catch (Throwable e) { 684 masterPoller.close(); 685 closeAll(writePollers); 686 throw e; 687 } 688 this.masterPoller = masterPoller; 689 this.readPollers = ConcurrentHashMap.newKeySet();; 690 this.writePollers = writePollers; 691 } 692 693 @Override 694 void start() { 695 startPlatformThread("Master-Poller", masterPoller::pollerLoop); 696 Arrays.stream(writePollers).forEach(p -> { 697 startPlatformThread("Write-Poller", p::pollerLoop); 698 }); 699 } 700 701 private Poller writePoller(int fdVal) { 702 int index = provider().fdValToIndex(fdVal, writePollers.length); 703 return writePollers[index]; 704 } 705 706 /** 707 * Starts a read sub-poller in a virtual thread. 708 */ 709 private Poller startReadPoller() throws IOException { 710 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported(); 711 712 // create read sub-poller 713 Poller readPoller = provider().readPoller(true); 714 readPollers.add(readPoller); 715 716 // start virtual thread to execute sub-polling loop 717 Thread carrier = JLA.currentCarrierThread(); 718 var scheduler = JLA.virtualThreadScheduler(Thread.currentThread()); 719 @SuppressWarnings("restricted") 720 var _ = Thread.ofVirtual() 721 .scheduler(scheduler) 722 .inheritInheritableThreadLocals(false) 723 .name(carrier.getName() + "-Read-Poller") 724 .uncaughtExceptionHandler((_, e) -> e.printStackTrace()) 725 .start(() -> subPollerLoop(readPoller)); 726 return readPoller; 727 } 728 729 /** 730 * Returns the read poller for the current carrier, starting it if required. 731 */ 732 private Poller readPoller() throws IOException { 733 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported(); 734 Continuation.pin(); 735 try { 736 CarrierPoller carrierPoller = CARRIER_POLLER.get(); 737 if (carrierPoller != null) { 738 return carrierPoller.readPoller(); 739 } else { 740 // first poll on this carrier will start poller 741 Poller readPoller = startReadPoller(); 742 CARRIER_POLLER.set(new CarrierPoller(this, readPoller)); 743 return readPoller; 744 } 745 } finally { 746 Continuation.unpin(); 747 } 748 } 749 750 @Override 751 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { 752 // for POLLIN, get the read poller for this carrier 753 if (event == Net.POLLIN 754 && Thread.currentThread().isVirtual() 755 && ContinuationSupport.isSupported()) { 756 readPoller().poll(fdVal, nanos, isOpen); 757 return; 758 } 759 760 // -XX:-VMContinuations or POLLIN from platform thread does master poller 761 if (event == Net.POLLIN) { 762 masterPoller.poll(fdVal, nanos, isOpen); 763 } else { 764 writePoller(fdVal).poll(fdVal, nanos, isOpen); 765 } 766 } 767 768 @Override 769 void pollSelector(int fdVal, long nanos) throws IOException { 770 masterPoller.poll(fdVal, nanos, () -> true); 771 } 772 773 @Override 774 int read(int fdVal, byte[] b, int off, int len, long nanos, 775 BooleanSupplier isOpen) throws IOException { 776 return readPoller().implRead(fdVal, b, off, len, nanos, isOpen); 777 } 778 779 @Override 780 int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException { 781 return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen); 782 } 783 784 /** 785 * Sub-poller polling loop. 786 */ 787 private void subPollerLoop(Poller readPoller) { 788 try { 789 readPoller.subPollerLoop(masterPoller); 790 } finally { 791 // wakeup all threads waiting on file descriptors registered with the 792 // read poller, these I/O operation will migrate to another carrier. 793 readPoller.wakeupAll(); 794 795 // remove from serviceability view 796 readPollers.remove(readPoller); 797 } 798 } 799 800 /** 801 * Invoked by the carrier thread before it terminates. 802 */ 803 private void carrierTerminated(Poller readPoller) { 804 readPoller.setShutdown(); 805 try { 806 readPoller.wakeupPoller(); 807 } catch (Throwable e) { 808 e.printStackTrace(); 809 } 810 } 811 812 @Override 813 Poller masterPoller() { 814 return masterPoller; 815 } 816 817 @Override 818 List<Poller> readPollers() { 819 return readPollers.stream().toList(); 820 } 821 822 @Override 823 List<Poller> writePollers() { 824 return List.of(writePollers); 825 } 826 } 827 828 /** 829 * Reads the given property name to get the poller count. If the property is 830 * set then the value must be a power of 2. Returns 1 if the property is not 831 * set. 832 * @throws IllegalArgumentException if the property is set to a value that 833 * is not a power of 2. 834 */ 835 private static int pollerCount(String propName, int defaultCount) { 836 String s = System.getProperty(propName); 837 int count = (s != null) ? Integer.parseInt(s) : defaultCount; 838 839 // check power of 2 840 if (count != Integer.highestOneBit(count)) { 841 String msg = propName + " is set to a value that is not a power of 2"; 842 throw new IllegalArgumentException(msg); 843 } 844 return count; 845 } 846 847 848 /** 849 * Returns true if read ops are supported in addition to POLLIN polling. 850 */ 851 public static boolean supportReadOps() { 852 return POLLER_GROUP.supportReadOps(); 853 } 854 855 /** 856 * Returns true if write ops are supported in addition to POLLOUT polling. 857 */ 858 public static boolean supportWriteOps() { 859 return POLLER_GROUP.supportWriteOps(); 860 } 861 862 /** 863 * Parks the current thread until bytes are read into a byte array. 864 * @param isOpen supplies a boolean to indicate if the enclosing object is open 865 * @return the number of bytes read (>0), EOF (-1), or UNAVAILABLE (-2) if unparked 866 * or the timeout expires while waiting for bytes to be read 867 * @throws UnsupportedOperationException if not supported 868 */ 869 public static int read(int fdVal, byte[] b, int off, int len, long nanos, 870 BooleanSupplier isOpen) throws IOException { 871 return POLLER_GROUP.read(fdVal, b, off, len, nanos, isOpen); 872 } 873 874 /** 875 * Parks the current thread until bytes are written from a byte array. 876 * @param isOpen supplies a boolean to indicate if the enclosing object is open 877 * @return the number of bytes read (>0), EOF (-1), or UNAVAILABLE (-2) if unparked 878 * or the timeout expires while waiting for bytes to be read 879 * @throws UnsupportedOperationException if not supported 880 */ 881 public static int write(int fdVal, byte[] b, int off, int len, 882 BooleanSupplier isOpen) throws IOException { 883 return POLLER_GROUP.write(fdVal, b, off, len, isOpen); 884 } 885 886 /** 887 * Parks the current thread until bytes are read a byte array. This method is 888 * overridden by poller implementations that support this operation. 889 */ 890 int implRead(int fdVal, byte[] b, int off, int len, long nanos, 891 BooleanSupplier isOpen) throws IOException { 892 throw new UnsupportedOperationException(); 893 } 894 895 /** 896 * Parks the current thread until bytes are written from a byte array. This 897 * method is overridden by poller implementations that support this operation. 898 */ 899 int implWrite(int fdVal, byte[] b, int off, int len, 900 BooleanSupplier isOpen) throws IOException { 901 throw new UnsupportedOperationException(); 902 } 903 904 /** 905 * Return the master poller or null if there is no master poller. 906 */ 907 public static Poller masterPoller() { 908 return POLLER_GROUP.masterPoller(); 909 } 910 911 /** 912 * Return the list of read pollers. 913 */ 914 public static List<Poller> readPollers() { 915 return POLLER_GROUP.readPollers(); 916 } 917 918 /** 919 * Return the list of write pollers. 920 */ 921 public static List<Poller> writePollers() { 922 return POLLER_GROUP.writePollers(); 923 } 924 }