< prev index next >

src/java.base/share/classes/sun/nio/ch/Poller.java

Print this page
*** 46,10 ***
--- 46,12 ---
  import jdk.internal.vm.ContinuationSupport;
  import jdk.internal.vm.annotation.Stable;
  
  /**
   * I/O poller to allow virtual threads park until a file descriptor is ready for I/O.
+  * Implementations also optionally support read/write operations where virtual threads
+  * park until bytes are read or written.
   */
  public abstract class Poller {
      private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
  
      // the poller group for the I/O pollers and poller threads

*** 167,10 ***
--- 169,16 ---
       */
      int fdVal() {
          // base implementation has no file descriptor to report; presumably
          // overridden by pollers that can be polled by a master poller -- TODO confirm
          throw new UnsupportedOperationException();
      }
  
+     /**
+      * Invoked when this poller's file descriptor is polled by the master poller.
+      * The default implementation does nothing.
+      */
+     void pollerPolled() throws IOException {
+     }
+ 
      /**
       * Register the file descriptor with the I/O event management facility so that it is
       * polled when the file descriptor is ready for I/O. The registration is "one shot",
       * meaning it should be polled at most once.
       */

*** 317,10 ***
--- 325,11 ---
          try {
              int polled = 0;
              while (!isShutdown()) {
                  if (polled == 0) {
                      masterPoller.poll(fdVal(), 0, () -> true);  // park
+                     pollerPolled();
                  } else {
                      Thread.yield();
                  }
                  polled = poll(0);
              }

*** 408,10 ***
--- 417,40 ---
                          poller.close();
                      } catch (IOException _) { }
                  }
              }
          }
+ 
+         /**
+          * Returns true if the read pollers in this poller group support read ops in
+          * addition to POLLIN polling. The answer is obtained from the poller provider.
+          */
+         boolean supportReadOps() {
+             return provider().supportReadOps();
+         }
+ 
+         /**
+          * Reads bytes into a byte array. Each poller group supplies its own
+          * implementation of this method.
+          * @throws UnsupportedOperationException if not supported
+          */
+         abstract int read(int fdVal, byte[] b, int off, int len, long nanos,
+                           BooleanSupplier isOpen) throws IOException;
+ 
+         /**
+          * Returns true if the write pollers in this poller group support write ops in
+          * addition to POLLOUT polling. The answer is obtained from the poller provider.
+          */
+         boolean supportWriteOps() {
+             return provider().supportWriteOps();
+         }
+ 
+         /**
+          * Writes bytes from a byte array. Each poller group supplies its own
+          * implementation of this method.
+          * @throws UnsupportedOperationException if not supported
+          */
+         abstract int write(int fdVal, byte[] b, int off, int len,
+                            BooleanSupplier isOpen) throws IOException;
      }
  
      /**
       * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
       */

*** 469,10 ***
--- 508,21 ---
                      ? readPoller(fdVal)
                      : writePoller(fdVal);
              poller.poll(fdVal, nanos, isOpen);
          }
  
+         @Override
+         int read(int fdVal, byte[] b, int off, int len, long nanos,
+                  BooleanSupplier isOpen) throws IOException {
+             // delegate to the read poller selected for this file descriptor
+             return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
+         }
+ 
+         @Override
+         int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
+             // delegate to the write poller selected for this file descriptor
+             return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
+         }
+ 
          @Override
          Poller masterPoller() {
              // this poller group reports no master poller
              return null;
          }
  

*** 561,10 ***
--- 611,21 ---
                      ? readPoller(fdVal)
                      : writePoller(fdVal);
              poller.poll(fdVal, nanos, isOpen);
          }
  
+         @Override
+         int read(int fdVal, byte[] b, int off, int len, long nanos,
+                  BooleanSupplier isOpen) throws IOException {
+             // delegate to the read poller selected for this file descriptor
+             return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
+         }
+ 
+         @Override
+         int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
+             // delegate to the write poller selected for this file descriptor
+             return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
+         }
+ 
          @Override
          void pollSelector(int fdVal, long nanos) throws IOException {
              // poll the file descriptor with the master poller; the isOpen
              // supplier passed here always reports true
              masterPoller.poll(fdVal, nanos, () -> true);
          }
  

*** 652,15 ***
              Poller readPoller = provider().readPoller(true);
              readPollers.add(readPoller);
  
              // start virtual thread to execute sub-polling loop
              Thread carrier = JLA.currentCarrierThread();
!             Thread.ofVirtual()
                      .inheritInheritableThreadLocals(false)
                      .name(carrier.getName() + "-Read-Poller")
