8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.util.Arrays;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Objects;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ThreadFactory;
36 import java.util.concurrent.locks.LockSupport;
37 import java.util.function.BooleanSupplier;
38 import jdk.internal.misc.InnocuousThread;
39 import jdk.internal.vm.annotation.Stable;
40
41 /**
42 * Polls file descriptors. Virtual threads invoke the poll method to park
43 * until a given file descriptor is ready for I/O.
44 */
45 public abstract class Poller {
// the system-wide pollers, created and started when this class is initialized
private static final Pollers POLLERS;
static {
    Pollers started;
    try {
        started = new Pollers();
        started.start();
    } catch (IOException ioe) {
        // the pollers could not be created, class initialization fails
        throw new ExceptionInInitializerError(ioe);
    }
    POLLERS = started;
}

// the thread running this poller's loop (poller or sub-poller thread)
private @Stable Thread owner;

// maps a file descriptor to the thread that is parked waiting on it
private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
62
/**
 * The mode that the read and write pollers run in.
 */
enum Mode {
    /**
     * ReadPoller and WritePoller are dedicated platform threads. They block waiting
     * for events and unpark virtual threads when file descriptors are ready for I/O.
     */
    SYSTEM_THREADS,

    /**
     * ReadPoller and WritePoller threads are virtual threads. They poll for events,
     * yield between polls, and unpark virtual threads when file descriptors are ready
     * for I/O. When there are no events, the poller threads park until there are I/O
     * events to poll. This mode helps to integrate polling with virtual thread
     * scheduling. The approach is similar to the default scheme in "User-level
     * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
     * (https://dl.acm.org/doi/10.1145/3379483).
     */
    VTHREAD_POLLERS
}
84
/**
 * Initializes a new Poller. Subclasses supply the abstract register, deregister
 * and poll operations.
 */
protected Poller() {
}
90
91 /**
92 * Returns the poller's file descriptor, used when the read and write poller threads
93 * are virtual threads.
94 *
95 * @throws UnsupportedOperationException if not supported
96 */
97 int fdVal() {
98 throw new UnsupportedOperationException();
99 }
100
101 /**
102 * Register the file descriptor. The registration is "one shot", meaning it should
103 * be polled at most once.
104 */
105 abstract void implRegister(int fdVal) throws IOException;
106
107 /**
108 * Deregister the file descriptor.
109 * @param polled true if the file descriptor has already been polled
110 */
111 abstract void implDeregister(int fdVal, boolean polled);
112
113 /**
114 * Poll for events. The {@link #polled(int)} method is invoked for each
115 * polled file descriptor.
116 *
117 * @param timeout if positive then block for up to {@code timeout} milliseconds,
118 * if zero then don't block, if -1 then block indefinitely
119 * @return the number of file descriptors polled
120 */
121 abstract int poll(int timeout) throws IOException;
122
123 /**
124 * Callback by the poll method when a file descriptor is polled.
125 */
126 final void polled(int fdVal) {
127 wakeup(fdVal);
128 }
129
130 /**
131 * Parks the current thread until a file descriptor is ready for the given op.
132 * @param fdVal the file descriptor
133 * @param event POLLIN or POLLOUT
134 * @param nanos the waiting time or 0 to wait indefinitely
135 * @param supplier supplies a boolean to indicate if the enclosing object is open
136 */
137 static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
138 throws IOException
139 {
140 assert nanos >= 0L;
141 if (event == Net.POLLIN) {
142 POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
143 } else if (event == Net.POLLOUT) {
144 POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
145 } else {
146 assert false;
147 }
148 }
149
150 /**
151 * Parks the current thread until a Selector's file descriptor is ready.
152 * @param fdVal the Selector's file descriptor
153 * @param nanos the waiting time or 0 to wait indefinitely
154 */
155 static void pollSelector(int fdVal, long nanos) throws IOException {
156 assert nanos >= 0L;
157 Poller poller = POLLERS.masterPoller();
158 if (poller == null) {
159 poller = POLLERS.readPoller(fdVal);
160 }
161 poller.poll(fdVal, nanos, () -> true);
162 }
163
164 /**
165 * If there is a thread polling the given file descriptor for the given event then
166 * the thread is unparked.
167 */
168 static void stopPoll(int fdVal, int event) {
169 if (event == Net.POLLIN) {
170 POLLERS.readPoller(fdVal).wakeup(fdVal);
171 } else if (event == Net.POLLOUT) {
172 POLLERS.writePoller(fdVal).wakeup(fdVal);
173 } else {
174 throw new IllegalArgumentException();
175 }
176 }
177
178 /**
179 * If there are any threads polling the given file descriptor then they are unparked.
180 */
181 static void stopPoll(int fdVal) {
182 stopPoll(fdVal, Net.POLLIN);
183 stopPoll(fdVal, Net.POLLOUT);
184 }
185
186 /**
187 * Parks the current thread until a file descriptor is ready.
188 */
189 private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
190 register(fdVal);
191 try {
192 boolean isOpen = supplier.getAsBoolean();
193 if (isOpen) {
194 if (nanos > 0) {
195 LockSupport.parkNanos(nanos);
196 } else {
197 LockSupport.park();
198 }
199 }
200 } finally {
201 deregister(fdVal);
202 }
203 }
204
205 /**
206 * Registers the file descriptor to be polled at most once when the file descriptor
207 * is ready for I/O.
208 */
209 private void register(int fdVal) throws IOException {
210 Thread previous = map.put(fdVal, Thread.currentThread());
211 assert previous == null;
212 try {
213 implRegister(fdVal);
214 } catch (Throwable t) {
215 map.remove(fdVal);
216 throw t;
217 }
218 }
219
220 /**
221 * Deregister the file descriptor so that the file descriptor is not polled.
222 */
223 private void deregister(int fdVal) {
224 Thread previous = map.remove(fdVal);
225 boolean polled = (previous == null);
226 assert polled || previous == Thread.currentThread();
227 implDeregister(fdVal, polled);
228 }
229
230 /**
231 * Unparks any thread that is polling the given file descriptor.
232 */
233 private void wakeup(int fdVal) {
234 Thread t = map.remove(fdVal);
235 if (t != null) {
236 LockSupport.unpark(t);
237 }
238 }
239
240 /**
241 * Master polling loop. The {@link #polled(int)} method is invoked for each file
242 * descriptor that is polled.
243 */
244 private void pollerLoop() {
245 owner = Thread.currentThread();
246 try {
247 for (;;) {
248 poll(-1);
249 }
250 } catch (Exception e) {
251 e.printStackTrace();
252 }
253 }
254
255 /**
256 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
257 * descriptor that is polled.
258 *
259 * The sub-poller registers its file descriptor with the master poller to park until
260 * there are events to poll. When unparked, it does non-blocking polls and parks
261 * again when there are no more events. The sub-poller yields after each poll to help
262 * with fairness and to avoid re-registering with the master poller where possible.
263 */
264 private void subPollerLoop(Poller masterPoller) {
265 assert Thread.currentThread().isVirtual();
266 owner = Thread.currentThread();
267 try {
268 int polled = 0;
269 for (;;) {
270 if (polled == 0) {
271 masterPoller.poll(fdVal(), 0, () -> true); // park
272 } else {
273 Thread.yield();
274 }
275 polled = poll(0);
276 }
277 } catch (Exception e) {
278 e.printStackTrace();
279 }
280 }
281
282 /**
283 * Returns the number I/O operations currently registered with this poller.
284 */
285 public int registered() {
286 return map.size();
287 }
288
289 @Override
290 public String toString() {
291 return String.format("%s [registered = %d, owner = %s]",
292 Objects.toIdentityString(this), registered(), owner);
293 }
294
295 /**
296 * The Pollers used for read and write events.
297 */
298 private static class Pollers {
299 private final PollerProvider provider;
300 private final Poller.Mode pollerMode;
301 private final Poller masterPoller;
302 private final Poller[] readPollers;
303 private final Poller[] writePollers;
304
305 // used by start method to executor is kept alive
306 private Executor executor;
307
308 /**
309 * Creates the Poller instances based on configuration.
310 */
311 Pollers() throws IOException {
312 PollerProvider provider = PollerProvider.provider();
313 Poller.Mode mode;
314 String s = System.getProperty("jdk.pollerMode");
315 if (s != null) {
316 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
317 mode = Mode.SYSTEM_THREADS;
318 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
319 mode = Mode.VTHREAD_POLLERS;
320 } else {
321 throw new RuntimeException("Can't parse '" + s + "' as polling mode");
322 }
323 } else {
324 mode = provider.defaultPollerMode();
325 }
326
327 // vthread poller mode needs a master poller
328 Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
329 ? provider.readPoller(false)
330 : null;
331
332 // read pollers (or sub-pollers)
333 int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
334 Poller[] readPollers = new Poller[readPollerCount];
335 for (int i = 0; i < readPollerCount; i++) {
336 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
337 }
338
339 // write pollers (or sub-pollers)
340 int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));
341 Poller[] writePollers = new Poller[writePollerCount];
342 for (int i = 0; i < writePollerCount; i++) {
343 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);
344 }
345
346 this.provider = provider;
347 this.pollerMode = mode;
348 this.masterPoller = masterPoller;
349 this.readPollers = readPollers;
350 this.writePollers = writePollers;
351 }
352
353 /**
354 * Starts the Poller threads.
355 */
356 void start() {
357 if (pollerMode == Mode.VTHREAD_POLLERS) {
358 startPlatformThread("MasterPoller", masterPoller::pollerLoop);
359 ThreadFactory factory = Thread.ofVirtual()
360 .inheritInheritableThreadLocals(false)
361 .name("SubPoller-", 0)
362 .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
363 .factory();
364 executor = Executors.newThreadPerTaskExecutor(factory);
365 Arrays.stream(readPollers).forEach(p -> {
366 executor.execute(() -> p.subPollerLoop(masterPoller));
367 });
368 Arrays.stream(writePollers).forEach(p -> {
369 executor.execute(() -> p.subPollerLoop(masterPoller));
370 });
371 } else {
372 Arrays.stream(readPollers).forEach(p -> {
373 startPlatformThread("Read-Poller", p::pollerLoop);
374 });
375 Arrays.stream(writePollers).forEach(p -> {
376 startPlatformThread("Write-Poller", p::pollerLoop);
377 });
378 }
379 }
380
381 /**
382 * Returns the master poller, or null if there is no master poller.
383 */
384 Poller masterPoller() {
385 return masterPoller;
386 }
387
388 /**
389 * Returns the read poller for the given file descriptor.
390 */
391 Poller readPoller(int fdVal) {
392 int index = provider.fdValToIndex(fdVal, readPollers.length);
393 return readPollers[index];
394 }
395
396 /**
397 * Returns the write poller for the given file descriptor.
398 */
399 Poller writePoller(int fdVal) {
400 int index = provider.fdValToIndex(fdVal, writePollers.length);
401 return writePollers[index];
402 }
403
404 /**
405 * Return the list of read pollers.
406 */
407 List<Poller> readPollers() {
408 return List.of(readPollers);
409 }
410
411 /**
412 * Return the list of write pollers.
413 */
414 List<Poller> writePollers() {
415 return List.of(writePollers);
416 }
417
418
419 /**
420 * Reads the given property name to get the poller count. If the property is
421 * set then the value must be a power of 2. Returns 1 if the property is not
422 * set.
423 * @throws IllegalArgumentException if the property is set to a value that
424 * is not a power of 2.
425 */
426 private static int pollerCount(String propName, int defaultCount) {
427 String s = System.getProperty(propName);
428 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
429
430 // check power of 2
431 if (count != Integer.highestOneBit(count)) {
432 String msg = propName + " is set to a value that is not a power of 2";
433 throw new IllegalArgumentException(msg);
434 }
435 return count;
436 }
437
438 /**
439 * Starts a platform thread to run the given task.
440 */
441 private void startPlatformThread(String name, Runnable task) {
442 try {
443 Thread thread = InnocuousThread.newSystemThread(name, task);
444 thread.setDaemon(true);
445 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
446 thread.start();
447 } catch (Exception e) {
448 throw new InternalError(e);
449 }
450 }
451 }
452
453 /**
454 * Return the master poller or null if there is no master poller.
455 */
456 public static Poller masterPoller() {
457 return POLLERS.masterPoller();
458 }
459
460 /**
461 * Return the list of read pollers.
462 */
463 public static List<Poller> readPollers() {
464 return POLLERS.readPollers();
465 }
466
467 /**
468 * Return the list of write pollers.
469 */
470 public static List<Poller> writePollers() {
471 return POLLERS.writePollers();
472 }
473 }
|
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.io.UncheckedIOException;
29 import java.lang.ref.Reference;
30 import java.util.Arrays;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.locks.LockSupport;
40 import java.util.function.BooleanSupplier;
41 import jdk.internal.access.JavaLangAccess;
42 import jdk.internal.access.SharedSecrets;
43 import jdk.internal.misc.InnocuousThread;
44 import jdk.internal.misc.TerminatingThreadLocal;
45 import jdk.internal.vm.Continuation;
46 import jdk.internal.vm.ContinuationSupport;
47 import jdk.internal.vm.annotation.Stable;
48
49 /**
50 * Polls file descriptors. Virtual threads invoke the poll method to park
51 * until a given file descriptor is ready for I/O.
52 */
53 public abstract class Poller {
54 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
55
56 // the poller group for the I/O pollers and poller threads
57 private static final PollerGroup POLLER_GROUP = createPollerGroup();
58
59 // the poller or sub-poller thread (used for observability only)
60 private @Stable Thread owner;
61
62 // maps file descriptors to parked Thread
63 private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
64
65 // shutdown (if supported by poller group)
66 private volatile boolean shutdown;
67
/**
 * The mode that the read and write pollers run in.
 */
enum Mode {
    /**
     * Read and write pollers are platform threads. They block waiting for events
     * and unpark virtual threads when file descriptors are ready for I/O.
     */
    SYSTEM_THREADS,

    /**
     * Read and write pollers are virtual threads. They poll for events, yield
     * between polls, and unpark virtual threads when file descriptors are ready
     * for I/O. When there are no events, the poller threads park until there are
     * I/O events to poll. This mode helps to integrate polling with virtual thread
     * scheduling. The approach is similar to the default scheme in "User-level
     * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
     * (https://dl.acm.org/doi/10.1145/3379483).
     */
    VTHREAD_POLLERS,

    /**
     * Read pollers are per-carrier virtual threads. They poll for events, yield
     * between polls, and unpark virtual threads when file descriptors are ready
     * for I/O. When there are no events, the poller threads park until there are
     * I/O events to poll. The write poller is a system-wide platform thread.
     */
    POLLER_PER_CARRIER
}
97
98 /**
99 * Create and return the PollerGroup.
100 */
101 private static PollerGroup createPollerGroup() {
102 try {
103 PollerProvider provider;
104 if (System.getProperty("jdk.pollerMode") instanceof String s) {
105 Mode mode = switch (s) {
106 case "1" -> Mode.SYSTEM_THREADS;
107 case "2" -> Mode.VTHREAD_POLLERS;
108 case "3" -> Mode.POLLER_PER_CARRIER;
109 default -> {
110 throw new RuntimeException(s + " is not a valid polling mode");
111 }
112 };
113 provider = PollerProvider.createProvider(mode);
114 } else {
115 provider = PollerProvider.createProvider();
116 }
117
118 int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers());
119 int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers());
120 PollerGroup group = switch (provider.pollerMode()) {
121 case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider, readPollers, writePollers);
122 case VTHREAD_POLLERS -> new VThreadsPollerGroup(provider, readPollers, writePollers);
123 case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers);
124 };
125 group.start();
126 return group;
127 } catch (IOException ioe) {
128 throw new UncheckedIOException(ioe);
129 }
130 }
131
/**
 * Initializes a new Poller. Subclasses supply the abstract close, register,
 * deregister and poll operations.
 */
