8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.util.Arrays;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Objects;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ThreadFactory;
36 import java.util.concurrent.locks.LockSupport;
37 import java.util.function.BooleanSupplier;
38 import jdk.internal.misc.InnocuousThread;
39 import jdk.internal.vm.annotation.Stable;
40
41 /**
42 * Polls file descriptors. Virtual threads invoke the poll method to park
43 * until a given file descriptor is ready for I/O.
44 */
45 public abstract class Poller {
46 private static final Pollers POLLERS;
47 static {
48 try {
49 var pollers = new Pollers();
50 pollers.start();
51 POLLERS = pollers;
52 } catch (IOException ioe) {
53 throw new ExceptionInInitializerError(ioe);
54 }
55 }
56
57 // the poller or sub-poller thread
58 private @Stable Thread owner;
59
60 // maps file descriptors to parked Thread
61 private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
62
/**
 * The mode that the read and write pollers run in.
 */
enum Mode {
    /**
     * The ReadPoller and WritePoller are dedicated platform threads. Each blocks
     * waiting for events and unparks virtual threads when their file descriptors
     * are ready for I/O.
     */
    SYSTEM_THREADS,

    /**
     * The ReadPoller and WritePoller threads are virtual threads. They poll for
     * events, yield between polls, and unpark virtual threads when their file
     * descriptors are ready for I/O. When there are no events, the poller threads
     * park until there are I/O events to poll. This mode helps to integrate polling
     * with virtual thread scheduling. The approach is similar to the default scheme
     * in "User-level Threading: Have Your Cake and Eat It Too" by Karsten and
     * Barghi 2020 (https://dl.acm.org/doi/10.1145/3379483).
     */
    VTHREAD_POLLERS
}
84
/**
 * Creates a new Poller. There is no state to set up here; the constructor exists
 * so that subclasses can be instantiated.
 */
protected Poller() {
    // nothing to initialize
}
90
91 /**
92 * Returns the poller's file descriptor, used when the read and write poller threads
93 * are virtual threads.
94 *
95 * @throws UnsupportedOperationException if not supported
96 */
97 int fdVal() {
98 throw new UnsupportedOperationException();
99 }
100
101 /**
102 * Register the file descriptor. The registration is "one shot", meaning it should
103 * be polled at most once.
104 */
105 abstract void implRegister(int fdVal) throws IOException;
106
107 /**
108 * Deregister the file descriptor.
109 * @param polled true if the file descriptor has already been polled
110 */
111 abstract void implDeregister(int fdVal, boolean polled);
112
113 /**
114 * Poll for events. The {@link #polled(int)} method is invoked for each
115 * polled file descriptor.
116 *
117 * @param timeout if positive then block for up to {@code timeout} milliseconds,
118 * if zero then don't block, if -1 then block indefinitely
119 * @return the number of file descriptors polled
120 */
121 abstract int poll(int timeout) throws IOException;
122
123 /**
124 * Callback by the poll method when a file descriptor is polled.
125 */
126 final void polled(int fdVal) {
127 wakeup(fdVal);
128 }
129
130 /**
131 * Parks the current thread until a file descriptor is ready for the given op.
132 * @param fdVal the file descriptor
133 * @param event POLLIN or POLLOUT
134 * @param nanos the waiting time or 0 to wait indefinitely
135 * @param supplier supplies a boolean to indicate if the enclosing object is open
136 */
137 static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
138 throws IOException
139 {
140 assert nanos >= 0L;
141 if (event == Net.POLLIN) {
142 POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
143 } else if (event == Net.POLLOUT) {
144 POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
145 } else {
146 assert false;
147 }
148 }
149
150 /**
151 * Parks the current thread until a Selector's file descriptor is ready.
152 * @param fdVal the Selector's file descriptor
153 * @param nanos the waiting time or 0 to wait indefinitely
154 */
155 static void pollSelector(int fdVal, long nanos) throws IOException {
156 assert nanos >= 0L;
157 Poller poller = POLLERS.masterPoller();
158 if (poller == null) {
159 poller = POLLERS.readPoller(fdVal);
160 }
161 poller.poll(fdVal, nanos, () -> true);
162 }
163
164 /**
165 * If there is a thread polling the given file descriptor for the given event then
166 * the thread is unparked.
167 */
168 static void stopPoll(int fdVal, int event) {
169 if (event == Net.POLLIN) {
170 POLLERS.readPoller(fdVal).wakeup(fdVal);
171 } else if (event == Net.POLLOUT) {
172 POLLERS.writePoller(fdVal).wakeup(fdVal);
173 } else {
174 throw new IllegalArgumentException();
175 }
176 }
177
178 /**
179 * If there are any threads polling the given file descriptor then they are unparked.
180 */
181 static void stopPoll(int fdVal) {
182 stopPoll(fdVal, Net.POLLIN);
183 stopPoll(fdVal, Net.POLLOUT);
184 }
185
186 /**
187 * Parks the current thread until a file descriptor is ready.
188 */
189 private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
190 register(fdVal);
191 try {
192 boolean isOpen = supplier.getAsBoolean();
193 if (isOpen) {
194 if (nanos > 0) {
195 LockSupport.parkNanos(nanos);
196 } else {
197 LockSupport.park();
198 }
199 }
200 } finally {
201 deregister(fdVal);
202 }
203 }
204
205 /**
206 * Registers the file descriptor to be polled at most once when the file descriptor
207 * is ready for I/O.
208 */
209 private void register(int fdVal) throws IOException {
210 Thread previous = map.put(fdVal, Thread.currentThread());
211 assert previous == null;
212 try {
213 implRegister(fdVal);
214 } catch (Throwable t) {
215 map.remove(fdVal);
216 throw t;
217 }
218 }
219
220 /**
221 * Deregister the file descriptor so that the file descriptor is not polled.
222 */
223 private void deregister(int fdVal) {
224 Thread previous = map.remove(fdVal);
225 boolean polled = (previous == null);
226 assert polled || previous == Thread.currentThread();
227 implDeregister(fdVal, polled);
228 }
229
230 /**
231 * Unparks any thread that is polling the given file descriptor.
232 */
233 private void wakeup(int fdVal) {
234 Thread t = map.remove(fdVal);
235 if (t != null) {
236 LockSupport.unpark(t);
237 }
238 }
239
240 /**
241 * Master polling loop. The {@link #polled(int)} method is invoked for each file
242 * descriptor that is polled.
243 */
244 private void pollerLoop() {
245 owner = Thread.currentThread();
246 try {
247 for (;;) {
248 poll(-1);
249 }
250 } catch (Exception e) {
251 e.printStackTrace();
252 }
253 }
254
255 /**
256 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
257 * descriptor that is polled.
258 *
259 * The sub-poller registers its file descriptor with the master poller to park until
260 * there are events to poll. When unparked, it does non-blocking polls and parks
261 * again when there are no more events. The sub-poller yields after each poll to help
262 * with fairness and to avoid re-registering with the master poller where possible.
263 */
264 private void subPollerLoop(Poller masterPoller) {
265 assert Thread.currentThread().isVirtual();
266 owner = Thread.currentThread();
267 try {
268 int polled = 0;
269 for (;;) {
270 if (polled == 0) {
271 masterPoller.poll(fdVal(), 0, () -> true); // park
272 } else {
273 Thread.yield();
274 }
275 polled = poll(0);
276 }
277 } catch (Exception e) {
278 e.printStackTrace();
279 }
280 }
281
282 /**
283 * Returns the number I/O operations currently registered with this poller.
284 */
285 public int registered() {
286 return map.size();
287 }
288
289 @Override
290 public String toString() {
291 return String.format("%s [registered = %d, owner = %s]",
292 Objects.toIdentityString(this), registered(), owner);
293 }
294
295 /**
296 * The Pollers used for read and write events.
297 */
298 private static class Pollers {
299 private final PollerProvider provider;
300 private final Poller.Mode pollerMode;
301 private final Poller masterPoller;
302 private final Poller[] readPollers;
303 private final Poller[] writePollers;
304
    // used by the start method so that the executor is kept alive
306 private Executor executor;
307
308 /**
309 * Creates the Poller instances based on configuration.
310 */
311 Pollers() throws IOException {
312 PollerProvider provider = PollerProvider.provider();
313 Poller.Mode mode;
314 String s = System.getProperty("jdk.pollerMode");
315 if (s != null) {
316 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
317 mode = Mode.SYSTEM_THREADS;
318 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
319 mode = Mode.VTHREAD_POLLERS;
320 } else {
321 throw new RuntimeException("Can't parse '" + s + "' as polling mode");
322 }
323 } else {
324 mode = provider.defaultPollerMode();
325 }
326
327 // vthread poller mode needs a master poller
328 Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
329 ? provider.readPoller(false)
330 : null;
331
332 // read pollers (or sub-pollers)
333 int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
334 Poller[] readPollers = new Poller[readPollerCount];
335 for (int i = 0; i < readPollerCount; i++) {
336 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
337 }
338
339 // write pollers (or sub-pollers)
340 int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));
341 Poller[] writePollers = new Poller[writePollerCount];
342 for (int i = 0; i < writePollerCount; i++) {
343 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);
344 }
345
346 this.provider = provider;
347 this.pollerMode = mode;
348 this.masterPoller = masterPoller;
349 this.readPollers = readPollers;
350 this.writePollers = writePollers;
351 }
352
353 /**
354 * Starts the Poller threads.
355 */
356 void start() {
357 if (pollerMode == Mode.VTHREAD_POLLERS) {
358 startPlatformThread("MasterPoller", masterPoller::pollerLoop);
359 ThreadFactory factory = Thread.ofVirtual()
360 .inheritInheritableThreadLocals(false)
361 .name("SubPoller-", 0)
362 .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
363 .factory();
364 executor = Executors.newThreadPerTaskExecutor(factory);
365 Arrays.stream(readPollers).forEach(p -> {
366 executor.execute(() -> p.subPollerLoop(masterPoller));
367 });
368 Arrays.stream(writePollers).forEach(p -> {
369 executor.execute(() -> p.subPollerLoop(masterPoller));
370 });
371 } else {
372 Arrays.stream(readPollers).forEach(p -> {
373 startPlatformThread("Read-Poller", p::pollerLoop);
374 });
375 Arrays.stream(writePollers).forEach(p -> {
376 startPlatformThread("Write-Poller", p::pollerLoop);
377 });
378 }
379 }
380
381 /**
382 * Returns the master poller, or null if there is no master poller.
383 */
384 Poller masterPoller() {
385 return masterPoller;
386 }
387
388 /**
389 * Returns the read poller for the given file descriptor.
390 */
391 Poller readPoller(int fdVal) {
392 int index = provider.fdValToIndex(fdVal, readPollers.length);
393 return readPollers[index];
394 }
395
396 /**
397 * Returns the write poller for the given file descriptor.
398 */
399 Poller writePoller(int fdVal) {
400 int index = provider.fdValToIndex(fdVal, writePollers.length);
401 return writePollers[index];
402 }
403
404 /**
405 * Return the list of read pollers.
406 */
407 List<Poller> readPollers() {
408 return List.of(readPollers);
409 }
410
411 /**
412 * Return the list of write pollers.
413 */
414 List<Poller> writePollers() {
415 return List.of(writePollers);
416 }
417
418
419 /**
420 * Reads the given property name to get the poller count. If the property is
421 * set then the value must be a power of 2. Returns 1 if the property is not
422 * set.
423 * @throws IllegalArgumentException if the property is set to a value that
424 * is not a power of 2.
425 */
426 private static int pollerCount(String propName, int defaultCount) {
427 String s = System.getProperty(propName);
428 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
429
430 // check power of 2
431 if (count != Integer.highestOneBit(count)) {
432 String msg = propName + " is set to a value that is not a power of 2";
433 throw new IllegalArgumentException(msg);
434 }
435 return count;
436 }
437
438 /**
439 * Starts a platform thread to run the given task.
440 */
441 private void startPlatformThread(String name, Runnable task) {
442 try {
443 Thread thread = InnocuousThread.newSystemThread(name, task);
444 thread.setDaemon(true);
445 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
446 thread.start();
447 } catch (Exception e) {
448 throw new InternalError(e);
449 }
450 }
451 }
452
453 /**
454 * Return the master poller or null if there is no master poller.
455 */
456 public static Poller masterPoller() {
457 return POLLERS.masterPoller();
458 }
459
460 /**
461 * Return the list of read pollers.
462 */
463 public static List<Poller> readPollers() {
464 return POLLERS.readPollers();
465 }
466
467 /**
468 * Return the list of write pollers.
469 */
470 public static List<Poller> writePollers() {
471 return POLLERS.writePollers();
472 }
473 }
|
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.io.UncheckedIOException;
29 import java.lang.ref.Reference;
30 import java.util.Arrays;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.locks.LockSupport;
40 import java.util.function.BooleanSupplier;
41 import jdk.internal.access.JavaLangAccess;
42 import jdk.internal.access.SharedSecrets;
43 import jdk.internal.misc.InnocuousThread;
44 import jdk.internal.misc.TerminatingThreadLocal;
45 import jdk.internal.vm.Continuation;
46 import jdk.internal.vm.ContinuationSupport;
47 import jdk.internal.vm.annotation.Stable;
48
/**
 * I/O poller that allows virtual threads to park until a file descriptor is ready
 * for I/O. Implementations also optionally support read/write operations where
 * virtual threads park until bytes are read or written.
 */
