< prev index next > src/java.base/share/classes/sun/nio/ch/Poller.java
Print this page
* questions.
*/
package sun.nio.ch;
import java.io.IOException;
+ import java.io.UncheckedIOException;
+ import java.lang.ref.Reference;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
+ import jdk.internal.access.JavaLangAccess;
+ import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.InnocuousThread;
+ import jdk.internal.misc.TerminatingThreadLocal;
+ import jdk.internal.vm.Continuation;
+ import jdk.internal.vm.ContinuationSupport;
import jdk.internal.vm.annotation.Stable;
/**
* Polls file descriptors. Virtual threads invoke the poll method to park
* until a given file descriptor is ready for I/O.
*/
public abstract class Poller {
- private static final Pollers POLLERS;
- static {
- try {
- var pollers = new Pollers();
- pollers.start();
- POLLERS = pollers;
- } catch (IOException ioe) {
- throw new ExceptionInInitializerError(ioe);
- }
- }
+ private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
- // the poller or sub-poller thread
+ // the poller group for the I/O pollers and poller threads
+ private static final PollerGroup POLLER_GROUP = createPollerGroup();
+
+ // the poller or sub-poller thread (used for observability only)
private @Stable Thread owner;
// maps file descriptors to parked Thread
private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
+ // shutdown (if supported by poller group)
+ private volatile boolean shutdown;
+
/**
* Poller mode.
*/
enum Mode {
/**
- * ReadPoller and WritePoller are dedicated platform threads that block waiting
- * for events and unpark virtual threads when file descriptors are ready for I/O.
+ * Read and write pollers are platform threads that block waiting for events and
+ * unpark virtual threads when file descriptors are ready for I/O.
*/
SYSTEM_THREADS,
/**
- * ReadPoller and WritePoller threads are virtual threads that poll for events,
- * yielding between polls and unparking virtual threads when file descriptors are
+ * Read and write pollers are virtual threads that poll for events, yielding
+ * between polls and unparking virtual threads when file descriptors are
* ready for I/O. If there are no events then the poller threads park until there
* are I/O events to poll. This mode helps to integrate polling with virtual
* thread scheduling. The approach is similar to the default scheme in "User-level
* Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
* (https://dl.acm.org/doi/10.1145/3379483).
*/
- VTHREAD_POLLERS
+ VTHREAD_POLLERS,
+
+ /**
+ * Read pollers are per-carrier virtual threads that poll for events, yielding
+ * between polls and unparking virtual threads when file descriptors are ready
+ * for I/O. If there are no events then the poller threads park until there
+ * are I/O events to poll. The write poller is a system-wide platform thread.
+ */
+ POLLER_PER_CARRIER
+ }
+
+ /**
+ * Create and return the PollerGroup.
+ *
+ * If the system property {@code jdk.pollerMode} is set then it selects the mode:
+ * "1" for SYSTEM_THREADS, "2" for VTHREAD_POLLERS, "3" for POLLER_PER_CARRIER. Any
+ * other value is rejected with a RuntimeException. If the property is not set then
+ * the provider is created without an explicit mode. The group is created for the
+ * mode reported by the provider and is started before it is returned.
+ *
+ * The number of read and write pollers is read from {@code jdk.readPollers} and
+ * {@code jdk.writePollers}, falling back to the provider's defaults. The read poller
+ * count is not passed to the POLLER_PER_CARRIER group.
+ *
+ * @throws UncheckedIOException if an I/O error occurs creating the provider or the
+ * poller group
+ */
+ private static PollerGroup createPollerGroup() {
+ try {
+ PollerProvider provider;
+ if (System.getProperty("jdk.pollerMode") instanceof String s) {
+ Mode mode = switch (s) {
+ case "1" -> Mode.SYSTEM_THREADS;
+ case "2" -> Mode.VTHREAD_POLLERS;
+ case "3" -> Mode.POLLER_PER_CARRIER;
+ default -> {
+ throw new RuntimeException(s + " is not a valid polling mode");
+ }
+ };
+ provider = PollerProvider.createProvider(mode);
+ } else {
+ provider = PollerProvider.createProvider();
+ }
+
+ int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers());
+ int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers());
+ PollerGroup group = switch (provider.pollerMode()) {
+ case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider, readPollers, writePollers);
+ case VTHREAD_POLLERS -> new VThreadsPollerGroup(provider, readPollers, writePollers);
+ case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers);
+ };
+ group.start();
+ return group;
+ } catch (IOException ioe) {
+ throw new UncheckedIOException(ioe);
+ }
}
/**
* Initialize a Poller.
*/
protected Poller() {
}
/**
- * Returns the poller's file descriptor, used when the read and write poller threads
- * are virtual threads.
- *
+ * Closes the poller and releases its resources. This method can only be used to clean up
+ * when creating a poller group fails.
+ */
+ abstract void close() throws IOException;
+
+ /**
+ * Sets the poller's thread owner.
+ */
+ private void setOwner() {
+ owner = Thread.currentThread();
+ }
+
+ /**
+ * Returns true if this poller is marked for shutdown.
+ */
+ boolean isShutdown() {
+ return shutdown;
+ }
+
+ /**
+ * Marks this poller for shutdown.
+ */
+ private void setShutdown() {
+ shutdown = true;
+ }
+
+ /**
+ * Returns the poller's file descriptor to use when polling with the master poller.
* @throws UnsupportedOperationException if not supported
*/
int fdVal() {
throw new UnsupportedOperationException();
}
+ /**
+ * Invoked when this poller's file descriptor is polled by the master poller.
+ */
+ void pollerPolled() throws IOException {
+ }
+
/**
* Register the file descriptor. The registration is "one shot", meaning it should
* be polled at most once.
*/
abstract void implRegister(int fdVal) throws IOException;
/**
* Deregister the file descriptor.
* @param polled true if the file descriptor has already been polled
*/
- abstract void implDeregister(int fdVal, boolean polled);
+ abstract void implDeregister(int fdVal, boolean polled) throws IOException;
/**
* Poll for events. The {@link #polled(int)} method is invoked for each
* polled file descriptor.
*
* @param timeout if positive then block for up to {@code timeout} milliseconds,
* if zero then don't block, if -1 then block indefinitely
- * @return the number of file descriptors polled
+ * @return a value greater than 0 if file descriptors were polled, 0 if no file descriptor was polled
*/
abstract int poll(int timeout) throws IOException;
+ /**
+ * Wakeup the poller thread if blocked in poll.
+ *
+ * @throws UnsupportedOperationException if not supported
+ */
+ void wakeupPoller() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Callback by the poll method when a file descriptor is polled.
+ * Removes the mapping for the file descriptor and unparks the thread that was
+ * mapped to it, if any.
*/
final void polled(int fdVal) {
- wakeup(fdVal);
+ Thread t = map.remove(fdVal);
+ if (t != null) {
+ LockSupport.unpark(t);
+ }
}
/**
* Parks the current thread until a file descriptor is ready for the given op.
* @param fdVal the file descriptor
* @param event POLLIN or POLLOUT
* @param nanos the waiting time or 0 to wait indefinitely
- * @param supplier supplies a boolean to indicate if the enclosing object is open
+ * @param isOpen supplies a boolean to indicate if the enclosing object is open
*/
- static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
- throws IOException
- {
- assert nanos >= 0L;
- if (event == Net.POLLIN) {
- POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
- } else if (event == Net.POLLOUT) {
- POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
- } else {
- assert false;
- }
+ public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
+ POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
}
/**
* Parks the current thread until a Selector's file descriptor is ready.
* @param fdVal the Selector's file descriptor
* @param nanos the waiting time or 0 to wait indefinitely
*/
- static void pollSelector(int fdVal, long nanos) throws IOException {
- assert nanos >= 0L;
- Poller poller = POLLERS.masterPoller();
- if (poller == null) {
- poller = POLLERS.readPoller(fdVal);
- }
- poller.poll(fdVal, nanos, () -> true);
- }
-
- /**
- * If there is a thread polling the given file descriptor for the given event then
- * the thread is unparked.
- */
- static void stopPoll(int fdVal, int event) {
- if (event == Net.POLLIN) {
- POLLERS.readPoller(fdVal).wakeup(fdVal);
- } else if (event == Net.POLLOUT) {
- POLLERS.writePoller(fdVal).wakeup(fdVal);
- } else {
- throw new IllegalArgumentException();
- }
+ public static void pollSelector(int fdVal, long nanos) throws IOException {
+ POLLER_GROUP.pollSelector(fdVal, nanos);
}
/**
- * If there are any threads polling the given file descriptor then they are unparked.
+ * Unpark the given thread so that it stops polling.
*/
- static void stopPoll(int fdVal) {
- stopPoll(fdVal, Net.POLLIN);
- stopPoll(fdVal, Net.POLLOUT);
+ public static void stopPoll(Thread thread) {
+ LockSupport.unpark(thread);
}
/**
* Parks the current thread until a file descriptor is ready.
*/
- private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
+ private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
register(fdVal);
try {
- boolean isOpen = supplier.getAsBoolean();
- if (isOpen) {
+ if (isOpen.getAsBoolean() && !isShutdown()) {
if (nanos > 0) {
LockSupport.parkNanos(nanos);
} else {
LockSupport.park();
}
try {
implRegister(fdVal);
} catch (Throwable t) {
map.remove(fdVal);
throw t;
+ } finally {
+ Reference.reachabilityFence(this);
}
}
/**
* Deregister the file descriptor so that the file descriptor is not polled.
+ * If the map no longer has an entry for the file descriptor then it is treated as
+ * already polled, and this is passed to implDeregister. The reachability fence keeps
+ * this poller strongly reachable until implDeregister has completed.
+ * @throws IOException if thrown by implDeregister
*/
- private void deregister(int fdVal) {
+ private void deregister(int fdVal) throws IOException {
Thread previous = map.remove(fdVal);
boolean polled = (previous == null);
assert polled || previous == Thread.currentThread();
- implDeregister(fdVal, polled);
- }
-
- /**
- * Unparks any thread that is polling the given file descriptor.
- */
- private void wakeup(int fdVal) {
- Thread t = map.remove(fdVal);
- if (t != null) {
- LockSupport.unpark(t);
+ try {
+ implDeregister(fdVal, polled);
+ } finally {
+ Reference.reachabilityFence(this);
}
}
/**
* Master polling loop. The {@link #polled(int)} method is invoked for each file
* descriptor that is polled.
*/
private void pollerLoop() {
- owner = Thread.currentThread();
+ setOwner();
try {
- for (;;) {
+ while (!isShutdown()) {
poll(-1);
}
} catch (Exception e) {
e.printStackTrace();
}
* again when there are no more events. The sub-poller yields after each poll to help
* with fairness and to avoid re-registering with the master poller where possible.
*/
private void subPollerLoop(Poller masterPoller) {
assert Thread.currentThread().isVirtual();
- owner = Thread.currentThread();
+ setOwner();
try {
int polled = 0;
- for (;;) {
+ while (!isShutdown()) {
if (polled == 0) {
masterPoller.poll(fdVal(), 0, () -> true); // park
+ pollerPolled();
} else {
Thread.yield();
}
polled = poll(0);
}
e.printStackTrace();
}
}
/**
- * Returns the number I/O operations currently registered with this poller.
+ * Unparks all threads waiting on a file descriptor registered with this poller.
*/
- public int registered() {
- return map.size();
+ private void wakeupAll() {
+ map.values().forEach(LockSupport::unpark);
}
@Override
public String toString() {
return String.format("%s [registered = %d, owner = %s]",
- Objects.toIdentityString(this), registered(), owner);
+ Objects.toIdentityString(this), map.size(), owner);
}
/**
- * The Pollers used for read and write events.
+ * A group of poller threads that support virtual threads polling file descriptors.
*/
- private static class Pollers {
+ private static abstract class PollerGroup {
private final PollerProvider provider;
- private final Poller.Mode pollerMode;
- private final Poller masterPoller;
- private final Poller[] readPollers;
- private final Poller[] writePollers;
- // used by start method to executor is kept alive
- private Executor executor;
+ PollerGroup(PollerProvider provider) {
+ this.provider = provider;
+ }
+
+ final PollerProvider provider() {
+ return provider;
+ }
/**
- * Creates the Poller instances based on configuration.
+ * Starts the poller group and any system-wide poller threads.
*/
- Pollers() throws IOException {
- PollerProvider provider = PollerProvider.provider();
- Poller.Mode mode;
- String s = System.getProperty("jdk.pollerMode");
- if (s != null) {
- if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
- mode = Mode.SYSTEM_THREADS;
- } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
- mode = Mode.VTHREAD_POLLERS;
- } else {
- throw new RuntimeException("Can't parse '" + s + "' as polling mode");
+ abstract void start();
+
+ /**
+ * Parks the current thread until a file descriptor is ready for the given op.
+ */
+ abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
+
+ /**
+ * Parks the current thread until a Selector's file descriptor is ready.
+ */
+ void pollSelector(int fdVal, long nanos) throws IOException {
+ poll(fdVal, Net.POLLIN, nanos, () -> true);
+ }
+
+ /**
+ * Starts a platform thread to run the given task.
+ * The thread is created with InnocuousThread.newSystemThread, is marked as a daemon
+ * thread, and prints the stack trace of any uncaught exception.
+ * @param name the thread name
+ * @param task the task to run
+ */
+ protected final void startPlatformThread(String name, Runnable task) {
+ Thread thread = InnocuousThread.newSystemThread(name, task);
+ thread.setDaemon(true);
+ thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
+ thread.start();
+ }
+
+ /**
+ * Return the master poller, or null if no master poller.
+ */
+ abstract Poller masterPoller();
+
+ /**
+ * Return the read pollers.
+ */
+ abstract List<Poller> readPollers();
+
+ /**
+ * Return the write pollers.
+ */
+ abstract List<Poller> writePollers();
+
+ /**
+ * Close the given pollers.
+ * Null elements are skipped, so a partially filled array can be passed. An
+ * IOException thrown when closing a poller is ignored so that the remaining
+ * pollers are still closed.
+ */
+ static void closeAll(Poller... pollers) {
+ for (Poller poller : pollers) {
+ if (poller != null) {
+ try {
+ poller.close();
+ } catch (IOException _) { }
}
- } else {
- mode = provider.defaultPollerMode();
}
+ }
+ }
- // vthread poller mode needs a master poller
- Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
- ? provider.readPoller(false)
- : null;
+ /**
+ * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
+ * Each poller runs its polling loop in a platform thread named "Read-Poller" or
+ * "Write-Poller". A file descriptor is mapped to a poller with the provider's
+ * fdValToIndex method; POLLIN is routed to a read poller and any other event to a
+ * write poller. There is no master poller in this mode.
+ */
+ private static class SystemThreadsPollerGroup extends PollerGroup {
+ // system-wide read and write pollers
+ private final Poller[] readPollers;
+ private final Poller[] writePollers;
- // read pollers (or sub-pollers)
- int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
+ SystemThreadsPollerGroup(PollerProvider provider,
+ int readPollerCount,
+ int writePollerCount) throws IOException {
+ super(provider);
Poller[] readPollers = new Poller[readPollerCount];
- for (int i = 0; i < readPollerCount; i++) {
- readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
+ Poller[] writePollers = new Poller[writePollerCount];
+ try {
+ for (int i = 0; i < readPollerCount; i++) {
+ readPollers[i] = provider.readPoller(false);
+ }
+ for (int i = 0; i < writePollerCount; i++) {
+ writePollers[i] = provider.writePoller(false);
+ }
+ } catch (Throwable e) {
+ closeAll(readPollers);
+ closeAll(writePollers);
+ throw e;
}
- // write pollers (or sub-pollers)
- int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));
+ this.readPollers = readPollers;
+ this.writePollers = writePollers;
+ }
+
+ @Override
+ void start() {
+ Arrays.stream(readPollers).forEach(p -> {
+ startPlatformThread("Read-Poller", p::pollerLoop);
+ });
+ Arrays.stream(writePollers).forEach(p -> {
+ startPlatformThread("Write-Poller", p::pollerLoop);
+ });
+ }
+
+ private Poller readPoller(int fdVal) {
+ int index = provider().fdValToIndex(fdVal, readPollers.length);
+ return readPollers[index];
+ }
+
+ private Poller writePoller(int fdVal) {
+ int index = provider().fdValToIndex(fdVal, writePollers.length);
+ return writePollers[index];
+ }
+
+ @Override
+ void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
+ Poller poller = (event == Net.POLLIN)
+ ? readPoller(fdVal)
+ : writePoller(fdVal);
+ poller.poll(fdVal, nanos, isOpen);
+ }
+
+ @Override
+ Poller masterPoller() {
+ return null;
+ }
+
+ @Override
+ List<Poller> readPollers() {
+ return List.of(readPollers);
+ }
+
+ @Override
+ List<Poller> writePollers() {
+ return List.of(writePollers);
+ }
+ }
+
+ /**
+ * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
+ * When read and write pollers need to block then they register with a system-wide
+ * "master poller" that runs in a dedicated platform thread.
+ */
+ private static class VThreadsPollerGroup extends PollerGroup {
+ private final Poller masterPoller;
+ private final Poller[] readPollers;
+ private final Poller[] writePollers;
+
+ // keep virtual thread pollers alive
+ private final Executor executor;
+
+ /**
+ * Creates the master poller, the read and write sub-pollers, and the executor
+ * that runs the sub-pollers in virtual threads. No threads are started until the
+ * start method is invoked.
+ *
+ * @param provider the poller provider
+ * @param readPollerCount the number of read sub-pollers
+ * @param writePollerCount the number of write sub-pollers
+ * @throws IOException if creating a sub-poller fails; the pollers already created
+ * are closed before the exception is rethrown
+ */
+ VThreadsPollerGroup(PollerProvider provider,
+ int readPollerCount,
+ int writePollerCount) throws IOException {
+ super(provider);
+ Poller masterPoller = provider.readPoller(false);
+ Poller[] readPollers = new Poller[readPollerCount];
Poller[] writePollers = new Poller[writePollerCount];
- for (int i = 0; i < writePollerCount; i++) {
- writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);
+
+ try {
+ for (int i = 0; i < readPollerCount; i++) {
+ readPollers[i] = provider.readPoller(true);
+ }
+ for (int i = 0; i < writePollerCount; i++) {
+ writePollers[i] = provider.writePoller(true);
+ }
+ } catch (Throwable e) {
+ // closeAll ignores IOException, so a failure to close the master poller
+ // cannot replace the original exception or skip closing the sub-pollers
+ closeAll(masterPoller);
+ closeAll(readPollers);
+ closeAll(writePollers);
+ throw e;
}
- this.provider = provider;
- this.pollerMode = mode;
this.masterPoller = masterPoller;
this.readPollers = readPollers;
this.writePollers = writePollers;
+
+ ThreadFactory factory = Thread.ofVirtual()
+ .inheritInheritableThreadLocals(false)
+ .name("SubPoller-", 0)
+ .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
+ .factory();
+ this.executor = Executors.newThreadPerTaskExecutor(factory);
}
- /**
- * Starts the Poller threads.
- */
+ @Override
void start() {
- if (pollerMode == Mode.VTHREAD_POLLERS) {
- startPlatformThread("MasterPoller", masterPoller::pollerLoop);
- ThreadFactory factory = Thread.ofVirtual()
- .inheritInheritableThreadLocals(false)
- .name("SubPoller-", 0)
- .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
- .factory();
- executor = Executors.newThreadPerTaskExecutor(factory);
- Arrays.stream(readPollers).forEach(p -> {
- executor.execute(() -> p.subPollerLoop(masterPoller));
- });
- Arrays.stream(writePollers).forEach(p -> {
- executor.execute(() -> p.subPollerLoop(masterPoller));
- });
- } else {
- Arrays.stream(readPollers).forEach(p -> {
- startPlatformThread("Read-Poller", p::pollerLoop);
- });
- Arrays.stream(writePollers).forEach(p -> {
- startPlatformThread("Write-Poller", p::pollerLoop);
- });
- }
+ startPlatformThread("Master-Poller", masterPoller::pollerLoop);
+ Arrays.stream(readPollers).forEach(p -> {
+ executor.execute(() -> p.subPollerLoop(masterPoller));
+ });
+ Arrays.stream(writePollers).forEach(p -> {
+ executor.execute(() -> p.subPollerLoop(masterPoller));
+ });
}
- /**
- * Returns the master poller, or null if there is no master poller.
- */
+ private Poller readPoller(int fdVal) {
+ int index = provider().fdValToIndex(fdVal, readPollers.length);
+ return readPollers[index];
+ }
+
+ private Poller writePoller(int fdVal) {
+ int index = provider().fdValToIndex(fdVal, writePollers.length);
+ return writePollers[index];
+ }
+
+ @Override
+ void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
+ Poller poller = (event == Net.POLLIN)
+ ? readPoller(fdVal)
+ : writePoller(fdVal);
+ poller.poll(fdVal, nanos, isOpen);
+ }
+
+ @Override
+ void pollSelector(int fdVal, long nanos) throws IOException {
+ masterPoller.poll(fdVal, nanos, () -> true);
+ }
+
+ @Override
Poller masterPoller() {
return masterPoller;
}
- /**
- * Returns the read poller for the given file descriptor.
- */
- Poller readPoller(int fdVal) {
- int index = provider.fdValToIndex(fdVal, readPollers.length);
- return readPollers[index];
+ @Override
+ List<Poller> readPollers() {
+ return List.of(readPollers);
+ }
+
+ @Override
+ List<Poller> writePollers() {
+ return List.of(writePollers);
}
+ }
+
+ /**
+ * POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread.
+ * When a virtual thread polls a file descriptor for POLLIN, then it will use (almost
+ * always, not guaranteed) the read poller for its carrier. When a read poller needs
+ * to block then it registers with a system-wide "master poller" that runs in a
+ * dedicated platform thread. The read poller terminates if the carrier terminates.
+ * The write pollers are system-wide platform threads (usually one).
+ */
+ private static class PollerPerCarrierPollerGroup extends PollerGroup {
+ private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { }
+ private static final TerminatingThreadLocal<CarrierPoller> CARRIER_POLLER =
+ new TerminatingThreadLocal<>() {
+ @Override
+ protected void threadTerminated(CarrierPoller carrierPoller) {
+ Poller readPoller = carrierPoller.readPoller();
+ carrierPoller.group().carrierTerminated(readPoller);
+ }
+ };
+
+ private final Poller masterPoller;
+ private final Set<Poller> readPollers;
+ private final Poller[] writePollers;
/**
- * Returns the write poller for the given file descriptor.
+ * Create a PollerPerCarrierPollerGroup with the given number of write pollers.
+ * The master poller and the write pollers are created here; the set of read
+ * pollers starts out empty.
+ *
+ * @param provider the poller provider
+ * @param writePollerCount the number of write pollers
+ * @throws IOException if creating a write poller fails; the pollers already created
+ * are closed before the exception is rethrown
*/
- Poller writePoller(int fdVal) {
- int index = provider.fdValToIndex(fdVal, writePollers.length);
+ PollerPerCarrierPollerGroup(PollerProvider provider,
+ int writePollerCount) throws IOException {
+ super(provider);
+ Poller masterPoller = provider.readPoller(false);
+ Poller[] writePollers = new Poller[writePollerCount];
+ try {
+ for (int i = 0; i < writePollerCount; i++) {
+ writePollers[i] = provider.writePoller(false);
+ }
+ } catch (Throwable e) {
+ // closeAll ignores IOException, so a failure to close the master poller
+ // cannot replace the original exception or skip closing the write pollers
+ closeAll(masterPoller);
+ closeAll(writePollers);
+ throw e;
+ }
+ this.masterPoller = masterPoller;
+ this.readPollers = ConcurrentHashMap.newKeySet();
+ this.writePollers = writePollers;
+ }
+
+ @Override
+ void start() {
+ startPlatformThread("Master-Poller", masterPoller::pollerLoop);
+ Arrays.stream(writePollers).forEach(p -> {
+ startPlatformThread("Write-Poller", p::pollerLoop);
+ });
+ }
+
+ private Poller writePoller(int fdVal) {
+ int index = provider().fdValToIndex(fdVal, writePollers.length);
return writePollers[index];
}
/**
- * Return the list of read pollers.
+ * Starts a read sub-poller in a virtual thread.
+ * The sub-poller is added to the set of read pollers before its thread is started.
+ * The virtual thread is created with the scheduler that JLA.virtualThreadScheduler
+ * returns for the current thread, and is named after the current carrier thread
+ * with the suffix "-Read-Poller".
+ * @return the new read sub-poller
+ * @throws IOException if thrown when creating the sub-poller
*/
- List<Poller> readPollers() {
- return List.of(readPollers);
+ private Poller startReadPoller() throws IOException {
+ assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
+
+ // create read sub-poller
+ Poller readPoller = provider().readPoller(true);
+ readPollers.add(readPoller);
+
+ // start virtual thread to execute sub-polling loop
+ Thread carrier = JLA.currentCarrierThread();
+ var scheduler = JLA.virtualThreadScheduler(Thread.currentThread());
+ @SuppressWarnings("restricted")
+ var _ = Thread.ofVirtual()
+ .scheduler(scheduler)
+ .inheritInheritableThreadLocals(false)
+ .name(carrier.getName() + "-Read-Poller")
+ .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
+ .start(() -> subPollerLoop(readPoller));
+ return readPoller;
}
/**
- * Return the list of write pollers.
+ * Returns the read poller for the current carrier, starting it if required.
+ * The current continuation is pinned while the CARRIER_POLLER thread local is read
+ * and, if it has no value, while the read poller is started and recorded; presumably
+ * this keeps the virtual thread on the same carrier for that sequence (review:
+ * confirm). The continuation is unpinned before this method returns.
+ * @throws IOException if thrown when starting the read poller
*/
- List<Poller> writePollers() {
- return List.of(writePollers);
+ private Poller readPoller() throws IOException {
+ assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
+ Continuation.pin();
+ try {
+ CarrierPoller carrierPoller = CARRIER_POLLER.get();
+ if (carrierPoller != null) {
+ return carrierPoller.readPoller();
+ } else {
+ // first poll on this carrier will start poller
+ Poller readPoller = startReadPoller();
+ CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
+ return readPoller;
+ }
+ } finally {
+ Continuation.unpin();
+ }
+ }
+
+ @Override
+ void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
+ // for POLLIN, get the read poller for this carrier
+ if (event == Net.POLLIN
+ && Thread.currentThread().isVirtual()
+ && ContinuationSupport.isSupported()) {
+ readPoller().poll(fdVal, nanos, isOpen);
+ return;
+ }
+
+ // -XX:-VMContinuations or POLLIN from a platform thread uses the master poller
+ if (event == Net.POLLIN) {
+ masterPoller.poll(fdVal, nanos, isOpen);
+ } else {
+ writePoller(fdVal).poll(fdVal, nanos, isOpen);
+ }
}
+ @Override
+ void pollSelector(int fdVal, long nanos) throws IOException {
+ masterPoller.poll(fdVal, nanos, () -> true);
+ }
/**
- * Reads the given property name to get the poller count. If the property is
- * set then the value must be a power of 2. Returns 1 if the property is not
- * set.
- * @throws IllegalArgumentException if the property is set to a value that
- * is not a power of 2.
+ * Sub-poller polling loop.
*/
- private static int pollerCount(String propName, int defaultCount) {
- String s = System.getProperty(propName);
- int count = (s != null) ? Integer.parseInt(s) : defaultCount;
-
- // check power of 2
- if (count != Integer.highestOneBit(count)) {
- String msg = propName + " is set to a value that is not a power of 2";
- throw new IllegalArgumentException(msg);
+ private void subPollerLoop(Poller readPoller) {
+ try {
+ readPoller.subPollerLoop(masterPoller);
+ } finally {
+ // wakeup all threads waiting on file descriptors registered with the
+ // read poller, these I/O operations will migrate to another carrier.
+ readPoller.wakeupAll();
+
+ // remove from serviceability view
+ readPollers.remove(readPoller);
}
- return count;
}
/**
- * Starts a platform thread to run the given task.
+ * Invoked by the carrier thread before it terminates.
*/
- private void startPlatformThread(String name, Runnable task) {
+ private void carrierTerminated(Poller readPoller) {
+ readPoller.setShutdown();
try {
- Thread thread = InnocuousThread.newSystemThread(name, task);
- thread.setDaemon(true);
- thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
- thread.start();
- } catch (Exception e) {
- throw new InternalError(e);
+ readPoller.wakeupPoller();
+ } catch (Throwable e) {
+ e.printStackTrace();
}
}
+
+ @Override
+ Poller masterPoller() {
+ return masterPoller;
+ }
+
+ @Override
+ List<Poller> readPollers() {
+ return readPollers.stream().toList();
+ }
+
+ @Override
+ List<Poller> writePollers() {
+ return List.of(writePollers);
+ }
+ }
+
+ /**
+ * Reads the given property name to get the poller count. Returns
+ * {@code defaultCount} if the property is not set. The count, whether read from
+ * the property or defaulted, must be a power of 2.
+ *
+ * NOTE(review): the power of 2 check compares the count with
+ * Integer.highestOneBit(count), so 0 and Integer.MIN_VALUE are also accepted.
+ *
+ * @param propName the name of the system property
+ * @param defaultCount the count to use if the property is not set
+ * @return the poller count
+ * @throws NumberFormatException if the property value cannot be parsed as an int
+ * @throws IllegalArgumentException if the count is not a power of 2
+ */
+ private static int pollerCount(String propName, int defaultCount) {
+ String s = System.getProperty(propName);
+ int count = (s != null) ? Integer.parseInt(s) : defaultCount;
+
+ // check power of 2
+ if (count != Integer.highestOneBit(count)) {
+ String msg = propName + " is set to a value that is not a power of 2";
+ throw new IllegalArgumentException(msg);
+ }
+ return count;
}
/**
* Return the master poller or null if there is no master poller.
*/
public static Poller masterPoller() {
- return POLLERS.masterPoller();
+ return POLLER_GROUP.masterPoller();
}
/**
* Return the list of read pollers.
*/
public static List<Poller> readPollers() {
- return POLLERS.readPollers();
+ return POLLER_GROUP.readPollers();
}
/**
* Return the list of write pollers.
*/
public static List<Poller> writePollers() {
- return POLLERS.writePollers();
+ return POLLER_GROUP.writePollers();
}
}
< prev index next >