8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.util.Arrays;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Objects;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ThreadFactory;
36 import java.util.concurrent.locks.LockSupport;
37 import java.util.function.BooleanSupplier;
38 import jdk.internal.misc.InnocuousThread;
39 import jdk.internal.vm.annotation.Stable;
40
41 /**
42 * Polls file descriptors. Virtual threads invoke the poll method to park
43 * until a given file descriptor is ready for I/O.
44 */
45 public abstract class Poller {
46 private static final Pollers POLLERS;
47 static {
48 try {
49 var pollers = new Pollers();
50 pollers.start();
51 POLLERS = pollers;
52 } catch (IOException ioe) {
53 throw new ExceptionInInitializerError(ioe);
54 }
55 }
56
57 // the poller or sub-poller thread
58 private @Stable Thread owner;
59
60 // maps file descriptors to parked Thread
61 private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
62
63 /**
64 * Poller mode.
65 */
66 enum Mode {
67 /**
68 * ReadPoller and WritePoller are dedicated platform threads that block waiting
69 * for events and unpark virtual threads when file descriptors are ready for I/O.
70 */
71 SYSTEM_THREADS,
72
73 /**
74 * ReadPoller and WritePoller threads are virtual threads that poll for events,
75 * yielding between polls and unparking virtual threads when file descriptors are
76 * ready for I/O. If there are no events then the poller threads park until there
77 * are I/O events to poll. This mode helps to integrate polling with virtual
78 * thread scheduling. The approach is similar to the default scheme in "User-level
79 * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
80 * (https://dl.acm.org/doi/10.1145/3379483).
81 */
82 VTHREAD_POLLERS
83 }
84
85 /**
86 * Initialize a Poller.
87 */
88 protected Poller() {
89 }
90
91 /**
92 * Returns the poller's file descriptor, used when the read and write poller threads
93 * are virtual threads.
94 *
95 * @throws UnsupportedOperationException if not supported
96 */
97 int fdVal() {
98 throw new UnsupportedOperationException();
99 }
100
101 /**
102 * Register the file descriptor. The registration is "one shot", meaning it should
103 * be polled at most once.
104 */
105 abstract void implRegister(int fdVal) throws IOException;
106
107 /**
108 * Deregister the file descriptor.
109 * @param polled true if the file descriptor has already been polled
110 */
111 abstract void implDeregister(int fdVal, boolean polled);
112
113 /**
114 * Poll for events. The {@link #polled(int)} method is invoked for each
115 * polled file descriptor.
116 *
117 * @param timeout if positive then block for up to {@code timeout} milliseconds,
118 * if zero then don't block, if -1 then block indefinitely
119 * @return the number of file descriptors polled
120 */
121 abstract int poll(int timeout) throws IOException;
122
123 /**
124 * Callback by the poll method when a file descriptor is polled.
125 */
126 final void polled(int fdVal) {
127 wakeup(fdVal);
128 }
129
130 /**
131 * Parks the current thread until a file descriptor is ready for the given op.
132 * @param fdVal the file descriptor
133 * @param event POLLIN or POLLOUT
134 * @param nanos the waiting time or 0 to wait indefinitely
135 * @param supplier supplies a boolean to indicate if the enclosing object is open
136 */
137 static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
138 throws IOException
139 {
140 assert nanos >= 0L;
141 if (event == Net.POLLIN) {
142 POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
143 } else if (event == Net.POLLOUT) {
144 POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
145 } else {
146 assert false;
147 }
148 }
149
150 /**
151 * Parks the current thread until a Selector's file descriptor is ready.
152 * @param fdVal the Selector's file descriptor
153 * @param nanos the waiting time or 0 to wait indefinitely
154 */
155 static void pollSelector(int fdVal, long nanos) throws IOException {
156 assert nanos >= 0L;
157 Poller poller = POLLERS.masterPoller();
158 if (poller == null) {
159 poller = POLLERS.readPoller(fdVal);
160 }
161 poller.poll(fdVal, nanos, () -> true);
162 }
163
164 /**
165 * If there is a thread polling the given file descriptor for the given event then
166 * the thread is unparked.
167 */
168 static void stopPoll(int fdVal, int event) {
169 if (event == Net.POLLIN) {
170 POLLERS.readPoller(fdVal).wakeup(fdVal);
171 } else if (event == Net.POLLOUT) {
172 POLLERS.writePoller(fdVal).wakeup(fdVal);
173 } else {
174 throw new IllegalArgumentException();
175 }
176 }
177
178 /**
179 * If there are any threads polling the given file descriptor then they are unparked.
180 */
181 static void stopPoll(int fdVal) {
182 stopPoll(fdVal, Net.POLLIN);
183 stopPoll(fdVal, Net.POLLOUT);
184 }
185
186 /**
187 * Parks the current thread until a file descriptor is ready.
188 */
189 private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
190 register(fdVal);
191 try {
192 boolean isOpen = supplier.getAsBoolean();
193 if (isOpen) {
194 if (nanos > 0) {
195 LockSupport.parkNanos(nanos);
196 } else {
197 LockSupport.park();
198 }
199 }
200 } finally {
201 deregister(fdVal);
202 }
203 }
204
205 /**
206 * Registers the file descriptor to be polled at most once when the file descriptor
207 * is ready for I/O.
208 */
209 private void register(int fdVal) throws IOException {
210 Thread previous = map.put(fdVal, Thread.currentThread());
211 assert previous == null;
212 try {
213 implRegister(fdVal);
214 } catch (Throwable t) {
215 map.remove(fdVal);
216 throw t;
217 }
218 }
219
220 /**
221 * Deregister the file descriptor so that the file descriptor is not polled.
222 */
223 private void deregister(int fdVal) {
224 Thread previous = map.remove(fdVal);
225 boolean polled = (previous == null);
226 assert polled || previous == Thread.currentThread();
227 implDeregister(fdVal, polled);
228 }
229
230 /**
231 * Unparks any thread that is polling the given file descriptor.
232 */
233 private void wakeup(int fdVal) {
234 Thread t = map.remove(fdVal);
235 if (t != null) {
236 LockSupport.unpark(t);
237 }
238 }
239
240 /**
241 * Master polling loop. The {@link #polled(int)} method is invoked for each file
242 * descriptor that is polled.
243 */
244 private void pollerLoop() {
245 owner = Thread.currentThread();
246 try {
247 for (;;) {
248 poll(-1);
249 }
250 } catch (Exception e) {
251 e.printStackTrace();
252 }
253 }
254
255 /**
256 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
257 * descriptor that is polled.
258 *
259 * The sub-poller registers its file descriptor with the master poller to park until
260 * there are events to poll. When unparked, it does non-blocking polls and parks
261 * again when there are no more events. The sub-poller yields after each poll to help
262 * with fairness and to avoid re-registering with the master poller where possible.
263 */
264 private void subPollerLoop(Poller masterPoller) {
265 assert Thread.currentThread().isVirtual();
266 owner = Thread.currentThread();
267 try {
268 int polled = 0;
269 for (;;) {
270 if (polled == 0) {
271 masterPoller.poll(fdVal(), 0, () -> true); // park
272 } else {
273 Thread.yield();
274 }
275 polled = poll(0);
276 }
277 } catch (Exception e) {
278 e.printStackTrace();
279 }
280 }
281
282 /**
283 * Returns the number I/O operations currently registered with this poller.
284 */
285 public int registered() {
286 return map.size();
287 }
288
289 @Override
290 public String toString() {
291 return String.format("%s [registered = %d, owner = %s]",
292 Objects.toIdentityString(this), registered(), owner);
293 }
294
295 /**
296 * The Pollers used for read and write events.
297 */
298 private static class Pollers {
299 private final PollerProvider provider;
300 private final Poller.Mode pollerMode;
301 private final Poller masterPoller;
302 private final Poller[] readPollers;
303 private final Poller[] writePollers;
304
305 // used by start method to executor is kept alive
306 private Executor executor;
307
308 /**
309 * Creates the Poller instances based on configuration.
310 */
311 Pollers() throws IOException {
312 PollerProvider provider = PollerProvider.provider();
313 Poller.Mode mode;
314 String s = System.getProperty("jdk.pollerMode");
315 if (s != null) {
316 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
317 mode = Mode.SYSTEM_THREADS;
318 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
319 mode = Mode.VTHREAD_POLLERS;
320 } else {
321 throw new RuntimeException("Can't parse '" + s + "' as polling mode");
322 }
323 } else {
324 mode = provider.defaultPollerMode();
325 }
326
327 // vthread poller mode needs a master poller
328 Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
329 ? provider.readPoller(false)
330 : null;
331
332 // read pollers (or sub-pollers)
333 int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
334 Poller[] readPollers = new Poller[readPollerCount];
335 for (int i = 0; i < readPollerCount; i++) {
336 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
337 }
338
339 // write pollers (or sub-pollers)
340 int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));
341 Poller[] writePollers = new Poller[writePollerCount];
342 for (int i = 0; i < writePollerCount; i++) {
343 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);
344 }
345
346 this.provider = provider;
347 this.pollerMode = mode;
348 this.masterPoller = masterPoller;
349 this.readPollers = readPollers;
350 this.writePollers = writePollers;
351 }
352
353 /**
354 * Starts the Poller threads.
355 */
356 void start() {
357 if (pollerMode == Mode.VTHREAD_POLLERS) {
358 startPlatformThread("MasterPoller", masterPoller::pollerLoop);
359 ThreadFactory factory = Thread.ofVirtual()
360 .inheritInheritableThreadLocals(false)
361 .name("SubPoller-", 0)
362 .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
363 .factory();
364 executor = Executors.newThreadPerTaskExecutor(factory);
365 Arrays.stream(readPollers).forEach(p -> {
366 executor.execute(() -> p.subPollerLoop(masterPoller));
367 });
368 Arrays.stream(writePollers).forEach(p -> {
369 executor.execute(() -> p.subPollerLoop(masterPoller));
370 });
371 } else {
372 Arrays.stream(readPollers).forEach(p -> {
373 startPlatformThread("Read-Poller", p::pollerLoop);
374 });
375 Arrays.stream(writePollers).forEach(p -> {
376 startPlatformThread("Write-Poller", p::pollerLoop);
377 });
378 }
379 }
380
381 /**
382 * Returns the master poller, or null if there is no master poller.
383 */
384 Poller masterPoller() {
385 return masterPoller;
386 }
387
388 /**
389 * Returns the read poller for the given file descriptor.
390 */
391 Poller readPoller(int fdVal) {
392 int index = provider.fdValToIndex(fdVal, readPollers.length);
393 return readPollers[index];
394 }
395
396 /**
397 * Returns the write poller for the given file descriptor.
398 */
399 Poller writePoller(int fdVal) {
400 int index = provider.fdValToIndex(fdVal, writePollers.length);
401 return writePollers[index];
402 }
403
404 /**
405 * Return the list of read pollers.
406 */
407 List<Poller> readPollers() {
408 return List.of(readPollers);
409 }
410
411 /**
412 * Return the list of write pollers.
413 */
414 List<Poller> writePollers() {
415 return List.of(writePollers);
416 }
417
418
419 /**
420 * Reads the given property name to get the poller count. If the property is
421 * set then the value must be a power of 2. Returns 1 if the property is not
422 * set.
423 * @throws IllegalArgumentException if the property is set to a value that
424 * is not a power of 2.
425 */
426 private static int pollerCount(String propName, int defaultCount) {
427 String s = System.getProperty(propName);
428 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
429
430 // check power of 2
431 if (count != Integer.highestOneBit(count)) {
432 String msg = propName + " is set to a value that is not a power of 2";
433 throw new IllegalArgumentException(msg);
434 }
435 return count;
436 }
437
438 /**
439 * Starts a platform thread to run the given task.
440 */
441 private void startPlatformThread(String name, Runnable task) {
442 try {
443 Thread thread = InnocuousThread.newSystemThread(name, task);
444 thread.setDaemon(true);
445 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
446 thread.start();
447 } catch (Exception e) {
448 throw new InternalError(e);
449 }
450 }
451 }
452
453 /**
454 * Return the master poller or null if there is no master poller.
455 */
456 public static Poller masterPoller() {
457 return POLLERS.masterPoller();
458 }
459
460 /**
461 * Return the list of read pollers.
462 */
463 public static List<Poller> readPollers() {
464 return POLLERS.readPollers();
465 }
466
467 /**
468 * Return the list of write pollers.
469 */
470 public static List<Poller> writePollers() {
471 return POLLERS.writePollers();
472 }
473 }
|
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.io.UncheckedIOException;
29 import java.lang.ref.Reference;
30 import java.util.Arrays;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.ThreadFactory;
38 import java.util.concurrent.locks.LockSupport;
39 import java.util.function.BooleanSupplier;
40 import jdk.internal.access.JavaLangAccess;
41 import jdk.internal.access.SharedSecrets;
42 import jdk.internal.misc.InnocuousThread;
43 import jdk.internal.misc.TerminatingThreadLocal;
44 import jdk.internal.vm.Continuation;
45 import jdk.internal.vm.ContinuationSupport;
46 import jdk.internal.vm.annotation.Stable;
47
48 /**
49 * Polls file descriptors. Virtual threads invoke the poll method to park
50 * until a given file descriptor is ready for I/O.
51 */
52 public abstract class Poller {
53 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
54
55 private static final PollerGroup POLLER_GROUP;
56 static {
57 try {
58 PollerProvider provider = PollerProvider.provider();
59 Mode mode = pollerMode(provider.defaultPollerMode());
60 PollerGroup group = switch (mode) {
61 case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider);
62 case VTHREAD_POLLERS -> new VirtualThreadsPollerGroup(provider);
63 case PER_CARRIER -> new PerCarrierPollerGroup(provider);
64 };
65 group.start();
66 POLLER_GROUP = group;
67 } catch (IOException ioe) {
68 throw new ExceptionInInitializerError(ioe);
69 }
70 }
71
72 // the poller or sub-poller thread
73 private @Stable Thread owner;
74
75 // maps file descriptors to parked Thread
76 private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
77
78 // shutdown if supported by poller group.
79 private volatile boolean shutdown;
80
81 /**
82 * Poller mode.
83 */
84 enum Mode {
85 /**
86 * ReadPoller and WritePoller are dedicated platform threads that block waiting
87 * for events and unpark virtual threads when file descriptors are ready for I/O.
88 */
89 SYSTEM_THREADS,
90
91 /**
92 * ReadPoller and WritePoller threads are virtual threads that poll for events,
93 * yielding between polls and unparking virtual threads when file descriptors are
94 * ready for I/O. If there are no events then the poller threads park until there
95 * are I/O events to poll. This mode helps to integrate polling with virtual
96 * thread scheduling. The approach is similar to the default scheme in "User-level
97 * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
98 * (https://dl.acm.org/doi/10.1145/3379483).
99 */
100 VTHREAD_POLLERS,
101
102 /**
103 * A dedicated ReadPoller is created for each carrier thread.
104 * There is one system-wide (carrier agnostic) WritePoller.
105 */
106 PER_CARRIER
107 }
108
109 /**
110 * Initialize a Poller.
111 */
112 protected Poller() {
113 }
114
115 /**
116 * Closes the poller and release resources. This method can only be used to cleanup
117 * when creating a poller group fails.
118 */
119 abstract void close() throws IOException;
120
121 /**
122 * Returns the poller's thread owner.
123 */
124 private Thread owner() {
125 return owner;
126 }
127
128 /**
129 * Sets the poller's thread owner.
130 */
131 private void setOwner() {
132 owner = Thread.currentThread();
133 }
134
135 /**
136 * Returns true if this poller is marked for shutdown.
137 */
138 final boolean isShutdown() {
139 return shutdown;
140 }
141
142 /**
143 * Marks this poller for shutdown.
144 */
145 private void setShutdown() {
146 shutdown = true;
147 }
148
149 /**
150 * Returns the poller's file descriptor to use when polling with the master poller.
151 * @throws UnsupportedOperationException if not supported
152 */
153 int fdVal() {
154 throw new UnsupportedOperationException();
155 }
156
157 /**
158 * Invoked if when this poller's file descriptor is polled by the master poller.
159 */
160 void pollerPolled() throws IOException {
161 }
162
163 /**
164 * Register the file descriptor. The registration is "one shot", meaning it should
165 * be polled at most once.
166 */
167 abstract void implRegister(int fdVal) throws IOException;
168
169 /**
170 * Deregister the file descriptor.
171 * @param polled true if the file descriptor has already been polled
172 */
173 abstract void implDeregister(int fdVal, boolean polled) throws IOException;
174
175 /**
176 * Poll for events. The {@link #polled(int)} method is invoked for each
177 * polled file descriptor.
178 *
179 * @param timeout if positive then block for up to {@code timeout} milliseconds,
180 * if zero then don't block, if -1 then block indefinitely
181 * @return >0 if file descriptors are polled, 0 if no file descriptor polled
182 */
183 abstract int poll(int timeout) throws IOException;
184
185 /**
186 * Wakeup the poller thread if blocked in poll.
187 *
188 * @throws UnsupportedOperationException if not supported
189 */
190 void wakeupPoller() throws IOException {
191 throw new UnsupportedOperationException();
192 }
193
194 /**
195 * Callback by the poll method when a file descriptor is polled.
196 */
197 final void polled(int fdVal) {
198 Thread t = map.remove(fdVal);
199 if (t != null) {
200 LockSupport.unpark(t);
201 }
202 }
203
204 /**
205 * Parks the current thread until a file descriptor is ready for the given op.
206 * @param fdVal the file descriptor
207 * @param event POLLIN or POLLOUT
208 * @param nanos the waiting time or 0 to wait indefinitely
209 * @param isOpen supplies a boolean to indicate if the enclosing object is open
210 */
211 static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
212 POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
213 }
214
215 /**
216 * Parks the current thread until a Selector's file descriptor is ready.
217 * @param fdVal the Selector's file descriptor
218 * @param nanos the waiting time or 0 to wait indefinitely
219 */
220 static void pollSelector(int fdVal, long nanos) throws IOException {
221 POLLER_GROUP.pollSelector(fdVal, nanos);
222 }
223
224 /**
225 * Unpark the given thread so that it stops polling.
226 */
227 static void stopPoll(Thread thread) {
228 LockSupport.unpark(thread);
229 }
230
231 /**
232 * Parks the current thread until a file descriptor is ready.
233 */
234 private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
235 register(fdVal);
236 try {
237 if (isOpen.getAsBoolean() && !isShutdown()) {
238 if (nanos > 0) {
239 LockSupport.parkNanos(nanos);
240 } else {
241 LockSupport.park();
242 }
243 }
244 } finally {
245 deregister(fdVal);
246 }
247 }
248
249 /**
250 * Registers the file descriptor to be polled at most once when the file descriptor
251 * is ready for I/O.
252 */
253 private void register(int fdVal) throws IOException {
254 Thread previous = map.put(fdVal, Thread.currentThread());
255 assert previous == null;
256 try {
257 implRegister(fdVal);
258 } catch (Throwable t) {
259 map.remove(fdVal);
260 throw t;
261 } finally {
262 Reference.reachabilityFence(this);
263 }
264 }
265
266 /**
267 * Deregister the file descriptor so that the file descriptor is not polled.
268 */
269 private void deregister(int fdVal) throws IOException {
270 Thread previous = map.remove(fdVal);
271 boolean polled = (previous == null);
272 assert polled || previous == Thread.currentThread();
273 try {
274 implDeregister(fdVal, polled);
275 } finally {
276 Reference.reachabilityFence(this);
277 }
278 }
279
280 /**
281 * Master polling loop. The {@link #polled(int)} method is invoked for each file
282 * descriptor that is polled.
283 */
284 private void pollerLoop() {
285 setOwner();
286 try {
287 while (!isShutdown()) {
288 poll(-1);
289 }
290 } catch (Exception e) {
291 e.printStackTrace();
292 }
293 }
294
295 /**
296 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
297 * descriptor that is polled.
298 *
299 * The sub-poller registers its file descriptor with the master poller to park until
300 * there are events to poll. When unparked, it does non-blocking polls and parks
301 * again when there are no more events. The sub-poller yields after each poll to help
302 * with fairness and to avoid re-registering with the master poller where possible.
303 */
304 private void subPollerLoop(Poller masterPoller) {
305 assert Thread.currentThread().isVirtual();
306 setOwner();
307 try {
308 int polled = 0;
309 while (!isShutdown()) {
310 if (polled == 0) {
311 masterPoller.poll(fdVal(), 0, () -> true); // park
312 pollerPolled();
313 } else {
314 Thread.yield();
315 }
316 polled = poll(0);
317 }
318 } catch (Exception e) {
319 e.printStackTrace();
320 }
321 }
322
323 /**
324 * Unparks all threads waiting on a file descriptor registered with this poller.
325 */
326 private void wakeupAll() {
327 map.values().forEach(LockSupport::unpark);
328 }
329
330 @Override
331 public String toString() {
332 return String.format("%s [registered = %d, owner = %s]",
333 Objects.toIdentityString(this), map.size(), owner);
334 }
335
336 /**
337 * A group of poller threads that support virtual threads polling file descriptors.
338 */
339 private static abstract class PollerGroup {
340 private final PollerProvider provider;
341
342 PollerGroup(PollerProvider provider) {
343 this.provider = provider;
344 }
345
346 PollerProvider provider() {
347 return provider;
348 }
349
350 /**
351 * Starts the poller group and any system-wide poller threads.
352 */
353 abstract void start();
354
355 /**
356 * Parks the current thread until a file descriptor is ready for the given op.
357 */
358 abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
359
360 /**
361 * Parks the current thread until a Selector's file descriptor is ready.
362 */
363 void pollSelector(int fdVal, long nanos) throws IOException {
364 poll(fdVal, Net.POLLIN, nanos, () -> true);
365 }
366
367 /**
368 * Starts a platform thread to run the given task.
369 */
370 protected final void startPlatformThread(String name, Runnable task) {
371 Thread thread = InnocuousThread.newSystemThread(name, task);
372 thread.setDaemon(true);
373 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
374 thread.start();
375 }
376
377 /**
378 * Return the master poller for default scheduler, or null if no master poller.
379 */
380 abstract Poller defaultMasterPoller();
381
382 /**
383 * Return the read pollers for the default scheduler.
384 */
385 abstract List<Poller> defaultReadPollers();
386
387 /**
388 * Return the write pollers for the default scheduler.
389 */
390 abstract List<Poller> defaultWritePollers();
391 }
392
393 /**
394 * The poller group for the SYSTEM_THREADS polling mode. The read and write pollers
395 * are system-wide (scheduler agnostic) platform threads.
396 */
397 private static class SystemThreadsPollerGroup extends PollerGroup {
398 // system-wide read and write pollers
399 private final Poller[] readPollers;
400 private final Poller[] writePollers;
401
402 SystemThreadsPollerGroup(PollerProvider provider) throws IOException {
403 super(provider);
404
405 int readPollerCount = pollerCount("jdk.readPollers",
406 provider.defaultReadPollers(Mode.SYSTEM_THREADS));
407 int writePollerCount = pollerCount("jdk.writePollers",
408 provider.defaultWritePollers(Mode.SYSTEM_THREADS));
409
410 Poller[] readPollers = new Poller[readPollerCount];
411 Poller[] writePollers = new Poller[writePollerCount];
412 try {
413 for (int i = 0; i < readPollerCount; i++) {
414 readPollers[i] = provider.readPoller(false);
415 }
416 for (int i = 0; i < writePollerCount; i++) {
417 writePollers[i] = provider.writePoller(false);
418 }
419 } catch (Throwable e) {
420 closeAll(readPollers);
421 closeAll(writePollers);
422 throw e;
423 }
424
425 this.readPollers = readPollers;
426 this.writePollers = writePollers;
427 }
428
429 /**
430 * Close the given pollers.
431 */
432 private static void closeAll(Poller... pollers) {
433 for (Poller poller : pollers) {
434 if (poller != null) {
435 try {
436 poller.close();
437 } catch (IOException _) { }
438 }
439 }
440 }
441
442 /**
443 * Start poller threads.
444 */
445 @Override
446 void start() {
447 Arrays.stream(readPollers).forEach(p -> {
448 startPlatformThread("Read-Poller", p::pollerLoop);
449 });
450 Arrays.stream(writePollers).forEach(p -> {
451 startPlatformThread("Write-Poller", p::pollerLoop);
452 });
453 }
454
455 private Poller readPoller(int fdVal) {
456 int index = provider().fdValToIndex(fdVal, readPollers.length);
457 return readPollers[index];
458 }
459
460 private Poller writePoller(int fdVal) {
461 int index = provider().fdValToIndex(fdVal, writePollers.length);
462 return writePollers[index];
463 }
464
465 @Override
466 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
467 Poller poller = (event == Net.POLLIN)
468 ? readPoller(fdVal)
469 : writePoller(fdVal);
470 poller.poll(fdVal, nanos, isOpen);
471 }
472
473 @Override
474 Poller defaultMasterPoller() {
475 return null;
476 }
477
478 @Override
479 List<Poller> defaultReadPollers() {
480 return List.of(readPollers);
481 }
482
483 @Override
484 List<Poller> defaultWritePollers() {
485 return List.of(writePollers);
486 }
487 }
488
489 /**
490 * The poller group for the VTHREAD_POLLERS poller mode. The read and write pollers
491 * are scheduler-specific virtual threads. The default scheduler may have many read
492 * and write pollers. Custom schedulers have one read poller and one write poller.
493 * When read and write pollers must block then they register with a system-wide
494 * (scheduler agnostic) "master poller" that runs in a dedicated platform thread.
495 */
496 private static class VirtualThreadsPollerGroup extends PollerGroup {
497 private static final Thread.VirtualThreadScheduler DEFAULT_SCHEDULER = JLA.defaultVirtualThreadScheduler();
498
499 // system-wide master poller
500 private final Poller masterPoller;
501
502 // number of read and write pollers for default scheduler
503 private final int defaultReadPollerCount;
504 private final int defaultWritePollerCount;
505
506 // maps scheduler to a set of read and write pollers
507 private record Pollers(ExecutorService executor, Poller[] readPollers, Poller[] writePollers) { }
508 private final Map<Thread.VirtualThreadScheduler, Pollers> POLLERS = new ConcurrentHashMap<>();
509
510 VirtualThreadsPollerGroup(PollerProvider provider) throws IOException {
511 super(provider);
512
513 var mode = Mode.VTHREAD_POLLERS;
514 this.defaultReadPollerCount = pollerCount("jdk.readPollers",
515 provider.defaultReadPollers(mode));
516 this.defaultWritePollerCount = pollerCount("jdk.writePollers",
517 provider.defaultWritePollers(mode));
518 this.masterPoller = provider.readPoller(false);
519 }
520
521 @Override
522 void start() {
523 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
524 }
525
526 /**
527 * Create the read and write pollers for the given scheduler.
528 */
529 private Pollers createPollers(Thread.VirtualThreadScheduler scheduler) {
530 int readPollerCount;
531 int writePollerCount;
532 if (scheduler == DEFAULT_SCHEDULER) {
533 readPollerCount = defaultReadPollerCount;
534 writePollerCount = defaultWritePollerCount;
535 } else {
536 readPollerCount = 1;
537 writePollerCount = 1;
538 }
539
540 Poller[] readPollers = new Poller[readPollerCount];
541 Poller[] writePollers = new Poller[writePollerCount];
542 try {
543 for (int i = 0; i < readPollerCount; i++) {
544 readPollers[i] = provider().readPoller(true);
545 }
546 for (int i = 0; i < writePollerCount; i++) {
547 writePollers[i] = provider().writePoller(true);
548 }
549 } catch (IOException ioe) {
550 closeAll(readPollers);
551 closeAll(writePollers);
552 throw new UncheckedIOException(ioe);
553 }
554
555 @SuppressWarnings("restricted")
556 ThreadFactory factory = Thread.ofVirtual()
557 .scheduler(scheduler)
558 .inheritInheritableThreadLocals(false)
559 .name("SubPoller-", 0)
560 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
561 .factory();
562 ExecutorService executor = Executors.newThreadPerTaskExecutor(factory);
563
564 Arrays.stream(readPollers).forEach(p -> {
565 executor.execute(() -> p.subPollerLoop(masterPoller));
566 });
567 Arrays.stream(writePollers).forEach(p -> {
568 executor.execute(() -> p.subPollerLoop(masterPoller));
569 });
570
571 return new Pollers(executor, readPollers, writePollers);
572 }
573
574 @Override
575 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
576 Thread.VirtualThreadScheduler scheduler;
577 if (Thread.currentThread().isVirtual()) {
578 scheduler = JLA.virtualThreadScheduler(Thread.currentThread());
579 } else {
580 scheduler = DEFAULT_SCHEDULER;
581 }
582 Pollers pollers;
583 try {
584 pollers = POLLERS.computeIfAbsent(scheduler, _ -> createPollers(scheduler));
585 } catch (UncheckedIOException uioe) {
586 throw uioe.getCause();
587 }
588
589 Poller poller;
590 if (event == Net.POLLIN) {
591 Poller[] readPollers = pollers.readPollers();
592 int index = provider().fdValToIndex(fdVal, readPollers.length);
593 poller = readPollers[index];
594 } else {
595 Poller[] writePollers = pollers.writePollers();
596 int index = provider().fdValToIndex(fdVal, writePollers.length);
597 poller = writePollers[index];
598 }
599 poller.poll(fdVal, nanos, isOpen);
600 }
601
602 @Override
603 void pollSelector(int fdVal, long nanos) throws IOException {
604 masterPoller.poll(fdVal, nanos, () -> true);
605 }
606
607 /**
608 * Close the given pollers.
609 */
610 private static void closeAll(Poller... pollers) {
611 for (Poller poller : pollers) {
612 if (poller != null) {
613 try {
614 poller.close();
615 } catch (IOException _) { }
616 }
617 }
618 }
619
620 @Override
621 Poller defaultMasterPoller() {
622 return masterPoller;
623 }
624
625 @Override
626 List<Poller> defaultReadPollers() {
627 Pollers pollers = POLLERS.get(DEFAULT_SCHEDULER);
628 return (pollers != null) ? List.of(pollers.readPollers()) : List.of();
629 }
630
631 @Override
632 List<Poller> defaultWritePollers() {
633 Pollers pollers = POLLERS.get(DEFAULT_SCHEDULER);
634 return (pollers != null) ? List.of(pollers.writePollers()) : List.of();
635 }
636 }
637
638 /**
639 * The poller group for the PER_CARRIER polling mode. A dedicated read poller is
640 * created for each carrier thread. When a virtual thread polls a file descriptor
641 * for POLLIN, then it will use (almost always, not guaranteed) to see the read
642 * poller for its carrier. The read poller terminates if carrier thread terminates.
643 * There is one system-wide (carrier agnostic) write poller.
644 */
645 private static class PerCarrierPollerGroup extends PollerGroup {
646 private static final boolean USE_SUBPOLLER;
647 static {
648 String s = System.getProperty("jdk.useSubPoller");
649 USE_SUBPOLLER = (s == null) || s.isEmpty() || Boolean.parseBoolean(s);
650 }
651
652 private static final TerminatingThreadLocal<PerCarrierPollerGroup> POLLER_GROUP =
653 new TerminatingThreadLocal<>() {
654 @Override
655 protected void threadTerminated(PerCarrierPollerGroup pollerGroup) {
656 pollerGroup.carrierTerminated();
657 }
658 };
659
660 // -XX:-VMContinuations or called from platform thread
661 private static final Thread PLACEHOLDER = new Thread();
662
663 // maps carrier thread to its read poller
664 private final Map<Thread, Poller> READ_POLLERS = new ConcurrentHashMap<>();
665
666 private final Poller writePoller;
667 private final Poller masterPoller;
668
669 PerCarrierPollerGroup(PollerProvider provider) throws IOException {
670 super(provider);
671
672 this.writePoller = provider.writePoller(false);
673 if (USE_SUBPOLLER) {
674 this.masterPoller = provider.readPoller(false);
675 } else {
676 this.masterPoller = null;
677 }
678 }
679
680 @Override
681 void start() {
682 startPlatformThread("Write-Poller", writePoller::pollerLoop);
683 if (masterPoller != null) {
684 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
685 }
686 }
687
688 /**
689 * Starts a read poller with the given name.
690 * @throws UncheckedIOException if an I/O error occurs
691 */
692 private Poller startReadPoller(String name) {
693 try {
694 Poller readPoller = provider().readPoller(USE_SUBPOLLER);
695 if (USE_SUBPOLLER) {
696 var scheduler = JLA.virtualThreadScheduler(Thread.currentThread());
697 @SuppressWarnings("restricted")
698 var _ = Thread.ofVirtual().scheduler(scheduler)
699 .inheritInheritableThreadLocals(false)
700 .name(name)
701 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
702 .start(() -> subPollerLoop(readPoller));
703 } else {
704 startPlatformThread(name, () -> pollLoop(readPoller));
705 }
706 return readPoller;
707 } catch (IOException ioe) {
708 throw new UncheckedIOException(ioe);
709 }
710 }
711
712 @Override
713 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
714 if (event == Net.POLLOUT) {
715 writePoller.poll(fdVal, nanos, isOpen);
716 return;
717 }
718
719 assert event == Net.POLLIN;
720 if (Thread.currentThread().isVirtual() && ContinuationSupport.isSupported()) {
721 Poller readPoller;
722 // get read poller for this carrier
723 Continuation.pin();
724 try {
725 Thread carrier = JLA.currentCarrierThread();
726 try {
727 readPoller = READ_POLLERS.computeIfAbsent(carrier, _ -> {
728 String name = carrier.getName() + "-Read-Poller";
729 return startReadPoller(name);
730 });
731 } catch (UncheckedIOException uioe) {
732 throw uioe.getCause();
733 }
734 POLLER_GROUP.set(this);
735 } finally {
736 Continuation.unpin();
737 }
738 // may execute on a different carrier
739 readPoller.poll(fdVal, nanos, isOpen);
740 return;
741 }
742
743 // -XX:-VMContinuations or called from platform thread
744 if (masterPoller != null) {
745 masterPoller.poll(fdVal, nanos, isOpen);
746 } else {
747 Poller readPoller;
748 try {
749 readPoller = READ_POLLERS.computeIfAbsent(PLACEHOLDER,
750 _ -> startReadPoller("Read-Poller"));
751 } catch (UncheckedIOException uioe) {
752 throw uioe.getCause();
753 }
754 readPoller.poll(fdVal, nanos, isOpen);
755 }
756 }
757
758 @Override
759 void pollSelector(int fdVal, long nanos) throws IOException {
760 masterPoller.poll(fdVal, nanos, () -> true);
761 }
762
763 /**
764 * Read-poller polling loop.
765 */
766 private void pollLoop(Poller readPoller) {
767 try {
768 readPoller.pollerLoop();
769 } finally {
770 // wakeup all threads waiting on file descriptors registered with the
771 // read poller, these I/O operation will migrate to another carrier.
772 readPoller.wakeupAll();
773 }
774 }
775
776 /**
777 * Read-poll sub-poller polling loop.
778 */
779 private void subPollerLoop(Poller readPoller) {
780 try {
781 readPoller.subPollerLoop(masterPoller);
782 } finally {
783 // wakeup all threads waiting on file descriptors registered with the
784 // read poller, these I/O operation will migrate to another carrier.
785 readPoller.wakeupAll();
786 }
787 }
788
789 /**
790 * Invoked by the carrier thread before it terminates.
791 */
792 private void carrierTerminated() {
793 Poller readPoller = READ_POLLERS.remove(Thread.currentThread());
794 if (readPoller != null) {
795 readPoller.setShutdown();
796 try {
797 readPoller.wakeupPoller();
798 } catch (Throwable e) {
799 e.printStackTrace();
800 }
801 }
802 }
803
804 @Override
805 Poller defaultMasterPoller() {
806 return masterPoller;
807 }
808
809 @Override
810 List<Poller> defaultReadPollers() {
811 // return all read pollers for now.
812 return READ_POLLERS.values().stream().toList();
813 }
814
815 @Override
816 List<Poller> defaultWritePollers() {
817 return List.of(writePoller);
818 }
819 }
820
821 /**
822 * Returns the poller mode.
823 */
824 private static Mode pollerMode(Mode defaultPollerMode) {
825 String s = System.getProperty("jdk.pollerMode");
826 if (s != null) {
827 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
828 return Mode.SYSTEM_THREADS;
829 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
830 return Mode.VTHREAD_POLLERS;
831 } else if (s.equalsIgnoreCase(Mode.PER_CARRIER.name()) || s.equals("3")) {
832 return Mode.PER_CARRIER;
833 } else {
834 throw new RuntimeException("Can't parse '" + s + "' as polling mode");
835 }
836 } else {
837 return defaultPollerMode;
838 }
839 }
840
841 /**
842 * Reads the given property name to get the poller count. If the property is
843 * set then the value must be a power of 2. Returns 1 if the property is not
844 * set.
845 * @throws IllegalArgumentException if the property is set to a value that
846 * is not a power of 2.
847 */
848 private static int pollerCount(String propName, int defaultCount) {
849 String s = System.getProperty(propName);
850 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
851
852 // check power of 2
853 if (count != Integer.highestOneBit(count)) {
854 String msg = propName + " is set to a value that is not a power of 2";
855 throw new IllegalArgumentException(msg);
856 }
857 return count;
858 }
859
860 /**
861 * Return the master poller or null if there is no master poller.
862 */
863 public static Poller masterPoller() {
864 return POLLER_GROUP.defaultMasterPoller();
865 }
866
867 /**
868 * Return the list of read pollers.
869 */
870 public static List<Poller> readPollers() {
871 return POLLER_GROUP.defaultReadPollers();
872 }
873
874 /**
875 * Return the list of write pollers.
876 */
877 public static List<Poller> writePollers() {
878 return POLLER_GROUP.defaultWritePollers();
879 }
880 }
|