54 public abstract class Poller {
55 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
56
57 // the poller group for the I/O pollers and poller threads
58 private static final PollerGroup POLLER_GROUP = createPollerGroup();
59
60 // the poller or sub-poller thread (used for observability only)
61 private @Stable Thread owner;
62
63 // maps file descriptors to parked Thread
64 private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
65
66 // shutdown (if supported by poller group)
67 private volatile boolean shutdown;
68
/**
 * The mode that the read and write pollers run in.
 */
enum Mode {
    /**
     * The read and write pollers are platform threads. Each blocks waiting for
     * events and unparks virtual threads when their file descriptors are ready
     * for I/O.
     */
    SYSTEM_THREADS,

    /**
     * The read and write pollers are virtual threads. They poll for events, yield
     * between polls, and unpark virtual threads when their file descriptors are
     * ready for I/O. When there are no events, the poller threads park until there
     * are I/O events to poll. This mode helps to integrate polling with virtual
     * thread scheduling. The approach is similar to the default scheme in
     * "User-level Threading: Have Your Cake and Eat It Too" by Karsten and Barghi
     * 2020 (https://dl.acm.org/doi/10.1145/3379483).
     */
    VTHREAD_POLLERS,

    /**
     * The read pollers are per-carrier virtual threads. They poll for events,
     * yield between polls, and unpark virtual threads when their file descriptors
     * are ready for I/O. When there are no events, the poller threads park until
     * there are I/O events to poll. The write poller is a system-wide platform
     * thread.
     */
    POLLER_PER_CARRIER
}
98
99 /**
100 * Create and return the PollerGroup.
101 */
102 private static PollerGroup createPollerGroup() {
103 try {
104 PollerProvider provider;
105 if (System.getProperty("jdk.pollerMode") instanceof String s) {
106 Mode mode = switch (s) {
107 case "1" -> Mode.SYSTEM_THREADS;
108 case "2" -> Mode.VTHREAD_POLLERS;
109 case "3" -> Mode.POLLER_PER_CARRIER;
110 default -> {
111 throw new RuntimeException(s + " is not a valid polling mode");
112 }
113 };
114 provider = PollerProvider.createProvider(mode);
115 } else {
116 provider = PollerProvider.createProvider();
117 }
118
119 int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers());
120 int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers());
121 PollerGroup group = switch (provider.pollerMode()) {
122 case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider, readPollers, writePollers);
123 case VTHREAD_POLLERS -> new VThreadsPollerGroup(provider, readPollers, writePollers);
124 case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers);
125 };
126 group.start();
127 return group;
128 } catch (IOException ioe) {
129 throw new UncheckedIOException(ioe);
130 }
131 }
132
/**
 * Creates a new Poller. There is no state to set up here; the constructor exists
 * so that subclasses can be instantiated.
 */
