< prev index next > src/java.base/share/classes/sun/nio/ch/Poller.java
Print this page
import jdk.internal.vm.ContinuationSupport;
import jdk.internal.vm.annotation.Stable;
/**
* I/O poller to allow virtual threads to park until a file descriptor is ready for I/O.
+ * Implementations also optionally support read/write operations where virtual threads
+ * park until bytes are read or written.
*/
public abstract class Poller {
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
// the poller group for the I/O pollers and poller threads
*/
int fdVal() {
// This base implementation always throws.
// NOTE(review): presumably overridden by pollers that are themselves polled by a
// master poller (the sub-poller loop passes fdVal() to masterPoller.poll) - confirm.
throw new UnsupportedOperationException();
}
+ /**
+ * Invoked when this poller's file descriptor is polled by the master poller.
+ * The implementation in this class does nothing.
+ */
+ void pollerPolled() throws IOException {
+ }
+
/**
* Register the file descriptor with the I/O event management facility so that it is
* polled when the file descriptor is ready for I/O. The registration is "one shot",
* meaning it should be polled at most once.
*/
try {
int polled = 0;
while (!isShutdown()) {
if (polled == 0) {
masterPoller.poll(fdVal(), 0, () -> true); // park
+ pollerPolled();
} else {
Thread.yield();
}
polled = poll(0);
}
poller.close();
} catch (IOException _) { }
}
}
}
+
+ /**
+ * Returns true if the read pollers in this poller group support read ops in
+ * addition to POLLIN polling. The answer is obtained from the poller provider.
+ */
+ boolean supportReadOps() {
+ return provider().supportReadOps();
+ }
+
+ /**
+ * Reads bytes into a byte array. The implementations visible in this file
+ * delegate to a read poller's {@code implRead}.
+ * @param isOpen supplies a boolean to indicate if the enclosing object is open
+ * @throws UnsupportedOperationException if not supported
+ */
+ abstract int read(int fdVal, byte[] b, int off, int len, long nanos,
+ BooleanSupplier isOpen) throws IOException;
+
+ /**
+ * Returns true if the write pollers in this poller group support write ops in
+ * addition to POLLOUT polling. The answer is obtained from the poller provider.
+ */
+ boolean supportWriteOps() {
+ return provider().supportWriteOps();
+ }
+
+ /**
+ * Writes bytes from a byte array. The implementations visible in this file
+ * delegate to a write poller's {@code implWrite}.
+ * @param isOpen supplies a boolean to indicate if the enclosing object is open
+ * @throws UnsupportedOperationException if not supported
+ */
+ abstract int write(int fdVal, byte[] b, int off, int len,
+ BooleanSupplier isOpen) throws IOException;
}
/**
* SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
*/
? readPoller(fdVal)
: writePoller(fdVal);
poller.poll(fdVal, nanos, isOpen);
}
+ @Override
+ int read(int fdVal, byte[] b, int off, int len, long nanos,
+ BooleanSupplier isOpen) throws IOException { // delegates to the read poller selected for fdVal
+ return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
+ }
+
+ @Override
+ int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException { // delegates to the write poller selected for fdVal
+ return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
+ }
+
// This poller group reports no master poller: it always returns null.
@Override
Poller masterPoller() {
return null;
}
? readPoller(fdVal)
: writePoller(fdVal);
poller.poll(fdVal, nanos, isOpen);
}
+ @Override
+ int read(int fdVal, byte[] b, int off, int len, long nanos,
+ BooleanSupplier isOpen) throws IOException { // delegates to the read poller selected for fdVal
+ return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
+ }
+
+ @Override
+ int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException { // delegates to the write poller selected for fdVal
+ return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
+ }
+
// Polls fdVal through the master poller; the isOpen supplier passed here always returns true.
@Override
void pollSelector(int fdVal, long nanos) throws IOException {
masterPoller.poll(fdVal, nanos, () -> true);
}
Poller readPoller = provider().readPoller(true);
readPollers.add(readPoller);
// start virtual thread to execute sub-polling loop
Thread carrier = JLA.currentCarrierThread();
! Thread.ofVirtual()
.inheritInheritableThreadLocals(false)
.name(carrier.getName() + "-Read-Poller")
! .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
! .start(() -> subPollerLoop(readPoller));
return readPoller;
}
/**
* Returns the read poller for the current carrier, starting it if required.
Poller readPoller = provider().readPoller(true);
readPollers.add(readPoller);
// start virtual thread to execute sub-polling loop
Thread carrier = JLA.currentCarrierThread();
! Thread.Builder.OfVirtual builder = Thread.ofVirtual()
.inheritInheritableThreadLocals(false)
.name(carrier.getName() + "-Read-Poller")
! .uncaughtExceptionHandler((_, e) -> e.printStackTrace());
! Thread thread = JLA.defaultVirtualThreadScheduler()
+ .newThread(builder, carrier, () -> subPollerLoop(readPoller))
+ .thread();
+ thread.start();
return readPoller;
}
/**
* Returns the read poller for the current carrier, starting it if required.
// Polls fdVal through the master poller; the isOpen supplier passed here always returns true.
@Override
void pollSelector(int fdVal, long nanos) throws IOException {
masterPoller.poll(fdVal, nanos, () -> true);
}
+ @Override
+ int read(int fdVal, byte[] b, int off, int len, long nanos,
+ BooleanSupplier isOpen) throws IOException { // uses the no-arg readPoller(), not one selected by fdVal
+ return readPoller().implRead(fdVal, b, off, len, nanos, isOpen); // NOTE(review): presumably the current carrier's read poller - confirm
+ }
+
+ @Override
+ int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException { // delegates to the write poller selected for fdVal
+ return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
+ }
+
/**
* Sub-poller polling loop.
*/
private void subPollerLoop(Poller readPoller) {
try {
throw new IllegalArgumentException(msg);
}
return count;
}
+
+ /**
+ * Returns true if read ops are supported in addition to POLLIN polling.
+ * Delegates to the poller group.
+ */
+ public static boolean supportReadOps() {
+ return POLLER_GROUP.supportReadOps();
+ }
+
+ /**
+ * Returns true if write ops are supported in addition to POLLOUT polling.
+ * Delegates to the poller group.
+ */
+ public static boolean supportWriteOps() {
+ return POLLER_GROUP.supportWriteOps();
+ }
+
+ /**
+ * Parks the current thread until bytes are read into a byte array.
+ * Delegates to the poller group.
+ * @param isOpen supplies a boolean to indicate if the enclosing object is open
+ * @return the number of bytes read (>0), EOF (-1), or UNAVAILABLE (-2) if unparked
+ * or the timeout expires while waiting for bytes to be read
+ * @throws UnsupportedOperationException if not supported
+ */
+ public static int read(int fdVal, byte[] b, int off, int len, long nanos,
+ BooleanSupplier isOpen) throws IOException {
+ return POLLER_GROUP.read(fdVal, b, off, len, nanos, isOpen);
+ }
+
+ /**
+ * Parks the current thread until bytes are written from a byte array.
+ * Delegates to the poller group.
+ * @param isOpen supplies a boolean to indicate if the enclosing object is open
+ * @return the number of bytes written (>0), or UNAVAILABLE (-2) if unparked
+ * while waiting for bytes to be written.
+ * NOTE(review): return values inferred from the read variant; this method takes
+ * no timeout parameter - confirm against the poller implementations.
+ * @throws UnsupportedOperationException if not supported
+ */
+ public static int write(int fdVal, byte[] b, int off, int len,
+ BooleanSupplier isOpen) throws IOException {
+ return POLLER_GROUP.write(fdVal, b, off, len, isOpen);
+ }
+
+ /**
+ * Parks the current thread until bytes are read into a byte array. This method is
+ * overridden by poller implementations that support this operation. The
+ * implementation in this class always throws.
+ * @throws UnsupportedOperationException if not supported
+ */
+ int implRead(int fdVal, byte[] b, int off, int len, long nanos,
+ BooleanSupplier isOpen) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Parks the current thread until bytes are written from a byte array. This
+ * method is overridden by poller implementations that support this operation.
+ * The implementation in this class always throws.
+ * @throws UnsupportedOperationException if not supported
+ */
+ int implWrite(int fdVal, byte[] b, int off, int len,
+ BooleanSupplier isOpen) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Return the master poller or null if there is no master poller.
*/
public static Poller masterPoller() {
return POLLER_GROUP.masterPoller();
< prev index next >