< prev index next > src/java.base/share/classes/sun/nio/ch/Poller.java
Print this page
* questions.
*/
package sun.nio.ch;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
import jdk.internal.misc.InnocuousThread;
import jdk.internal.vm.annotation.Stable;
/**
* Polls file descriptors. Virtual threads invoke the poll method to park
* until a given file descriptor is ready for I/O.
*/
public abstract class Poller {
! private static final Pollers POLLERS;
! static {
! try {
! var pollers = new Pollers();
! pollers.start();
! POLLERS = pollers;
! } catch (IOException ioe) {
! throw new ExceptionInInitializerError(ioe);
! }
! }
// the poller or sub-poller thread
private @Stable Thread owner;
// maps file descriptors to parked Thread
* questions.
*/
package sun.nio.ch;
import java.io.IOException;
+ import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
+ import java.util.function.Supplier;
+ import jdk.internal.access.JavaLangAccess;
+ import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.InnocuousThread;
import jdk.internal.vm.annotation.Stable;
/**
* Polls file descriptors. Virtual threads invoke the poll method to park
* until a given file descriptor is ready for I/O.
*/
public abstract class Poller {
! private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
!
! private static final PollerProvider PROVIDER = PollerProvider.provider();
!
! private static final Mode POLLER_MODE = pollerMode();
!
! private static final Executor DEFAULT_SCHEDULER = JLA.defaultVirtualThreadScheduler();
!
! // poller group for default scheduler
! private static final Supplier<PollerGroup> DEFAULT_POLLER_GROUP = StableValue.supplier(PollerGroup::create);
+
+ // maps scheduler to PollerGroup, custom schedulers can't be GC'ed at this time
+ private static final Map<Executor, PollerGroup> POLLER_GROUPS = new ConcurrentHashMap<>();
// the poller or sub-poller thread
private @Stable Thread owner;
// maps file descriptors to parked Thread
* Initialize a Poller.
*/
protected Poller() {
}
+ /**
+ * Closes the poller and release resources. This method can only be used to cleanup
+ * when creating a poller group fails.
+ */
+ abstract void close();
+
/**
* Returns the poller's file descriptor, used when the read and write poller threads
* are virtual threads.
*
* @throws UnsupportedOperationException if not supported
*/
static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
throws IOException
{
assert nanos >= 0L;
if (event == Net.POLLIN) {
! POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
} else if (event == Net.POLLOUT) {
! POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
} else {
assert false;
}
}
*/
static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
throws IOException
{
assert nanos >= 0L;
+ PollerGroup pollerGroup = pollerGroup(Thread.currentThread());
if (event == Net.POLLIN) {
! pollerGroup.readPoller(fdVal).poll(fdVal, nanos, supplier);
} else if (event == Net.POLLOUT) {
! pollerGroup.writePoller(fdVal).poll(fdVal, nanos, supplier);
} else {
assert false;
}
}
* @param fdVal the Selector's file descriptor
* @param nanos the waiting time or 0 to wait indefinitely
*/
static void pollSelector(int fdVal, long nanos) throws IOException {
assert nanos >= 0L;
! Poller poller = POLLERS.masterPoller();
if (poller == null) {
! poller = POLLERS.readPoller(fdVal);
}
poller.poll(fdVal, nanos, () -> true);
}
/**
! * If there is a thread polling the given file descriptor for the given event then
- * the thread is unparked.
- */
- static void stopPoll(int fdVal, int event) {
- if (event == Net.POLLIN) {
- POLLERS.readPoller(fdVal).wakeup(fdVal);
- } else if (event == Net.POLLOUT) {
- POLLERS.writePoller(fdVal).wakeup(fdVal);
- } else {
- throw new IllegalArgumentException();
- }
- }
-
- /**
- * If there are any threads polling the given file descriptor then they are unparked.
*/
! static void stopPoll(int fdVal) {
! stopPoll(fdVal, Net.POLLIN);
- stopPoll(fdVal, Net.POLLOUT);
}
/**
* Parks the current thread until a file descriptor is ready.
*/
* @param fdVal the Selector's file descriptor
* @param nanos the waiting time or 0 to wait indefinitely
*/
static void pollSelector(int fdVal, long nanos) throws IOException {
assert nanos >= 0L;
! PollerGroup pollerGroup = pollerGroup(Thread.currentThread());
+ Poller poller = pollerGroup.masterPoller();
if (poller == null) {
! poller = pollerGroup.readPoller(fdVal);
}
poller.poll(fdVal, nanos, () -> true);
}
/**
! * Unpark the given thread so that it stops polling.
*/
! static void stopPoll(Thread thread) {
! LockSupport.unpark(thread);
}
/**
* Parks the current thread until a file descriptor is ready.
*/
* The sub-poller registers its file descriptor with the master poller to park until
* there are events to poll. When unparked, it does non-blocking polls and parks
* again when there are no more events. The sub-poller yields after each poll to help
* with fairness and to avoid re-registering with the master poller where possible.
*/
! private void subPollerLoop(Poller masterPoller) {
assert Thread.currentThread().isVirtual();
owner = Thread.currentThread();
try {
int polled = 0;
! for (;;) {
if (polled == 0) {
masterPoller.poll(fdVal(), 0, () -> true); // park
} else {
Thread.yield();
}
* The sub-poller registers its file descriptor with the master poller to park until
* there are events to poll. When unparked, it does non-blocking polls and parks
* again when there are no more events. The sub-poller yields after each poll to help
* with fairness and to avoid re-registering with the master poller where possible.
*/
! private void subPollerLoop(PollerGroup pollerGroup, Poller masterPoller) {
assert Thread.currentThread().isVirtual();
owner = Thread.currentThread();
try {
int polled = 0;
! while (!pollerGroup.isShutdown()) {
if (polled == 0) {
masterPoller.poll(fdVal(), 0, () -> true); // park
} else {
Thread.yield();
}
} catch (Exception e) {
e.printStackTrace();
}
}
- /**
- * Returns the number I/O operations currently registered with this poller.
- */
- public int registered() {
- return map.size();
- }
-
@Override
public String toString() {
return String.format("%s [registered = %d, owner = %s]",
! Objects.toIdentityString(this), registered(), owner);
}
/**
! * The Pollers used for read and write events.
*/
! private static class Pollers {
! private final PollerProvider provider;
- private final Poller.Mode pollerMode;
- private final Poller masterPoller;
private final Poller[] readPollers;
private final Poller[] writePollers;
!
! // used by start method to executor is kept alive
! private Executor executor;
!
! /**
! * Creates the Poller instances based on configuration.
! */
! Pollers() throws IOException {
! PollerProvider provider = PollerProvider.provider();
! Poller.Mode mode;
! String s = System.getProperty("jdk.pollerMode");
! if (s != null) {
! if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
! mode = Mode.SYSTEM_THREADS;
- } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
- mode = Mode.VTHREAD_POLLERS;
} else {
! throw new RuntimeException("Can't parse '" + s + "' as polling mode");
}
! } else {
! mode = provider.defaultPollerMode();
}
! // vthread poller mode needs a master poller
- Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
- ? provider.readPoller(false)
- : null;
-
- // read pollers (or sub-pollers)
- int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
Poller[] readPollers = new Poller[readPollerCount];
- for (int i = 0; i < readPollerCount; i++) {
- readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
- }
-
- // write pollers (or sub-pollers)
- int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));
Poller[] writePollers = new Poller[writePollerCount];
! for (int i = 0; i < writePollerCount; i++) {
! writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);
}
! this.provider = provider;
- this.pollerMode = mode;
this.masterPoller = masterPoller;
this.readPollers = readPollers;
this.writePollers = writePollers;
}
/**
! * Starts the Poller threads.
*/
! void start() {
! if (pollerMode == Mode.VTHREAD_POLLERS) {
! startPlatformThread("MasterPoller", masterPoller::pollerLoop);
! ThreadFactory factory = Thread.ofVirtual()
! .inheritInheritableThreadLocals(false)
! .name("SubPoller-", 0)
! .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
! .factory();
! executor = Executors.newThreadPerTaskExecutor(factory);
Arrays.stream(readPollers).forEach(p -> {
! executor.execute(() -> p.subPollerLoop(masterPoller));
});
Arrays.stream(writePollers).forEach(p -> {
! executor.execute(() -> p.subPollerLoop(masterPoller));
});
} else {
Arrays.stream(readPollers).forEach(p -> {
startPlatformThread("Read-Poller", p::pollerLoop);
});
Arrays.stream(writePollers).forEach(p -> {
startPlatformThread("Write-Poller", p::pollerLoop);
});
}
}
/**
! * Returns the master poller, or null if there is no master poller.
*/
! Poller masterPoller() {
! return masterPoller;
}
/**
! * Returns the read poller for the given file descriptor.
*/
! Poller readPoller(int fdVal) {
! int index = provider.fdValToIndex(fdVal, readPollers.length);
! return readPollers[index];
}
! /**
! * Returns the write poller for the given file descriptor.
! */
! Poller writePoller(int fdVal) {
! int index = provider.fdValToIndex(fdVal, writePollers.length);
! return writePollers[index];
}
/**
! * Return the list of read pollers.
*/
List<Poller> readPollers() {
return List.of(readPollers);
}
- /**
- * Return the list of write pollers.
- */
List<Poller> writePollers() {
return List.of(writePollers);
}
/**
* Reads the given property name to get the poller count. If the property is
* set then the value must be a power of 2. Returns 1 if the property is not
* set.
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return String.format("%s [registered = %d, owner = %s]",
! Objects.toIdentityString(this), map.size(), owner);
}
/**
! * The read/write pollers for a virtual thread scheduler.
*/
! private static class PollerGroup {
! private final Executor scheduler;
private final Poller[] readPollers;
private final Poller[] writePollers;
! private final Poller masterPoller;
! private final Executor executor;
! private volatile boolean shutdown;
!
! PollerGroup(Executor scheduler,
! Poller masterPoller,
! int readPollerCount,
! int writePollerCount) throws IOException {
! boolean subPoller = (POLLER_MODE == Mode.VTHREAD_POLLERS);
! Executor executor = null;
! if (subPoller) {
! String namePrefix;
! if (scheduler == DEFAULT_SCHEDULER) {
! namePrefix = "SubPoller-";
} else {
! namePrefix = Objects.toIdentityString(scheduler) + "-SubPoller-";
}
! @SuppressWarnings("restricted")
! ThreadFactory factory = Thread.ofVirtual()
+ .scheduler(scheduler)
+ .inheritInheritableThreadLocals(false)
+ .name(namePrefix, 0)
+ .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
+ .factory();
+ executor = Executors.newThreadPerTaskExecutor(factory);
}
! // read and write pollers (or sub-pollers)
Poller[] readPollers = new Poller[readPollerCount];
Poller[] writePollers = new Poller[writePollerCount];
! try {
! for (int i = 0; i < readPollerCount; i++) {
+ readPollers[i] = PROVIDER.readPoller(subPoller);
+ }
+ for (int i = 0; i < writePollerCount; i++) {
+ writePollers[i] = PROVIDER.writePoller(subPoller);
+ }
+ } catch (Exception e) {
+ closeAll(readPollers);
+ closeAll(writePollers);
+ throw e;
}
! this.scheduler = scheduler;
this.masterPoller = masterPoller;
this.readPollers = readPollers;
this.writePollers = writePollers;
+ this.executor = executor;
}
/**
! * Create and starts the poller group for the default scheduler.
*/
! static PollerGroup create() {
! try {
! Poller masterPoller = (POLLER_MODE == Mode.VTHREAD_POLLERS)
! ? PROVIDER.readPoller(false)
! : null;
! PollerGroup pollerGroup;
! try {
! int rc = pollerCount("jdk.readPollers", PROVIDER.defaultReadPollers(POLLER_MODE));
! int wc = pollerCount("jdk.writePollers", PROVIDER.defaultWritePollers(POLLER_MODE));
+ pollerGroup = new PollerGroup(DEFAULT_SCHEDULER, masterPoller, rc, wc);
+ } catch (Exception e) {
+ masterPoller.close();
+ throw e;
+ }
+ pollerGroup.start();
+ return pollerGroup;
+ } catch (IOException ioe) {
+ throw new UncheckedIOException(ioe);
+ }
+ }
+
+ /**
+ * Create and starts the poller group for a custom scheduler.
+ */
+ static PollerGroup create(Executor scheduler) {
+ try {
+ Poller masterPoller = DEFAULT_POLLER_GROUP.get().masterPoller();
+ var pollerGroup = new PollerGroup(scheduler, masterPoller, 1, 1);
+ pollerGroup.start();
+ return pollerGroup;
+ } catch (IOException ioe) {
+ throw new UncheckedIOException(ioe);
+ }
+ }
+
+ /**
+ * Start poller threads.
+ */
+ private void start() {
+ if (POLLER_MODE == Mode.VTHREAD_POLLERS) {
+ if (scheduler == DEFAULT_SCHEDULER) {
+ startPlatformThread("Master-Poller", masterPoller::pollerLoop);
+ }
Arrays.stream(readPollers).forEach(p -> {
! executor.execute(() -> p.subPollerLoop(this, masterPoller));
});
Arrays.stream(writePollers).forEach(p -> {
! executor.execute(() -> p.subPollerLoop(this, masterPoller));
});
} else {
+ // Mode.SYSTEM_THREADS
Arrays.stream(readPollers).forEach(p -> {
startPlatformThread("Read-Poller", p::pollerLoop);
});
Arrays.stream(writePollers).forEach(p -> {
startPlatformThread("Write-Poller", p::pollerLoop);
});
}
}
/**
! * Close the given pollers.
*/
! private void closeAll(Poller... pollers) {
! for (Poller poller : pollers) {
+ if (poller != null) {
+ poller.close();
+ }
+ }
}
/**
! * Invoked during shutdown to unpark all subpoller threads and wait for
+ * them to terminate.
*/
! private void shutdownPollers(Poller... pollers) {
! boolean interrupted = false;
! for (Poller poller : pollers) {
+ if (poller.owner instanceof Thread owner) {
+ LockSupport.unpark(owner);
+ while (owner.isAlive()) {
+ try {
+ owner.join();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ }
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
! void shutdown() {
! if (scheduler == DEFAULT_SCHEDULER || POLLER_MODE == Mode.SYSTEM_THREADS) {
! throw new UnsupportedOperationException();
! }
! shutdown = true;
! shutdownPollers(readPollers);
+ shutdownPollers(writePollers);
}
/**
! *
+ * @return
*/
+ boolean isShutdown() {
+ return shutdown;
+ }
+
+ Poller masterPoller() {
+ return masterPoller;
+ }
+
List<Poller> readPollers() {
return List.of(readPollers);
}
List<Poller> writePollers() {
return List.of(writePollers);
}
+ /**
+ * Returns the read poller for the given file descriptor.
+ */
+ Poller readPoller(int fdVal) {
+ int index = PROVIDER.fdValToIndex(fdVal, readPollers.length);
+ return readPollers[index];
+ }
+
+ /**
+ * Returns the write poller for the given file descriptor.
+ */
+ Poller writePoller(int fdVal) {
+ int index = PROVIDER.fdValToIndex(fdVal, writePollers.length);
+ return writePollers[index];
+ }
/**
* Reads the given property name to get the poller count. If the property is
* set then the value must be a power of 2. Returns 1 if the property is not
* set.
throw new InternalError(e);
}
}
}
/**
* Return the master poller or null if there is no master poller.
*/
public static Poller masterPoller() {
! return POLLERS.masterPoller();
}
/**
* Return the list of read pollers.
*/
public static List<Poller> readPollers() {
! return POLLERS.readPollers();
}
/**
* Return the list of write pollers.
*/
public static List<Poller> writePollers() {
! return POLLERS.writePollers();
}
}
throw new InternalError(e);
}
}
}
+ /**
+ * Returns the poller mode.
+ */
+ private static Mode pollerMode() {
+ String s = System.getProperty("jdk.pollerMode");
+ if (s != null) {
+ if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
+ return Mode.SYSTEM_THREADS;
+ } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
+ return Mode.VTHREAD_POLLERS;
+ } else {
+ throw new RuntimeException("Can't parse '" + s + "' as polling mode");
+ }
+ } else {
+ return PROVIDER.defaultPollerMode();
+ }
+ }
+
+ /**
+ * Returns the PollerGroup that the given thread uses to poll file descriptors.
+ */
+ private static PollerGroup pollerGroup(Thread thread) {
+ if (POLLER_MODE == Mode.SYSTEM_THREADS) {
+ return DEFAULT_POLLER_GROUP.get();
+ }
+ Executor scheduler;
+ if (thread.isVirtual()) {
+ scheduler = JLA.virtualThreadScheduler(thread);
+ } else {
+ scheduler = DEFAULT_SCHEDULER;
+ }
+ return POLLER_GROUPS.computeIfAbsent(scheduler, _ -> PollerGroup.create(scheduler));
+ }
+
+ /**
+ * Invoked before the given scheduler is shutdown. In VTHREAD_POLLERS mode, this
+ * method will arrange for the sub poller threads to terminate. Does nothing in
+ * SYSTEM_THREADS mode.
+ */
+ public static void beforeShutdown(Executor executor) {
+ if (POLLER_MODE == Mode.VTHREAD_POLLERS) {
+ PollerGroup group = POLLER_GROUPS.remove(executor);
+ if (group != null) {
+ group.shutdown();
+ }
+ }
+ }
+
/**
* Return the master poller or null if there is no master poller.
*/
public static Poller masterPoller() {
! return DEFAULT_POLLER_GROUP.get().masterPoller();
}
/**
* Return the list of read pollers.
*/
public static List<Poller> readPollers() {
! return DEFAULT_POLLER_GROUP.get().readPollers();
}
/**
* Return the list of write pollers.
*/
public static List<Poller> writePollers() {
! return DEFAULT_POLLER_GROUP.get().writePollers();
}
+
}
< prev index next >