< prev index next >

src/java.base/share/classes/sun/nio/ch/Poller.java

Print this page
*** 23,38 ***
   * questions.
   */
  package sun.nio.ch;
  
  import java.io.IOException;
  import java.util.Arrays;
  import java.util.List;
  import java.util.Map;
  import java.util.Objects;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.Executor;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ThreadFactory;
  import java.util.concurrent.locks.LockSupport;
  import java.util.function.BooleanSupplier;
  import jdk.internal.misc.InnocuousThread;
  import jdk.internal.vm.annotation.Stable;
  
  /**
   * Polls file descriptors. Virtual threads invoke the poll method to park
   * until a given file descriptor is ready for I/O.
   */
  public abstract class Poller {
!     private static final Pollers POLLERS;
!     static {
!         try {
!             var pollers = new Pollers();
!             pollers.start();
!             POLLERS = pollers;
!         } catch (IOException ioe) {
!             throw new ExceptionInInitializerError(ioe);
!         }
!     }
  
      // the poller or sub-poller thread
      private @Stable Thread owner;
  
      // maps file descriptors to parked Thread
--- 23,45 ---
   * questions.
   */
  package sun.nio.ch;
  
  import java.io.IOException;
+ import java.io.UncheckedIOException;
  import java.util.Arrays;
  import java.util.List;
  import java.util.Map;
  import java.util.Objects;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.Executor;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ThreadFactory;
  import java.util.concurrent.locks.LockSupport;
  import java.util.function.BooleanSupplier;
+ import java.util.function.Supplier;
+ import jdk.internal.access.JavaLangAccess;
+ import jdk.internal.access.SharedSecrets;
  import jdk.internal.misc.InnocuousThread;
  import jdk.internal.vm.annotation.Stable;
  
  /**
   * Polls file descriptors. Virtual threads invoke the poll method to park
   * until a given file descriptor is ready for I/O.
   */
  public abstract class Poller {
!     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
! 
!     private static final PollerProvider PROVIDER = PollerProvider.provider();
! 
!     private static final Mode POLLER_MODE = pollerMode();
! 
!     private static final Executor DEFAULT_SCHEDULER = JLA.defaultVirtualThreadScheduler();
! 
!     // poller group for default scheduler
!     private static final Supplier<PollerGroup> DEFAULT_POLLER_GROUP = StableValue.supplier(PollerGroup::create);
+ 
+     // maps scheduler to PollerGroup, custom schedulers can't be GC'ed at this time
+     private static final Map<Executor, PollerGroup> POLLER_GROUPS = new ConcurrentHashMap<>();
  
      // the poller or sub-poller thread
      private @Stable Thread owner;
  
      // maps file descriptors to parked Thread

*** 86,10 ***
--- 93,16 ---
       * Initialize a Poller.
       */
      protected Poller() {
      }
  
+     /**
+      * Closes the poller and releases its resources. This method can only be used to
+      * clean up when creating a poller group fails.
+      */
+     abstract void close();
+ 
      /**
       * Returns the poller's file descriptor, used when the read and write poller threads
       * are virtual threads.
       *
       * @throws UnsupportedOperationException if not supported

*** 136,14 ***
       */
      static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
          throws IOException
      {
          assert nanos >= 0L;
          if (event == Net.POLLIN) {
!             POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
          } else if (event == Net.POLLOUT) {
!             POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
          } else {
              assert false;
          }
      }
  
--- 149,15 ---
       */
      static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
          throws IOException
      {
          assert nanos >= 0L;
+         PollerGroup pollerGroup = pollerGroup(Thread.currentThread());  // group used by the current thread
          if (event == Net.POLLIN) {
!             pollerGroup.readPoller(fdVal).poll(fdVal, nanos, supplier);
          } else if (event == Net.POLLOUT) {
!             pollerGroup.writePoller(fdVal).poll(fdVal, nanos, supplier);
          } else {
              assert false;  // only POLLIN and POLLOUT are handled; no-op when assertions are disabled
          }
      }
  

*** 152,37 ***
       * @param fdVal the Selector's file descriptor
       * @param nanos the waiting time or 0 to wait indefinitely
       */
      static void pollSelector(int fdVal, long nanos) throws IOException {
          assert nanos >= 0L;
!         Poller poller = POLLERS.masterPoller();
          if (poller == null) {
!             poller = POLLERS.readPoller(fdVal);
          }
          poller.poll(fdVal, nanos, () -> true);
      }
  
      /**
!      * If there is a thread polling the given file descriptor for the given event then
-      * the thread is unparked.
-      */
-     static void stopPoll(int fdVal, int event) {
-         if (event == Net.POLLIN) {
-             POLLERS.readPoller(fdVal).wakeup(fdVal);
-         } else if (event == Net.POLLOUT) {
-             POLLERS.writePoller(fdVal).wakeup(fdVal);
-         } else {
-             throw new IllegalArgumentException();
-         }
-     }
- 
-     /**
-      * If there are any threads polling the given file descriptor then they are unparked.
       */
!     static void stopPoll(int fdVal) {
!         stopPoll(fdVal, Net.POLLIN);
-         stopPoll(fdVal, Net.POLLOUT);
      }
  
      /**
       * Parks the current thread until a file descriptor is ready.
       */
--- 166,23 ---
       * @param fdVal the Selector's file descriptor
       * @param nanos the waiting time or 0 to wait indefinitely
       */
      static void pollSelector(int fdVal, long nanos) throws IOException {
          assert nanos >= 0L;
!         PollerGroup pollerGroup = pollerGroup(Thread.currentThread());
+         Poller poller = pollerGroup.masterPoller();  // use the master poller when there is one
          if (poller == null) {
!             poller = pollerGroup.readPoller(fdVal);  // no master poller, use the read poller for fdVal
          }
          poller.poll(fdVal, nanos, () -> true);
      }
  
      /**
!      * Unpark the given thread so that it stops polling.
       */
!     static void stopPoll(Thread thread) {
!         LockSupport.unpark(thread);
      }
  
      /**
       * Parks the current thread until a file descriptor is ready.
       */

*** 259,16 ***
       * The sub-poller registers its file descriptor with the master poller to park until
       * there are events to poll. When unparked, it does non-blocking polls and parks
       * again when there are no more events. The sub-poller yields after each poll to help
       * with fairness and to avoid re-registering with the master poller where possible.
       */
!     private void subPollerLoop(Poller masterPoller) {
          assert Thread.currentThread().isVirtual();
          owner = Thread.currentThread();
          try {
              int polled = 0;
!             for (;;) {
                  if (polled == 0) {
                      masterPoller.poll(fdVal(), 0, () -> true);  // park
                  } else {
                      Thread.yield();
                  }
--- 259,16 ---
       * The sub-poller registers its file descriptor with the master poller to park until
       * there are events to poll. When unparked, it does non-blocking polls and parks
       * again when there are no more events. The sub-poller yields after each poll to help
       * with fairness and to avoid re-registering with the master poller where possible.
       */
!     private void subPollerLoop(PollerGroup pollerGroup, Poller masterPoller) {
          assert Thread.currentThread().isVirtual();
          owner = Thread.currentThread();
          try {
              int polled = 0;
!             while (!pollerGroup.isShutdown()) {
                  if (polled == 0) {
                      masterPoller.poll(fdVal(), 0, () -> true);  // park
                  } else {
                      Thread.yield();
                  }

*** 277,146 ***
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
  
-     /**
-      * Returns the number I/O operations currently registered with this poller.
-      */
-     public int registered() {
-         return map.size();
-     }
- 
      @Override
      public String toString() {
          return String.format("%s [registered = %d, owner = %s]",
!                 Objects.toIdentityString(this), registered(), owner);
      }
  
      /**
!      * The Pollers used for read and write events.
       */
!     private static class Pollers {
!         private final PollerProvider provider;
-         private final Poller.Mode pollerMode;
-         private final Poller masterPoller;
          private final Poller[] readPollers;
          private final Poller[] writePollers;
! 
!         // used by start method to executor is kept alive
!         private Executor executor;
! 
!         /**
!          * Creates the Poller instances based on configuration.
!          */
!         Pollers() throws IOException {
!             PollerProvider provider = PollerProvider.provider();
!             Poller.Mode mode;
!             String s = System.getProperty("jdk.pollerMode");
!             if (s != null) {
!                 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
!                     mode = Mode.SYSTEM_THREADS;
-                 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
-                     mode = Mode.VTHREAD_POLLERS;
                  } else {
!                     throw new RuntimeException("Can't parse '" + s + "' as polling mode");
                  }
!             } else {
!                 mode = provider.defaultPollerMode();
              }
  
!             // vthread poller mode needs a master poller
-             Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
-                     ? provider.readPoller(false)
-                     : null;
- 
-             // read pollers (or sub-pollers)
-             int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
              Poller[] readPollers = new Poller[readPollerCount];
-             for (int i = 0; i < readPollerCount; i++) {
-                 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
-             }
- 
-             // write pollers (or sub-pollers)
-             int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));
              Poller[] writePollers = new Poller[writePollerCount];
!             for (int i = 0; i < writePollerCount; i++) {
!                 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);
              }
  
!             this.provider = provider;
-             this.pollerMode = mode;
              this.masterPoller = masterPoller;
              this.readPollers = readPollers;
              this.writePollers = writePollers;
          }
  
          /**
!          * Starts the Poller threads.
           */
!         void start() {
!             if (pollerMode == Mode.VTHREAD_POLLERS) {
!                 startPlatformThread("MasterPoller", masterPoller::pollerLoop);
!                 ThreadFactory factory = Thread.ofVirtual()
!                         .inheritInheritableThreadLocals(false)
!                         .name("SubPoller-", 0)
!                         .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
!                         .factory();
!                 executor = Executors.newThreadPerTaskExecutor(factory);
                  Arrays.stream(readPollers).forEach(p -> {
!                     executor.execute(() -> p.subPollerLoop(masterPoller));
                  });
                  Arrays.stream(writePollers).forEach(p -> {
!                     executor.execute(() -> p.subPollerLoop(masterPoller));
                  });
              } else {
                  Arrays.stream(readPollers).forEach(p -> {
                      startPlatformThread("Read-Poller", p::pollerLoop);
                  });
                  Arrays.stream(writePollers).forEach(p -> {
                      startPlatformThread("Write-Poller", p::pollerLoop);
                  });
              }
          }
  
          /**
!          * Returns the master poller, or null if there is no master poller.
           */
!         Poller masterPoller() {
!             return masterPoller;
          }
  
          /**
!          * Returns the read poller for the given file descriptor.
           */
!         Poller readPoller(int fdVal) {
!             int index = provider.fdValToIndex(fdVal, readPollers.length);
!             return readPollers[index];
          }
  
!         /**
!          * Returns the write poller for the given file descriptor.
!          */
!         Poller writePoller(int fdVal) {
!             int index = provider.fdValToIndex(fdVal, writePollers.length);
!             return writePollers[index];
          }
  
          /**
!          * Return the list of read pollers.
           */
          List<Poller> readPollers() {
              return List.of(readPollers);
          }
  
-         /**
-          * Return the list of write pollers.
-          */
          List<Poller> writePollers() {
              return List.of(writePollers);
          }
  
  
          /**
           * Reads the given property name to get the poller count. If the property is
           * set then the value must be a power of 2. Returns 1 if the property is not
           * set.
--- 277,214 ---
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
  
      @Override
      public String toString() {
          return String.format("%s [registered = %d, owner = %s]",
!             Objects.toIdentityString(this), map.size(), owner);
      }
  
      /**
!      * The read/write pollers for a virtual thread scheduler.
       */
!     private static class PollerGroup {
!         private final Executor scheduler;
          private final Poller[] readPollers;
          private final Poller[] writePollers;
!         private final Poller masterPoller;
!         private final Executor executor;
!         private volatile boolean shutdown;
! 
!         PollerGroup(Executor scheduler,
!                     Poller masterPoller,
!                     int readPollerCount,
!                     int writePollerCount) throws IOException {
!             boolean subPoller = (POLLER_MODE == Mode.VTHREAD_POLLERS);
!             Executor executor = null;
!             if (subPoller) {
!                 String namePrefix;
!                 if (scheduler == DEFAULT_SCHEDULER) {
!                     namePrefix = "SubPoller-";
                  } else {
!                     namePrefix = Objects.toIdentityString(scheduler) + "-SubPoller-";
                  }
!                 @SuppressWarnings("restricted")
!                 ThreadFactory factory = Thread.ofVirtual()
+                         .scheduler(scheduler)
+                         .inheritInheritableThreadLocals(false)
+                         .name(namePrefix, 0)
+                         .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
+                         .factory();
+                 executor = Executors.newThreadPerTaskExecutor(factory);
              }
  
!             // read and write pollers (or sub-pollers)
              Poller[] readPollers = new Poller[readPollerCount];
              Poller[] writePollers = new Poller[writePollerCount];
!             try {
!                 for (int i = 0; i < readPollerCount; i++) {
+                     readPollers[i] = PROVIDER.readPoller(subPoller);
+                 }
+                 for (int i = 0; i < writePollerCount; i++) {
+                     writePollers[i] = PROVIDER.writePoller(subPoller);
+                 }
+             } catch (Exception e) {
+                 closeAll(readPollers);
+                 closeAll(writePollers);
+                 throw e;
              }
  
!             this.scheduler = scheduler;
              this.masterPoller = masterPoller;
              this.readPollers = readPollers;
              this.writePollers = writePollers;
+             this.executor = executor;
          }
  
          /**
!          * Creates and starts the poller group for the default scheduler. In
+          * VTHREAD_POLLERS mode a master poller is created first; in any other mode the
+          * group has no master poller. The number of read and write pollers is obtained
+          * with pollerCount for "jdk.readPollers" and "jdk.writePollers", using the
+          * provider's defaults for the poller mode.
+          *
+          * If creating the group fails then the master poller, if any, is closed before
+          * the failure is propagated.
+          *
+          * @return the started poller group
+          * @throws UncheckedIOException if creating the pollers fails with an IOException
           */
!         static PollerGroup create() {
!             try {
!                 Poller masterPoller = (POLLER_MODE == Mode.VTHREAD_POLLERS)
!                         ? PROVIDER.readPoller(false)
!                         : null;
!                 PollerGroup pollerGroup;
!                 try {
!                     int rc = pollerCount("jdk.readPollers", PROVIDER.defaultReadPollers(POLLER_MODE));
!                     int wc = pollerCount("jdk.writePollers", PROVIDER.defaultWritePollers(POLLER_MODE));
+                     pollerGroup = new PollerGroup(DEFAULT_SCHEDULER, masterPoller, rc, wc);
+                 } catch (Exception e) {
+                     // There is no master poller outside VTHREAD_POLLERS mode. Guard the
+                     // cleanup so a NullPointerException does not replace the original
+                     // failure.
+                     if (masterPoller != null) {
+                         masterPoller.close();
+                     }
+                     throw e;
+                 }
+                 pollerGroup.start();
+                 return pollerGroup;
+             } catch (IOException ioe) {
+                 throw new UncheckedIOException(ioe);
+             }
+         }
+ 
+         /**
+          * Creates and starts the poller group for a custom scheduler. The group uses
+          * the master poller of the default poller group and is created with one read
+          * poller and one write poller.
+          *
+          * @throws UncheckedIOException if creating the pollers fails with an IOException
+          */
+         static PollerGroup create(Executor scheduler) {
+             try {
+                 Poller masterPoller = DEFAULT_POLLER_GROUP.get().masterPoller();
+                 var pollerGroup = new PollerGroup(scheduler, masterPoller, 1, 1);
+                 pollerGroup.start();
+                 return pollerGroup;
+             } catch (IOException ioe) {
+                 throw new UncheckedIOException(ioe);
+             }
+         }
+ 
+         /**
+          * Starts the poller threads. In VTHREAD_POLLERS mode, each read and write
+          * sub-poller runs its sub-poller loop as a task submitted to this group's
+          * executor, and only the group for the default scheduler starts the
+          * "Master-Poller" thread. In any other mode, each read and write poller runs
+          * its poller loop in a thread started with startPlatformThread.
+          */
!         private void start() {
!             if (POLLER_MODE == Mode.VTHREAD_POLLERS) {
!                 if (scheduler == DEFAULT_SCHEDULER) {
!                     startPlatformThread("Master-Poller", masterPoller::pollerLoop);
!                 }
                  Arrays.stream(readPollers).forEach(p -> {
!                     executor.execute(() -> p.subPollerLoop(this, masterPoller));
                  });
                  Arrays.stream(writePollers).forEach(p -> {
!                     executor.execute(() -> p.subPollerLoop(this, masterPoller));
                  });
              } else {
+                 // Mode.SYSTEM_THREADS
                  Arrays.stream(readPollers).forEach(p -> {
                      startPlatformThread("Read-Poller", p::pollerLoop);
                  });
                  Arrays.stream(writePollers).forEach(p -> {
                      startPlatformThread("Write-Poller", p::pollerLoop);
                  });
              }
          }
  
          /**
!          * Closes the given pollers. Null elements are skipped.
           */
!         private void closeAll(Poller... pollers) {
!             for (Poller poller : pollers) {
+                 if (poller != null) {
+                     poller.close();
+                 }
+             }
          }
  
          /**
!          * Invoked during shutdown to unpark all subpoller threads and wait for
+          * them to terminate. Pollers that have no owner thread are skipped. If the
+          * current thread is interrupted while waiting then it continues to wait and
+          * its interrupt status is set again before this method returns.
           */
!         private void shutdownPollers(Poller... pollers) {
!             boolean interrupted = false;
!             for (Poller poller : pollers) {
+                 if (poller.owner instanceof Thread owner) {
+                     LockSupport.unpark(owner);
+                     while (owner.isAlive()) {
+                         try {
+                             owner.join();
+                         } catch (InterruptedException e) {
+                             interrupted = true;
+                         }
+                     }
+                 }
+             }
+             if (interrupted) {
+                 Thread.currentThread().interrupt();
+             }
          }
  
!         void shutdown() {
!             if (scheduler == DEFAULT_SCHEDULER || POLLER_MODE == Mode.SYSTEM_THREADS) {
!                 throw new UnsupportedOperationException();
!             }
!             shutdown = true;
!             shutdownPollers(readPollers);
+             shutdownPollers(writePollers);
          }
  
          /**
!          * Returns true if this poller group has been shut down.
+          * @return the value of the shutdown flag, which is set by the shutdown method
           */
+         boolean isShutdown() {
+             return shutdown;
+         }
+ 
+         Poller masterPoller() {
+             return masterPoller;
+         }
+ 
          List<Poller> readPollers() {
              return List.of(readPollers);
          }
  
          List<Poller> writePollers() {
              return List.of(writePollers);
          }
  
+         /**
+          * Returns the read poller for the given file descriptor.
+          */
+         Poller readPoller(int fdVal) {
+             int index = PROVIDER.fdValToIndex(fdVal, readPollers.length);
+             return readPollers[index];
+         }
+ 
+         /**
+          * Returns the write poller for the given file descriptor.
+          */
+         Poller writePoller(int fdVal) {
+             int index = PROVIDER.fdValToIndex(fdVal, writePollers.length);
+             return writePollers[index];
+         }
  
          /**
           * Reads the given property name to get the poller count. If the property is
           * set then the value must be a power of 2. Returns 1 if the property is not
           * set.

*** 448,26 ***
                  throw new InternalError(e);
              }
          }
      }
  
      /**
       * Return the master poller or null if there is no master poller.
       */
      public static Poller masterPoller() {
!         return POLLERS.masterPoller();
      }
  
      /**
       * Return the list of read pollers.
       */
      public static List<Poller> readPollers() {
!         return POLLERS.readPollers();
      }
  
      /**
       * Return the list of write pollers.
       */
      public static List<Poller> writePollers() {
!         return POLLERS.writePollers();
      }
  }
--- 516,75 ---
                  throw new InternalError(e);
              }
          }
      }
  
+     /**
+      * Determines the poller mode. The value of the system property "jdk.pollerMode"
+      * is used when it is set; otherwise the provider's default poller mode is used.
+      *
+      * @throws RuntimeException if the property is set to a value that is not a mode
+      *         name (ignoring case), "1" or "2"
+      */
+     private static Mode pollerMode() {
+         String value = System.getProperty("jdk.pollerMode");
+         if (value == null) {
+             return PROVIDER.defaultPollerMode();
+         }
+         if (value.equals("1") || value.equalsIgnoreCase(Mode.SYSTEM_THREADS.name())) {
+             return Mode.SYSTEM_THREADS;
+         }
+         if (value.equals("2") || value.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name())) {
+             return Mode.VTHREAD_POLLERS;
+         }
+         throw new RuntimeException("Can't parse '" + value + "' as polling mode");
+     }
+ 
+     /**
+      * Returns the PollerGroup that the given thread uses to poll file descriptors.
+      * The default poller group is used in SYSTEM_THREADS mode, and for platform
+      * threads and virtual threads that use the default scheduler. A virtual thread
+      * that uses a custom scheduler gets the poller group for that scheduler, which
+      * is created and started on first use.
+      */
+     private static PollerGroup pollerGroup(Thread thread) {
+         if (POLLER_MODE == Mode.SYSTEM_THREADS) {
+             return DEFAULT_POLLER_GROUP.get();
+         }
+         Executor scheduler;
+         if (thread.isVirtual()) {
+             scheduler = JLA.virtualThreadScheduler(thread);
+         } else {
+             scheduler = DEFAULT_SCHEDULER;
+         }
+         if (scheduler == DEFAULT_SCHEDULER) {
+             // The default scheduler already has its own group. Creating another one via
+             // PollerGroup.create(Executor) would start a second Master-Poller thread on
+             // the same master poller and ignore the configured poller counts.
+             return DEFAULT_POLLER_GROUP.get();
+         }
+         return POLLER_GROUPS.computeIfAbsent(scheduler, _ -> PollerGroup.create(scheduler));
+     }
+ 
+     /**
+      * Invoked before the given scheduler is shut down. In VTHREAD_POLLERS mode, this
+      * method removes the poller group mapped to the scheduler, if there is one, and
+      * shuts it down so that its sub-poller threads terminate. Does nothing in
+      * SYSTEM_THREADS mode or when no poller group is mapped to the scheduler.
+      */
+     public static void beforeShutdown(Executor executor) {
+         if (POLLER_MODE == Mode.VTHREAD_POLLERS) {
+             PollerGroup group = POLLER_GROUPS.remove(executor);
+             if (group != null) {
+                 group.shutdown();
+             }
+         }
+     }
+ 
      /**
       * Return the master poller or null if there is no master poller.
       */
      public static Poller masterPoller() {
!         return DEFAULT_POLLER_GROUP.get().masterPoller();
      }
  
      /**
       * Return the list of read pollers.
       */
      public static List<Poller> readPollers() {
!         return DEFAULT_POLLER_GROUP.get().readPollers();
      }
  
      /**
       * Return the list of write pollers.
       */
      public static List<Poller> writePollers() {
!         return DEFAULT_POLLER_GROUP.get().writePollers();
      }
+ 
  }
< prev index next >