protected Poller() {
}
137
138 /**
139 * Closes the poller and release resources. This method can only be used to cleanup
140 * when creating a poller group fails.
141 */
142 abstract void close() throws IOException;
143
144 /**
145 * Sets the poller's thread owner.
146 */
147 private void setOwner() {
148 owner = Thread.currentThread();
149 }
150
151 /**
152 * Returns true if this poller is marked for shutdown.
153 */
154 boolean isShutdown() {
155 return shutdown;
156 }
157
158 /**
159 * Marks this poller for shutdown.
160 */
161 private void setShutdown() {
162 shutdown = true;
163 }
164
165 /**
166 * Returns the poller's file descriptor to use when polling with the master poller.
167 * @throws UnsupportedOperationException if not supported
168 */
169 int fdVal() {
170 throw new UnsupportedOperationException();
171 }
172
173 /**
174 * Invoked if when this poller's file descriptor is polled by the master poller.
175 */
176 void pollerPolled() throws IOException {
177 }
178
179 /**
180 * Register the file descriptor. The registration is "one shot", meaning it should
181 * be polled at most once.
182 */
183 abstract void implRegister(int fdVal) throws IOException;
184
185 /**
186 * Deregister the file descriptor.
187 * @param polled true if the file descriptor has already been polled
188 */
189 abstract void implDeregister(int fdVal, boolean polled) throws IOException;
190
191 /**
192 * Poll for events. The {@link #polled(int)} method is invoked for each
193 * polled file descriptor.
194 *
195 * @param timeout if positive then block for up to {@code timeout} milliseconds,
196 * if zero then don't block, if -1 then block indefinitely
197 * @return >0 if file descriptors are polled, 0 if no file descriptor polled
198 */
199 abstract int poll(int timeout) throws IOException;
200
201 /**
202 * Wakeup the poller thread if blocked in poll.
203 *
204 * @throws UnsupportedOperationException if not supported
205 */
206 void wakeupPoller() throws IOException {
207 throw new UnsupportedOperationException();
208 }
209
210 /**
211 * Callback by the poll method when a file descriptor is polled.
212 */
213 final void polled(int fdVal) {
214 Thread t = map.remove(fdVal);
215 if (t != null) {
216 LockSupport.unpark(t);
217 }
218 }
219
220 /**
221 * Parks the current thread until a file descriptor is ready for the given op.
222 * @param fdVal the file descriptor
223 * @param event POLLIN or POLLOUT
224 * @param nanos the waiting time or 0 to wait indefinitely
225 * @param isOpen supplies a boolean to indicate if the enclosing object is open
226 */
227 public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
228 POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
229 }
230
231 /**
232 * Parks the current thread until a Selector's file descriptor is ready.
233 * @param fdVal the Selector's file descriptor
234 * @param nanos the waiting time or 0 to wait indefinitely
235 */
236 public static void pollSelector(int fdVal, long nanos) throws IOException {
237 POLLER_GROUP.pollSelector(fdVal, nanos);
238 }
239
240 /**
241 * Unpark the given thread so that it stops polling.
242 */
243 public static void stopPoll(Thread thread) {
244 LockSupport.unpark(thread);
245 }
246
247 /**
248 * Parks the current thread until a file descriptor is ready.
249 */
250 private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
251 register(fdVal);
252 try {
253 if (isOpen.getAsBoolean() && !isShutdown()) {
254 if (nanos > 0) {
255 LockSupport.parkNanos(nanos);
256 } else {
257 LockSupport.park();
258 }
259 }
260 } finally {
261 deregister(fdVal);
262 }
263 }
264
265 /**
266 * Registers the file descriptor to be polled at most once when the file descriptor
267 * is ready for I/O.
268 */
269 private void register(int fdVal) throws IOException {
270 Thread previous = map.put(fdVal, Thread.currentThread());
271 assert previous == null;
272 try {
273 implRegister(fdVal);
274 } catch (Throwable t) {
275 map.remove(fdVal);
276 throw t;
277 } finally {
278 Reference.reachabilityFence(this);
279 }
280 }
281
282 /**
283 * Deregister the file descriptor so that the file descriptor is not polled.
284 */
285 private void deregister(int fdVal) throws IOException {
286 Thread previous = map.remove(fdVal);
287 boolean polled = (previous == null);
288 assert polled || previous == Thread.currentThread();
289 try {
290 implDeregister(fdVal, polled);
291 } finally {
292 Reference.reachabilityFence(this);
293 }
294 }
295
296 /**
297 * Master polling loop. The {@link #polled(int)} method is invoked for each file
298 * descriptor that is polled.
299 */
300 private void pollerLoop() {
301 setOwner();
302 try {
303 while (!isShutdown()) {
304 poll(-1);
305 }
306 } catch (Exception e) {
307 e.printStackTrace();
308 }
309 }
310
311 /**
312 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
313 * descriptor that is polled.
314 *
315 * The sub-poller registers its file descriptor with the master poller to park until
316 * there are events to poll. When unparked, it does non-blocking polls and parks
317 * again when there are no more events. The sub-poller yields after each poll to help
318 * with fairness and to avoid re-registering with the master poller where possible.
319 */
320 private void subPollerLoop(Poller masterPoller) {
321 assert Thread.currentThread().isVirtual();
322 setOwner();
323 try {
324 int polled = 0;
325 while (!isShutdown()) {
326 if (polled == 0) {
327 masterPoller.poll(fdVal(), 0, () -> true); // park
328 pollerPolled();
329 } else {
330 Thread.yield();
331 }
332 polled = poll(0);
333 }
334 } catch (Exception e) {
335 e.printStackTrace();
336 }
337 }
338
339 /**
340 * Unparks all threads waiting on a file descriptor registered with this poller.
341 */
342 private void wakeupAll() {
343 map.values().forEach(LockSupport::unpark);
344 }
345
346 @Override
347 public String toString() {
348 return String.format("%s [registered = %d, owner = %s]",
349 Objects.toIdentityString(this), map.size(), owner);
350 }
351
352 /**
353 * A group of poller threads that support virtual threads polling file descriptors.
354 */
355 private static abstract class PollerGroup {
356 private final PollerProvider provider;
357
358 PollerGroup(PollerProvider provider) {
359 this.provider = provider;
360 }
361
362 final PollerProvider provider() {
363 return provider;
364 }
365
366 /**
367 * Starts the poller group and any system-wide poller threads.
368 */
369 abstract void start();
370
371 /**
372 * Parks the current thread until a file descriptor is ready for the given op.
373 */
374 abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
375
376 /**
377 * Parks the current thread until a Selector's file descriptor is ready.
378 */
379 void pollSelector(int fdVal, long nanos) throws IOException {
380 poll(fdVal, Net.POLLIN, nanos, () -> true);
381 }
382
383 /**
384 * Starts a platform thread to run the given task.
385 */
386 protected final void startPlatformThread(String name, Runnable task) {
387 Thread thread = InnocuousThread.newSystemThread(name, task);
388 thread.setDaemon(true);
389 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
390 thread.start();
391 }
392
393 /**
394 * Return the master poller, or null if no master poller.
395 */
396 abstract Poller masterPoller();
397
398 /**
399 * Return the read pollers.
400 */
401 abstract List<Poller> readPollers();
402
403 /**
404 * Return the write pollers.
405 */
406 abstract List<Poller> writePollers();
407
408 /**
409 * Close the given pollers.
410 */
411 static void closeAll(Poller... pollers) {
412 for (Poller poller : pollers) {
413 if (poller != null) {
414 try {
415 poller.close();
416 } catch (IOException _) { }
417 }
418 }
419 }
420 }
421
422 /**
423 * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
424 */
425 private static class SystemThreadsPollerGroup extends PollerGroup {
426 // system-wide read and write pollers
427 private final Poller[] readPollers;
428 private final Poller[] writePollers;
429
430 SystemThreadsPollerGroup(PollerProvider provider,
431 int readPollerCount,
432 int writePollerCount) throws IOException {
433 super(provider);
434 Poller[] readPollers = new Poller[readPollerCount];
435 Poller[] writePollers = new Poller[writePollerCount];
436 try {
437 for (int i = 0; i < readPollerCount; i++) {
438 readPollers[i] = provider.readPoller(false);
439 }
440 for (int i = 0; i < writePollerCount; i++) {
441 writePollers[i] = provider.writePoller(false);
442 }
443 } catch (Throwable e) {
444 closeAll(readPollers);
445 closeAll(writePollers);
446 throw e;
447 }
448
449 this.readPollers = readPollers;
450 this.writePollers = writePollers;
451 }
452
453 @Override
454 void start() {
455 Arrays.stream(readPollers).forEach(p -> {
456 startPlatformThread("Read-Poller", p::pollerLoop);
457 });
458 Arrays.stream(writePollers).forEach(p -> {
459 startPlatformThread("Write-Poller", p::pollerLoop);
460 });
461 }
462
463 private Poller readPoller(int fdVal) {
464 int index = provider().fdValToIndex(fdVal, readPollers.length);
465 return readPollers[index];
466 }
467
468 private Poller writePoller(int fdVal) {
469 int index = provider().fdValToIndex(fdVal, writePollers.length);
470 return writePollers[index];
471 }
472
473 @Override
474 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
475 Poller poller = (event == Net.POLLIN)
476 ? readPoller(fdVal)
477 : writePoller(fdVal);
478 poller.poll(fdVal, nanos, isOpen);
479 }
480
481 @Override
482 Poller masterPoller() {
483 return null;
484 }
485
486 @Override
487 List<Poller> readPollers() {
488 return List.of(readPollers);
489 }
490
491 @Override
492 List<Poller> writePollers() {
493 return List.of(writePollers);
494 }
495 }
496
497 /**
498 * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
499 * When read and write pollers need to block then they register with a system-wide
500 * "master poller" that runs in a dedicated platform thread.
501 */
502 private static class VThreadsPollerGroup extends PollerGroup {
503 private final Poller masterPoller;
504 private final Poller[] readPollers;
505 private final Poller[] writePollers;
506
507 // keep virtual thread pollers alive
508 private final Executor executor;
509
510 VThreadsPollerGroup(PollerProvider provider,
511 int readPollerCount,
512 int writePollerCount) throws IOException {
513 super(provider);
514 Poller masterPoller = provider.readPoller(false);
515 Poller[] readPollers = new Poller[readPollerCount];
516 Poller[] writePollers = new Poller[writePollerCount];
517
518 try {
519 for (int i = 0; i < readPollerCount; i++) {
520 readPollers[i] = provider.readPoller(true);
521 }
522 for (int i = 0; i < writePollerCount; i++) {
523 writePollers[i] = provider.writePoller(true);
524 }
525 } catch (Throwable e) {
526 masterPoller.close();
527 closeAll(readPollers);
528 closeAll(writePollers);
529 throw e;
530 }
531
532 this.masterPoller = masterPoller;
533 this.readPollers = readPollers;
534 this.writePollers = writePollers;
535
536 ThreadFactory factory = Thread.ofVirtual()
537 .inheritInheritableThreadLocals(false)
538 .name("SubPoller-", 0)
539 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
540 .factory();
541 this.executor = Executors.newThreadPerTaskExecutor(factory);
542 }
543
544 @Override
545 void start() {
546 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
547 Arrays.stream(readPollers).forEach(p -> {
548 executor.execute(() -> p.subPollerLoop(masterPoller));
549 });
550 Arrays.stream(writePollers).forEach(p -> {
551 executor.execute(() -> p.subPollerLoop(masterPoller));
552 });
553 }
554
555 private Poller readPoller(int fdVal) {
556 int index = provider().fdValToIndex(fdVal, readPollers.length);
557 return readPollers[index];
558 }
559
560 private Poller writePoller(int fdVal) {
561 int index = provider().fdValToIndex(fdVal, writePollers.length);
562 return writePollers[index];
563 }
564
565 @Override
566 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
567 Poller poller = (event == Net.POLLIN)
568 ? readPoller(fdVal)
569 : writePoller(fdVal);
570 poller.poll(fdVal, nanos, isOpen);
571 }
572
573 @Override
574 void pollSelector(int fdVal, long nanos) throws IOException {
575 masterPoller.poll(fdVal, nanos, () -> true);
576 }
577
578 @Override
579 Poller masterPoller() {
580 return masterPoller;
581 }
582
583 @Override
584 List<Poller> readPollers() {
585 return List.of(readPollers);
586 }
587
588 @Override
589 List<Poller> writePollers() {
590 return List.of(writePollers);
591 }
592 }
593
594 /**
595 * POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread.
596 * When a virtual thread polls a file descriptor for POLLIN, then it will use (almost
597 * always, not guaranteed) the read poller for its carrier. When a read poller needs
598 * to block then it registers with a system-wide "master poller" that runs in a
599 * dedicated platform thread. The read poller terminates if the carrier terminates.
600 * The write pollers are system-wide platform threads (usually one).
601 */
602 private static class PollerPerCarrierPollerGroup extends PollerGroup {
603 private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { }
604 private static final TerminatingThreadLocal<CarrierPoller> CARRIER_POLLER =
605 new TerminatingThreadLocal<>() {
606 @Override
607 protected void threadTerminated(CarrierPoller carrierPoller) {
608 Poller readPoller = carrierPoller.readPoller();
609 carrierPoller.group().carrierTerminated(readPoller);
610 }
611 };
612
613 private final Poller masterPoller;
614 private final Set<Poller> readPollers;
615 private final Poller[] writePollers;
616
617 /**
618 * Create a PollerPerCarrierPollerGroup with the given number of write pollers.
619 */
620 PollerPerCarrierPollerGroup(PollerProvider provider,
621 int writePollerCount) throws IOException {
622 super(provider);
623 Poller masterPoller = provider.readPoller(false);
624 Poller[] writePollers = new Poller[writePollerCount];
625 try {
626 for (int i = 0; i < writePollerCount; i++) {
627 writePollers[i] = provider.writePoller(false);
628 }
629 } catch (Throwable e) {
630 masterPoller.close();
631 closeAll(writePollers);
632 throw e;
633 }
634 this.masterPoller = masterPoller;
635 this.readPollers = ConcurrentHashMap.newKeySet();;
636 this.writePollers = writePollers;
637 }
638
639 @Override
640 void start() {
641 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
642 Arrays.stream(writePollers).forEach(p -> {
643 startPlatformThread("Write-Poller", p::pollerLoop);
644 });
645 }
646
647 private Poller writePoller(int fdVal) {
648 int index = provider().fdValToIndex(fdVal, writePollers.length);
649 return writePollers[index];
650 }
651
652 /**
653 * Starts a read sub-poller in a virtual thread.
654 */
655 private Poller startReadPoller() throws IOException {
656 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
657
658 // create read sub-poller
659 Poller readPoller = provider().readPoller(true);
660 readPollers.add(readPoller);
661
662 // start virtual thread to execute sub-polling loop
663 Thread carrier = JLA.currentCarrierThread();
664 var scheduler = JLA.virtualThreadScheduler(Thread.currentThread());
665 @SuppressWarnings("restricted")
666 var _ = Thread.ofVirtual()
667 .scheduler(scheduler)
668 .inheritInheritableThreadLocals(false)
669 .name(carrier.getName() + "-Read-Poller")
670 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
671 .start(() -> subPollerLoop(readPoller));
672 return readPoller;
673 }
674
675 /**
676 * Returns the read poller for the current carrier, starting it if required.
677 */
678 private Poller readPoller() throws IOException {
679 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
680 Continuation.pin();
681 try {
682 CarrierPoller carrierPoller = CARRIER_POLLER.get();
683 if (carrierPoller != null) {
684 return carrierPoller.readPoller();
685 } else {
686 // first poll on this carrier will start poller
687 Poller readPoller = startReadPoller();
688 CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
689 return readPoller;
690 }
691 } finally {
692 Continuation.unpin();
693 }
694 }
695
696 @Override
697 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
698 // for POLLIN, get the read poller for this carrier
699 if (event == Net.POLLIN
700 && Thread.currentThread().isVirtual()
701 && ContinuationSupport.isSupported()) {
702 readPoller().poll(fdVal, nanos, isOpen);
703 return;
704 }
705
706 // -XX:-VMContinuations or POLLIN from platform thread does master poller
707 if (event == Net.POLLIN) {
708 masterPoller.poll(fdVal, nanos, isOpen);
709 } else {
710 writePoller(fdVal).poll(fdVal, nanos, isOpen);
711 }
712 }
713
714 @Override
715 void pollSelector(int fdVal, long nanos) throws IOException {
716 masterPoller.poll(fdVal, nanos, () -> true);
717 }
718
719 /**
720 * Sub-poller polling loop.
721 */
722 private void subPollerLoop(Poller readPoller) {
723 try {
724 readPoller.subPollerLoop(masterPoller);
725 } finally {
726 // wakeup all threads waiting on file descriptors registered with the
727 // read poller, these I/O operation will migrate to another carrier.
728 readPoller.wakeupAll();
729
730 // remove from serviceability view
731 readPollers.remove(readPoller);
732 }
733 }
734
735 /**
736 * Invoked by the carrier thread before it terminates.
737 */
738 private void carrierTerminated(Poller readPoller) {
739 readPoller.setShutdown();
740 try {
741 readPoller.wakeupPoller();
742 } catch (Throwable e) {
743 e.printStackTrace();
744 }
745 }
746
747 @Override
748 Poller masterPoller() {
749 return masterPoller;
750 }
751
752 @Override
753 List<Poller> readPollers() {
754 return readPollers.stream().toList();
755 }
756
757 @Override
758 List<Poller> writePollers() {
759 return List.of(writePollers);
760 }
761 }
762
763 /**
764 * Reads the given property name to get the poller count. If the property is
765 * set then the value must be a power of 2. Returns 1 if the property is not
766 * set.
767 * @throws IllegalArgumentException if the property is set to a value that
768 * is not a power of 2.
769 */
770 private static int pollerCount(String propName, int defaultCount) {
771 String s = System.getProperty(propName);
772 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
773
774 // check power of 2
775 if (count != Integer.highestOneBit(count)) {
776 String msg = propName + " is set to a value that is not a power of 2";
777 throw new IllegalArgumentException(msg);
778 }
779 return count;
780 }
781
782 /**
783 * Return the master poller or null if there is no master poller.
784 */
785 public static Poller masterPoller() {
786 return POLLER_GROUP.masterPoller();
787 }
788
789 /**
790 * Return the list of read pollers.
791 */
792 public static List<Poller> readPollers() {
793 return POLLER_GROUP.readPollers();
794 }
795
796 /**
797 * Return the list of write pollers.
798 */
799 public static List<Poller> writePollers() {
800 return POLLER_GROUP.writePollers();
801 }
802 }
|