!                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
!                     .start(() -> subPollerLoop(readPoller));
              return readPoller;
          }
  
          /**
           * Returns the read poller for the current carrier, starting it if required.
--- 713,18 ---
              Poller readPoller = provider().readPoller(true);
              readPollers.add(readPoller);
  
              // start virtual thread to execute sub-polling loop
              Thread carrier = JLA.currentCarrierThread();
!             Thread.Builder.OfVirtual builder = Thread.ofVirtual()
                      .inheritInheritableThreadLocals(false)
                      .name(carrier.getName() + "-Read-Poller")
!                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace());
!             Thread thread = JLA.defaultVirtualThreadScheduler()
+                     .newThread(builder, carrier, () -> subPollerLoop(readPoller))
+                     .thread();
+             thread.start();
              return readPoller;
          }
  
          /**
           * Returns the read poller for the current carrier, starting it if required.

*** 704,10 ***
--- 768,21 ---
          @Override
          void pollSelector(int fdVal, long nanos) throws IOException {
              // poll the file descriptor with the master poller; the isOpen
              // supplier passed here always reports true
              masterPoller.poll(fdVal, nanos, () -> true);
          }
  
+         @Override
+         int read(int fdVal, byte[] b, int off, int len, long nanos,
+                  BooleanSupplier isOpen) throws IOException {
+             // uses the no-arg readPoller() rather than selecting by fdVal;
+             // presumably the current carrier's read poller -- TODO confirm
+             return readPoller().implRead(fdVal, b, off, len, nanos, isOpen);
+         }
+ 
+         @Override
+         int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
+             // delegate to the write poller selected for this file descriptor
+             return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
+         }
+ 
          /**
           * Sub-poller polling loop.
           */
          private void subPollerLoop(Poller readPoller) {
              try {

*** 767,10 ***
--- 842,67 ---
              throw new IllegalArgumentException(msg);
          }
          return count;
      }
  
+ 
+     /**
+      * Returns true if read ops are supported in addition to POLLIN polling.
+      * The answer is obtained from the poller group.
+      */
+     public static boolean supportReadOps() {
+         return POLLER_GROUP.supportReadOps();
+     }
+ 
+     /**
+      * Returns true if write ops are supported in addition to POLLOUT polling.
+      * The answer is obtained from the poller group.
+      */
+     public static boolean supportWriteOps() {
+         return POLLER_GROUP.supportWriteOps();
+     }
+ 
+     /**
+      * Parks the current thread until bytes are read into a byte array.
+      * The operation is delegated to the poller group.
+      * @param isOpen supplies a boolean to indicate if the enclosing object is open
+      * @return the number of bytes read (>0), EOF (-1), or UNAVAILABLE (-2) if unparked
+      * or the timeout expires while waiting for bytes to be read
+      * @throws UnsupportedOperationException if not supported
+      */
+     public static int read(int fdVal, byte[] b, int off, int len, long nanos,
+                            BooleanSupplier isOpen) throws IOException {
+         return POLLER_GROUP.read(fdVal, b, off, len, nanos, isOpen);
+     }
+ 
+     /**
+      * Parks the current thread until bytes are written from a byte array.
+      * The operation is delegated to the poller group.
+      * @param isOpen supplies a boolean to indicate if the enclosing object is open
+      * @return the number of bytes written (>0), or UNAVAILABLE (-2) if unparked
+      * while waiting for bytes to be written. NOTE(review): the earlier wording was
+      * copied from read and also listed EOF (-1) and a timeout; this method takes no
+      * timeout parameter -- confirm the exact return contract.
+      * @throws UnsupportedOperationException if not supported
+      */
+     public static int write(int fdVal, byte[] b, int off, int len,
+                             BooleanSupplier isOpen) throws IOException {
+         return POLLER_GROUP.write(fdVal, b, off, len, isOpen);
+     }
+ 
+     /**
+      * Parks the current thread until bytes are read into a byte array. This method is
+      * overridden by poller implementations that support this operation.
+      * This default implementation always throws UnsupportedOperationException.
+      */
+     int implRead(int fdVal, byte[] b, int off, int len, long nanos,
+                  BooleanSupplier isOpen) throws IOException {
+         throw new UnsupportedOperationException();
+     }
+ 
+     /**
+      * Parks the current thread until bytes are written from a byte array. This
+      * method is overridden by poller implementations that support this operation.
+      * This default implementation always throws UnsupportedOperationException.
+      */
+     int implWrite(int fdVal, byte[] b, int off, int len,
+                  BooleanSupplier isOpen) throws IOException {
+         throw new UnsupportedOperationException();
+     }
+ 
      /**
       * Return the master poller or null if there is no master poller.
       */
      public static Poller masterPoller() {
          return POLLER_GROUP.masterPoller();
< prev index next >