protected Poller() {
    // nothing to initialize
}
138
139 /**
140 * Closes the poller and release resources. This method can only be used to cleanup
141 * when creating a poller group fails.
142 */
143 abstract void close() throws IOException;
144
145 /**
146 * Sets the poller's thread owner.
147 */
148 private void setOwner() {
149 owner = Thread.currentThread();
150 }
151
152 /**
153 * Returns true if this poller is marked for shutdown.
154 */
155 boolean isShutdown() {
156 return shutdown;
157 }
158
159 /**
160 * Marks this poller for shutdown.
161 */
162 private void setShutdown() {
163 shutdown = true;
164 }
165
166 /**
167 * Returns the poller's file descriptor to use when polling with the master poller.
168 * @throws UnsupportedOperationException if not supported
169 */
170 int fdVal() {
171 throw new UnsupportedOperationException();
172 }
173
174 /**
175 * Invoked if when this poller's file descriptor is polled by the master poller.
176 */
177 void pollerPolled() throws IOException {
178 }
179
180 /**
181 * Register the file descriptor with the I/O event management facility so that it is
182 * polled when the file descriptor is ready for I/O. The registration is "one shot",
183 * meaning it should be polled at most once.
184 */
185 abstract void implStartPoll(int fdVal) throws IOException;
186
187 /**
188 * Deregister a file descriptor from the I/O event management facility. This may be
189 * a no-op in some implementations when the file descriptor has already been polled.
190 * @param polled true if the file descriptor has already been polled
191 */
192 abstract void implStopPoll(int fdVal, boolean polled) throws IOException;
193
194 /**
195 * Poll for events. The {@link #polled(int)} method is invoked for each
196 * polled file descriptor.
197 *
198 * @param timeout if positive then block for up to {@code timeout} milliseconds,
199 * if zero then don't block, if -1 then block indefinitely
200 * @return >0 if file descriptors are polled, 0 if no file descriptor polled
201 */
202 abstract int poll(int timeout) throws IOException;
203
204 /**
205 * Wakeup the poller thread if blocked in poll so it can shutdown.
206 * @throws UnsupportedOperationException if not supported
207 */
208 void wakeupPoller() throws IOException {
209 throw new UnsupportedOperationException();
210 }
211
212 /**
213 * Callback by the poll method when a file descriptor is polled.
214 */
215 final void polled(int fdVal) {
216 Thread t = map.remove(fdVal);
217 if (t != null) {
218 LockSupport.unpark(t);
219 }
220 }
221
222 /**
223 * Parks the current thread until a file descriptor is ready for the given op.
224 * @param fdVal the file descriptor
225 * @param event POLLIN or POLLOUT
226 * @param nanos the waiting time or 0 to wait indefinitely
227 * @param isOpen supplies a boolean to indicate if the enclosing object is open
228 */
229 public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
230 POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
231 }
232
233 /**
234 * Parks the current thread until a Selector's file descriptor is ready.
235 * @param fdVal the Selector's file descriptor
236 * @param nanos the waiting time or 0 to wait indefinitely
237 */
238 public static void pollSelector(int fdVal, long nanos) throws IOException {
239 POLLER_GROUP.pollSelector(fdVal, nanos);
240 }
241
242 /**
243 * Unpark the given thread so that it stops polling.
244 */
245 public static void stopPoll(Thread thread) {
246 LockSupport.unpark(thread);
247 }
248
249 /**
250 * Parks the current thread until a file descriptor is ready.
251 */
252 private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
253 startPoll(fdVal);
254 try {
255 if (isOpen.getAsBoolean() && !isShutdown()) {
256 if (nanos > 0) {
257 LockSupport.parkNanos(nanos);
258 } else {
259 LockSupport.park();
260 }
261 }
262 } finally {
263 stopPoll(fdVal);
264 }
265 }
266
267 /**
268 * Register a file descriptor with the I/O event management facility so that it is
269 * polled when the file descriptor is ready for I/O.
270 */
271 private void startPoll(int fdVal) throws IOException {
272 Thread previous = map.put(fdVal, Thread.currentThread());
273 assert previous == null;
274 try {
275 implStartPoll(fdVal);
276 } catch (Throwable t) {
277 map.remove(fdVal);
278 throw t;
279 } finally {
280 Reference.reachabilityFence(this);
281 }
282 }
283
284 /**
285 * Deregister a file descriptor from the I/O event management facility.
286 */
287 private void stopPoll(int fdVal) throws IOException {
288 Thread previous = map.remove(fdVal);
289 boolean polled = (previous == null);
290 assert polled || previous == Thread.currentThread();
291 try {
292 implStopPoll(fdVal, polled);
293 } finally {
294 Reference.reachabilityFence(this);
295 }
296 }
297
298 /**
299 * Master polling loop. The {@link #polled(int)} method is invoked for each file
300 * descriptor that is polled.
301 */
302 private void pollerLoop() {
303 setOwner();
304 try {
305 while (!isShutdown()) {
306 poll(-1);
307 }
308 } catch (Exception e) {
309 e.printStackTrace();
310 }
311 }
312
313 /**
314 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
315 * descriptor that is polled.
316 *
317 * The sub-poller registers its file descriptor with the master poller to park until
318 * there are events to poll. When unparked, it does non-blocking polls and parks
319 * again when there are no more events. The sub-poller yields after each poll to help
320 * with fairness and to avoid re-registering with the master poller where possible.
321 */
322 private void subPollerLoop(Poller masterPoller) {
323 assert Thread.currentThread().isVirtual();
324 setOwner();
325 try {
326 int polled = 0;
327 while (!isShutdown()) {
328 if (polled == 0) {
329 masterPoller.poll(fdVal(), 0, () -> true); // park
330 pollerPolled();
331 } else {
332 Thread.yield();
333 }
334 polled = poll(0);
335 }
336 } catch (Exception e) {
337 e.printStackTrace();
338 }
339 }
340
341 /**
342 * Unparks all threads waiting on a file descriptor registered with this poller.
343 */
344 private void wakeupAll() {
345 map.values().forEach(LockSupport::unpark);
346 }
347
348 @Override
349 public String toString() {
350 return String.format("%s [registered = %d, owner = %s]",
351 Objects.toIdentityString(this), map.size(), owner);
352 }
353
354 /**
355 * A group of poller threads that support virtual threads polling file descriptors.
356 */
357 private static abstract class PollerGroup {
358 private final PollerProvider provider;
359
360 PollerGroup(PollerProvider provider) {
361 this.provider = provider;
362 }
363
364 final PollerProvider provider() {
365 return provider;
366 }
367
368 /**
369 * Starts the poller group and any system-wide poller threads.
370 */
371 abstract void start();
372
373 /**
374 * Parks the current thread until a file descriptor is ready for the given op.
375 */
376 abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
377
378 /**
379 * Parks the current thread until a Selector's file descriptor is ready.
380 */
381 void pollSelector(int fdVal, long nanos) throws IOException {
382 poll(fdVal, Net.POLLIN, nanos, () -> true);
383 }
384
385 /**
386 * Starts a platform thread to run the given task.
387 */
388 protected final void startPlatformThread(String name, Runnable task) {
389 Thread thread = InnocuousThread.newSystemThread(name, task);
390 thread.setDaemon(true);
391 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
392 thread.start();
393 }
394
395 /**
396 * Return the master poller, or null if no master poller.
397 */
398 abstract Poller masterPoller();
399
400 /**
401 * Return the read pollers.
402 */
403 abstract List<Poller> readPollers();
404
405 /**
406 * Return the write pollers.
407 */
408 abstract List<Poller> writePollers();
409
410 /**
411 * Close the given pollers.
412 */
413 static void closeAll(Poller... pollers) {
414 for (Poller poller : pollers) {
415 if (poller != null) {
416 try {
417 poller.close();
418 } catch (IOException _) { }
419 }
420 }
421 }
422
423 /**
424 * Returns true if the read pollers in this poller group support read ops in
425 * addition to POLLIN polling.
426 */
427 boolean supportReadOps() {
428 return provider().supportReadOps();
429 }
430
431 /**
432 * Reads bytes into a byte array.
433 * @throws UnsupportedOperationException if not supported
434 */
435 abstract int read(int fdVal, byte[] b, int off, int len, long nanos,
436 BooleanSupplier isOpen) throws IOException;
437
438 /**
439 * Returns true if the write pollers in this poller group support write ops in
440 * addition to POLLOUT polling.
441 */
442 boolean supportWriteOps() {
443 return provider().supportWriteOps();
444 }
445
446 /**
447 * Write bytes from a byte array.
448 * @throws UnsupportedOperationException if not supported
449 */
450 abstract int write(int fdVal, byte[] b, int off, int len,
451 BooleanSupplier isOpen) throws IOException;
452 }
453
454 /**
455 * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
456 */
457 private static class SystemThreadsPollerGroup extends PollerGroup {
458 // system-wide read and write pollers
459 private final Poller[] readPollers;
460 private final Poller[] writePollers;
461
462 SystemThreadsPollerGroup(PollerProvider provider,
463 int readPollerCount,
464 int writePollerCount) throws IOException {
465 super(provider);
466 Poller[] readPollers = new Poller[readPollerCount];
467 Poller[] writePollers = new Poller[writePollerCount];
468 try {
469 for (int i = 0; i < readPollerCount; i++) {
470 readPollers[i] = provider.readPoller(false);
471 }
472 for (int i = 0; i < writePollerCount; i++) {
473 writePollers[i] = provider.writePoller(false);
474 }
475 } catch (Throwable e) {
476 closeAll(readPollers);
477 closeAll(writePollers);
478 throw e;
479 }
480
481 this.readPollers = readPollers;
482 this.writePollers = writePollers;
483 }
484
485 @Override
486 void start() {
487 Arrays.stream(readPollers).forEach(p -> {
488 startPlatformThread("Read-Poller", p::pollerLoop);
489 });
490 Arrays.stream(writePollers).forEach(p -> {
491 startPlatformThread("Write-Poller", p::pollerLoop);
492 });
493 }
494
495 private Poller readPoller(int fdVal) {
496 int index = provider().fdValToIndex(fdVal, readPollers.length);
497 return readPollers[index];
498 }
499
500 private Poller writePoller(int fdVal) {
501 int index = provider().fdValToIndex(fdVal, writePollers.length);
502 return writePollers[index];
503 }
504
505 @Override
506 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
507 Poller poller = (event == Net.POLLIN)
508 ? readPoller(fdVal)
509 : writePoller(fdVal);
510 poller.poll(fdVal, nanos, isOpen);
511 }
512
513 @Override
514 int read(int fdVal, byte[] b, int off, int len, long nanos,
515 BooleanSupplier isOpen) throws IOException {
516 return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
517 }
518
519 @Override
520 int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
521 return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
522 }
523
524 @Override
525 Poller masterPoller() {
526 return null;
527 }
528
529 @Override
530 List<Poller> readPollers() {
531 return List.of(readPollers);
532 }
533
534 @Override
535 List<Poller> writePollers() {
536 return List.of(writePollers);
537 }
538 }
539
540 /**
541 * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
542 * When read and write pollers need to block then they register with a system-wide
543 * "master poller" that runs in a dedicated platform thread.
544 */
545 private static class VThreadsPollerGroup extends PollerGroup {
546 private final Poller masterPoller;
547 private final Poller[] readPollers;
548 private final Poller[] writePollers;
549
550 // keep virtual thread pollers alive
551 private final Executor executor;
552
553 VThreadsPollerGroup(PollerProvider provider,
554 int readPollerCount,
555 int writePollerCount) throws IOException {
556 super(provider);
557 Poller masterPoller = provider.readPoller(false);
558 Poller[] readPollers = new Poller[readPollerCount];
559 Poller[] writePollers = new Poller[writePollerCount];
560
561 try {
562 for (int i = 0; i < readPollerCount; i++) {
563 readPollers[i] = provider.readPoller(true);
564 }
565 for (int i = 0; i < writePollerCount; i++) {
566 writePollers[i] = provider.writePoller(true);
567 }
568 } catch (Throwable e) {
569 masterPoller.close();
570 closeAll(readPollers);
571 closeAll(writePollers);
572 throw e;
573 }
574
575 this.masterPoller = masterPoller;
576 this.readPollers = readPollers;
577 this.writePollers = writePollers;
578
579 ThreadFactory factory = Thread.ofVirtual()
580 .inheritInheritableThreadLocals(false)
581 .name("SubPoller-", 0)
582 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
583 .factory();
584 this.executor = Executors.newThreadPerTaskExecutor(factory);
585 }
586
587 @Override
588 void start() {
589 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
590 Arrays.stream(readPollers).forEach(p -> {
591 executor.execute(() -> p.subPollerLoop(masterPoller));
592 });
593 Arrays.stream(writePollers).forEach(p -> {
594 executor.execute(() -> p.subPollerLoop(masterPoller));
595 });
596 }
597
598 private Poller readPoller(int fdVal) {
599 int index = provider().fdValToIndex(fdVal, readPollers.length);
600 return readPollers[index];
601 }
602
603 private Poller writePoller(int fdVal) {
604 int index = provider().fdValToIndex(fdVal, writePollers.length);
605 return writePollers[index];
606 }
607
608 @Override
609 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
610 Poller poller = (event == Net.POLLIN)
611 ? readPoller(fdVal)
612 : writePoller(fdVal);
613 poller.poll(fdVal, nanos, isOpen);
614 }
615
616 @Override
617 int read(int fdVal, byte[] b, int off, int len, long nanos,
618 BooleanSupplier isOpen) throws IOException {
619 return readPoller(fdVal).implRead(fdVal, b, off, len, nanos, isOpen);
620 }
621
622 @Override
623 int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
624 return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
625 }
626
627 @Override
628 void pollSelector(int fdVal, long nanos) throws IOException {
629 masterPoller.poll(fdVal, nanos, () -> true);
630 }
631
632 @Override
633 Poller masterPoller() {
634 return masterPoller;
635 }
636
637 @Override
638 List<Poller> readPollers() {
639 return List.of(readPollers);
640 }
641
642 @Override
643 List<Poller> writePollers() {
644 return List.of(writePollers);
645 }
646 }
647
648 /**
649 * POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread.
650 * When a virtual thread polls a file descriptor for POLLIN, then it will use (almost
651 * always, not guaranteed) the read poller for its carrier. When a read poller needs
652 * to block then it registers with a system-wide "master poller" that runs in a
653 * dedicated platform thread. The read poller terminates if the carrier terminates.
654 * The write pollers are system-wide platform threads (usually one).
655 */
656 private static class PollerPerCarrierPollerGroup extends PollerGroup {
657 private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { }
658 private static final TerminatingThreadLocal<CarrierPoller> CARRIER_POLLER =
659 new TerminatingThreadLocal<>() {
660 @Override
661 protected void threadTerminated(CarrierPoller carrierPoller) {
662 Poller readPoller = carrierPoller.readPoller();
663 carrierPoller.group().carrierTerminated(readPoller);
664 }
665 };
666
667 private final Poller masterPoller;
668 private final Set<Poller> readPollers;
669 private final Poller[] writePollers;
670
671 /**
672 * Create a PollerPerCarrierPollerGroup with the given number of write pollers.
673 */
674 PollerPerCarrierPollerGroup(PollerProvider provider,
675 int writePollerCount) throws IOException {
676 super(provider);
677 Poller masterPoller = provider.readPoller(false);
678 Poller[] writePollers = new Poller[writePollerCount];
679 try {
680 for (int i = 0; i < writePollerCount; i++) {
681 writePollers[i] = provider.writePoller(false);
682 }
683 } catch (Throwable e) {
684 masterPoller.close();
685 closeAll(writePollers);
686 throw e;
687 }
688 this.masterPoller = masterPoller;
689 this.readPollers = ConcurrentHashMap.newKeySet();;
690 this.writePollers = writePollers;
691 }
692
693 @Override
694 void start() {
695 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
696 Arrays.stream(writePollers).forEach(p -> {
697 startPlatformThread("Write-Poller", p::pollerLoop);
698 });
699 }
700
701 private Poller writePoller(int fdVal) {
702 int index = provider().fdValToIndex(fdVal, writePollers.length);
703 return writePollers[index];
704 }
705
706 /**
707 * Starts a read sub-poller in a virtual thread.
708 */
709 private Poller startReadPoller() throws IOException {
710 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
711
712 // create read sub-poller
713 Poller readPoller = provider().readPoller(true);
714 readPollers.add(readPoller);
715
716 // start virtual thread to execute sub-polling loop
717 Thread carrier = JLA.currentCarrierThread();
718 var scheduler = JLA.virtualThreadScheduler(Thread.currentThread());
719 @SuppressWarnings("restricted")
720 var _ = Thread.ofVirtual()
721 .scheduler(scheduler)
722 .inheritInheritableThreadLocals(false)
723 .name(carrier.getName() + "-Read-Poller")
724 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
725 .start(() -> subPollerLoop(readPoller));
726 return readPoller;
727 }
728
729 /**
730 * Returns the read poller for the current carrier, starting it if required.
731 */
732 private Poller readPoller() throws IOException {
733 assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
734 Continuation.pin();
735 try {
736 CarrierPoller carrierPoller = CARRIER_POLLER.get();
737 if (carrierPoller != null) {
738 return carrierPoller.readPoller();
739 } else {
740 // first poll on this carrier will start poller
741 Poller readPoller = startReadPoller();
742 CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
743 return readPoller;
744 }
745 } finally {
746 Continuation.unpin();
747 }
748 }
749
750 @Override
751 void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
752 // for POLLIN, get the read poller for this carrier
753 if (event == Net.POLLIN
754 && Thread.currentThread().isVirtual()
755 && ContinuationSupport.isSupported()) {
756 readPoller().poll(fdVal, nanos, isOpen);
757 return;
758 }
759
760 // -XX:-VMContinuations or POLLIN from platform thread does master poller
761 if (event == Net.POLLIN) {
762 masterPoller.poll(fdVal, nanos, isOpen);
763 } else {
764 writePoller(fdVal).poll(fdVal, nanos, isOpen);
765 }
766 }
767
768 @Override
769 void pollSelector(int fdVal, long nanos) throws IOException {
770 masterPoller.poll(fdVal, nanos, () -> true);
771 }
772
773 @Override
774 int read(int fdVal, byte[] b, int off, int len, long nanos,
775 BooleanSupplier isOpen) throws IOException {
776 return readPoller().implRead(fdVal, b, off, len, nanos, isOpen);
777 }
778
779 @Override
780 int write(int fdVal, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
781 return writePoller(fdVal).implWrite(fdVal, b, off, len, isOpen);
782 }
783
784 /**
785 * Sub-poller polling loop.
786 */
787 private void subPollerLoop(Poller readPoller) {
788 try {
789 readPoller.subPollerLoop(masterPoller);
790 } finally {
791 // wakeup all threads waiting on file descriptors registered with the
792 // read poller, these I/O operation will migrate to another carrier.
793 readPoller.wakeupAll();
794
795 // remove from serviceability view
796 readPollers.remove(readPoller);
797 }
798 }
799
800 /**
801 * Invoked by the carrier thread before it terminates.
802 */
803 private void carrierTerminated(Poller readPoller) {
804 readPoller.setShutdown();
805 try {
806 readPoller.wakeupPoller();
807 } catch (Throwable e) {
808 e.printStackTrace();
809 }
810 }
811
812 @Override
813 Poller masterPoller() {
814 return masterPoller;
815 }
816
817 @Override
818 List<Poller> readPollers() {
819 return readPollers.stream().toList();
820 }
821
822 @Override
823 List<Poller> writePollers() {
824 return List.of(writePollers);
825 }
826 }
827
828 /**
829 * Reads the given property name to get the poller count. If the property is
830 * set then the value must be a power of 2. Returns 1 if the property is not
831 * set.
832 * @throws IllegalArgumentException if the property is set to a value that
833 * is not a power of 2.
834 */
835 private static int pollerCount(String propName, int defaultCount) {
836 String s = System.getProperty(propName);
837 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
838
839 // check power of 2
840 if (count != Integer.highestOneBit(count)) {
841 String msg = propName + " is set to a value that is not a power of 2";
842 throw new IllegalArgumentException(msg);
843 }
844 return count;
845 }
846
847
848 /**
849 * Returns true if read ops are supported in addition to POLLIN polling.
850 */
851 public static boolean supportReadOps() {
852 return POLLER_GROUP.supportReadOps();
853 }
854
855 /**
856 * Returns true if write ops are supported in addition to POLLOUT polling.
857 */
858 public static boolean supportWriteOps() {
859 return POLLER_GROUP.supportWriteOps();
860 }
861
862 /**
863 * Parks the current thread until bytes are read into a byte array.
864 * @param isOpen supplies a boolean to indicate if the enclosing object is open
865 * @return the number of bytes read (>0), EOF (-1), or UNAVAILABLE (-2) if unparked
866 * or the timeout expires while waiting for bytes to be read
867 * @throws UnsupportedOperationException if not supported
868 */
869 public static int read(int fdVal, byte[] b, int off, int len, long nanos,
870 BooleanSupplier isOpen) throws IOException {
871 return POLLER_GROUP.read(fdVal, b, off, len, nanos, isOpen);
872 }
873
874 /**
875 * Parks the current thread until bytes are written from a byte array.
876 * @param isOpen supplies a boolean to indicate if the enclosing object is open
877 * @return the number of bytes read (>0), EOF (-1), or UNAVAILABLE (-2) if unparked
878 * or the timeout expires while waiting for bytes to be read
879 * @throws UnsupportedOperationException if not supported
880 */
881 public static int write(int fdVal, byte[] b, int off, int len,
882 BooleanSupplier isOpen) throws IOException {
883 return POLLER_GROUP.write(fdVal, b, off, len, isOpen);
884 }
885
886 /**
887 * Parks the current thread until bytes are read a byte array. This method is
888 * overridden by poller implementations that support this operation.
889 */
890 int implRead(int fdVal, byte[] b, int off, int len, long nanos,
891 BooleanSupplier isOpen) throws IOException {
892 throw new UnsupportedOperationException();
893 }
894
895 /**
896 * Parks the current thread until bytes are written from a byte array. This
897 * method is overridden by poller implementations that support this operation.
898 */
899 int implWrite(int fdVal, byte[] b, int off, int len,
900 BooleanSupplier isOpen) throws IOException {
901 throw new UnsupportedOperationException();
902 }
903
904 /**
905 * Return the master poller or null if there is no master poller.
906 */
907 public static Poller masterPoller() {
908 return POLLER_GROUP.masterPoller();
909 }
910
911 /**
912 * Return the list of read pollers.
913 */
914 public static List<Poller> readPollers() {
915 return POLLER_GROUP.readPollers();
916 }
917
918 /**
919 * Return the list of write pollers.
920 */
921 public static List<Poller> writePollers() {
922 return POLLER_GROUP.writePollers();
923 }
924 }
|