8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.util.Arrays;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Objects;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ThreadFactory;
36 import java.util.concurrent.locks.LockSupport;
37 import java.util.function.BooleanSupplier;
38 import jdk.internal.misc.InnocuousThread;
39 import jdk.internal.vm.annotation.Stable;
40
41 /**
42 * Polls file descriptors. Virtual threads invoke the poll method to park
43 * until a given file descriptor is ready for I/O.
44 */
45 public abstract class Poller {
46 private static final Pollers POLLERS;
47 static {
48 try {
49 var pollers = new Pollers();
50 pollers.start();
51 POLLERS = pollers;
52 } catch (IOException ioe) {
53 throw new ExceptionInInitializerError(ioe);
54 }
55 }
56
57 // the poller or sub-poller thread
58 private @Stable Thread owner;
59
60 // maps file descriptors to parked Thread
61 private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
62
63 /**
64 * Poller mode.
65 */
66 enum Mode {
67 /**
68 * ReadPoller and WritePoller are dedicated platform threads that block waiting
69 * for events and unpark virtual threads when file descriptors are ready for I/O.
70 */
71 SYSTEM_THREADS,
72
73 /**
74 * ReadPoller and WritePoller threads are virtual threads that poll for events,
75 * yielding between polls and unparking virtual threads when file descriptors are
76 * ready for I/O. If there are no events then the poller threads park until there
77 * are I/O events to poll. This mode helps to integrate polling with virtual
78 * thread scheduling. The approach is similar to the default scheme in "User-level
79 * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
80 * (https://dl.acm.org/doi/10.1145/3379483).
81 */
82 VTHREAD_POLLERS
83 }
84
85 /**
86 * Initialize a Poller.
87 */
88 protected Poller() {
89 }
90
91 /**
92 * Returns the poller's file descriptor, used when the read and write poller threads
93 * are virtual threads.
94 *
95 * @throws UnsupportedOperationException if not supported
96 */
97 int fdVal() {
98 throw new UnsupportedOperationException();
99 }
100
101 /**
102 * Register the file descriptor. The registration is "one shot", meaning it should
103 * be polled at most once.
104 */
105 abstract void implRegister(int fdVal) throws IOException;
106
107 /**
108 * Deregister the file descriptor.
109 * @param polled true if the file descriptor has already been polled
110 */
121 abstract int poll(int timeout) throws IOException;
122
123 /**
124 * Callback by the poll method when a file descriptor is polled.
125 */
126 final void polled(int fdVal) {
127 wakeup(fdVal);
128 }
129
130 /**
131 * Parks the current thread until a file descriptor is ready for the given op.
132 * @param fdVal the file descriptor
133 * @param event POLLIN or POLLOUT
134 * @param nanos the waiting time or 0 to wait indefinitely
135 * @param supplier supplies a boolean to indicate if the enclosing object is open
136 */
137 static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
138 throws IOException
139 {
140 assert nanos >= 0L;
141 if (event == Net.POLLIN) {
142 POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
143 } else if (event == Net.POLLOUT) {
144 POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
145 } else {
146 assert false;
147 }
148 }
149
150 /**
151 * Parks the current thread until a Selector's file descriptor is ready.
152 * @param fdVal the Selector's file descriptor
153 * @param nanos the waiting time or 0 to wait indefinitely
154 */
155 static void pollSelector(int fdVal, long nanos) throws IOException {
156 assert nanos >= 0L;
157 Poller poller = POLLERS.masterPoller();
158 if (poller == null) {
159 poller = POLLERS.readPoller(fdVal);
160 }
161 poller.poll(fdVal, nanos, () -> true);
162 }
163
164 /**
165 * If there is a thread polling the given file descriptor for the given event then
166 * the thread is unparked.
167 */
168 static void stopPoll(int fdVal, int event) {
169 if (event == Net.POLLIN) {
170 POLLERS.readPoller(fdVal).wakeup(fdVal);
171 } else if (event == Net.POLLOUT) {
172 POLLERS.writePoller(fdVal).wakeup(fdVal);
173 } else {
174 throw new IllegalArgumentException();
175 }
176 }
177
178 /**
179 * If there are any threads polling the given file descriptor then they are unparked.
180 */
181 static void stopPoll(int fdVal) {
182 stopPoll(fdVal, Net.POLLIN);
183 stopPoll(fdVal, Net.POLLOUT);
184 }
185
186 /**
187 * Parks the current thread until a file descriptor is ready.
188 */
189 private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
190 register(fdVal);
191 try {
192 boolean isOpen = supplier.getAsBoolean();
193 if (isOpen) {
194 if (nanos > 0) {
195 LockSupport.parkNanos(nanos);
196 } else {
197 LockSupport.park();
198 }
199 }
200 } finally {
201 deregister(fdVal);
202 }
203 }
244 private void pollerLoop() {
245 owner = Thread.currentThread();
246 try {
247 for (;;) {
248 poll(-1);
249 }
250 } catch (Exception e) {
251 e.printStackTrace();
252 }
253 }
254
255 /**
256 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
257 * descriptor that is polled.
258 *
259 * The sub-poller registers its file descriptor with the master poller to park until
260 * there are events to poll. When unparked, it does non-blocking polls and parks
261 * again when there are no more events. The sub-poller yields after each poll to help
262 * with fairness and to avoid re-registering with the master poller where possible.
263 */
264 private void subPollerLoop(Poller masterPoller) {
265 assert Thread.currentThread().isVirtual();
266 owner = Thread.currentThread();
267 try {
268 int polled = 0;
269 for (;;) {
270 if (polled == 0) {
271 masterPoller.poll(fdVal(), 0, () -> true); // park
272 } else {
273 Thread.yield();
274 }
275 polled = poll(0);
276 }
277 } catch (Exception e) {
278 e.printStackTrace();
279 }
280 }
281
282 /**
283 * Returns the number I/O operations currently registered with this poller.
284 */
285 public int registered() {
286 return map.size();
287 }
288
289 @Override
290 public String toString() {
291 return String.format("%s [registered = %d, owner = %s]",
292 Objects.toIdentityString(this), registered(), owner);
293 }
294
295 /**
296 * The Pollers used for read and write events.
297 */
298 private static class Pollers {
299 private final PollerProvider provider;
300 private final Poller.Mode pollerMode;
301 private final Poller masterPoller;
302 private final Poller[] readPollers;
303 private final Poller[] writePollers;
304
305 // used by start method to executor is kept alive
306 private Executor executor;
307
308 /**
309 * Creates the Poller instances based on configuration.
310 */
311 Pollers() throws IOException {
312 PollerProvider provider = PollerProvider.provider();
313 Poller.Mode mode;
314 String s = System.getProperty("jdk.pollerMode");
315 if (s != null) {
316 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
317 mode = Mode.SYSTEM_THREADS;
318 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
319 mode = Mode.VTHREAD_POLLERS;
320 } else {
321 throw new RuntimeException("Can't parse '" + s + "' as polling mode");
322 }
323 } else {
324 mode = provider.defaultPollerMode();
325 }
326
327 // vthread poller mode needs a master poller
328 Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
329 ? provider.readPoller(false)
330 : null;
331
332 // read pollers (or sub-pollers)
333 int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
334 Poller[] readPollers = new Poller[readPollerCount];
335 for (int i = 0; i < readPollerCount; i++) {
336 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
337 }
338
339 // write pollers (or sub-pollers)
340 int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));
341 Poller[] writePollers = new Poller[writePollerCount];
342 for (int i = 0; i < writePollerCount; i++) {
343 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);
344 }
345
346 this.provider = provider;
347 this.pollerMode = mode;
348 this.masterPoller = masterPoller;
349 this.readPollers = readPollers;
350 this.writePollers = writePollers;
351 }
352
353 /**
354 * Starts the Poller threads.
355 */
356 void start() {
357 if (pollerMode == Mode.VTHREAD_POLLERS) {
358 startPlatformThread("MasterPoller", masterPoller::pollerLoop);
359 ThreadFactory factory = Thread.ofVirtual()
360 .inheritInheritableThreadLocals(false)
361 .name("SubPoller-", 0)
362 .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
363 .factory();
364 executor = Executors.newThreadPerTaskExecutor(factory);
365 Arrays.stream(readPollers).forEach(p -> {
366 executor.execute(() -> p.subPollerLoop(masterPoller));
367 });
368 Arrays.stream(writePollers).forEach(p -> {
369 executor.execute(() -> p.subPollerLoop(masterPoller));
370 });
371 } else {
372 Arrays.stream(readPollers).forEach(p -> {
373 startPlatformThread("Read-Poller", p::pollerLoop);
374 });
375 Arrays.stream(writePollers).forEach(p -> {
376 startPlatformThread("Write-Poller", p::pollerLoop);
377 });
378 }
379 }
380
381 /**
382 * Returns the master poller, or null if there is no master poller.
383 */
384 Poller masterPoller() {
385 return masterPoller;
386 }
387
388 /**
389 * Returns the read poller for the given file descriptor.
390 */
391 Poller readPoller(int fdVal) {
392 int index = provider.fdValToIndex(fdVal, readPollers.length);
393 return readPollers[index];
394 }
395
396 /**
397 * Returns the write poller for the given file descriptor.
398 */
399 Poller writePoller(int fdVal) {
400 int index = provider.fdValToIndex(fdVal, writePollers.length);
401 return writePollers[index];
402 }
403
404 /**
405 * Return the list of read pollers.
406 */
407 List<Poller> readPollers() {
408 return List.of(readPollers);
409 }
410
411 /**
412 * Return the list of write pollers.
413 */
414 List<Poller> writePollers() {
415 return List.of(writePollers);
416 }
417
418
419 /**
420 * Reads the given property name to get the poller count. If the property is
421 * set then the value must be a power of 2. Returns 1 if the property is not
422 * set.
423 * @throws IllegalArgumentException if the property is set to a value that
424 * is not a power of 2.
425 */
426 private static int pollerCount(String propName, int defaultCount) {
427 String s = System.getProperty(propName);
428 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
429
430 // check power of 2
431 if (count != Integer.highestOneBit(count)) {
432 String msg = propName + " is set to a value that is not a power of 2";
433 throw new IllegalArgumentException(msg);
434 }
435 return count;
436 }
437
438 /**
439 * Starts a platform thread to run the given task.
440 */
441 private void startPlatformThread(String name, Runnable task) {
442 try {
443 Thread thread = InnocuousThread.newSystemThread(name, task);
444 thread.setDaemon(true);
445 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
446 thread.start();
447 } catch (Exception e) {
448 throw new InternalError(e);
449 }
450 }
451 }
452
453 /**
454 * Return the master poller or null if there is no master poller.
455 */
456 public static Poller masterPoller() {
457 return POLLERS.masterPoller();
458 }
459
460 /**
461 * Return the list of read pollers.
462 */
463 public static List<Poller> readPollers() {
464 return POLLERS.readPollers();
465 }
466
467 /**
468 * Return the list of write pollers.
469 */
470 public static List<Poller> writePollers() {
471 return POLLERS.writePollers();
472 }
473 }
|
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.io.UncheckedIOException;
29 import java.util.Arrays;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Objects;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.locks.LockSupport;
38 import java.util.function.BooleanSupplier;
39 import java.util.function.Supplier;
40 import jdk.internal.access.JavaLangAccess;
41 import jdk.internal.access.SharedSecrets;
42 import jdk.internal.misc.InnocuousThread;
43 import jdk.internal.vm.annotation.Stable;
44
45 /**
46 * Polls file descriptors. Virtual threads invoke the poll method to park
47 * until a given file descriptor is ready for I/O.
48 */
49 public abstract class Poller {
50 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
51
52 private static final PollerProvider PROVIDER = PollerProvider.provider();
53
54 private static final Mode POLLER_MODE = pollerMode();
55
56 private static final Thread.VirtualThreadScheduler DEFAULT_SCHEDULER = JLA.defaultVirtualThreadScheduler();
57
58 // poller group for default scheduler
59 private static final Supplier<PollerGroup> DEFAULT_POLLER_GROUP = StableValue.supplier(PollerGroup::create);
60
61 // maps scheduler to PollerGroup, custom schedulers can't be GC'ed at this time
62 private static final Map<Thread.VirtualThreadScheduler, PollerGroup> POLLER_GROUPS = new ConcurrentHashMap<>();
63
64 // the poller or sub-poller thread
65 private @Stable Thread owner;
66
67 // maps file descriptors to parked Thread
68 private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
69
70 /**
71 * Poller mode.
72 */
73 enum Mode {
74 /**
75 * ReadPoller and WritePoller are dedicated platform threads that block waiting
76 * for events and unpark virtual threads when file descriptors are ready for I/O.
77 */
78 SYSTEM_THREADS,
79
80 /**
81 * ReadPoller and WritePoller threads are virtual threads that poll for events,
82 * yielding between polls and unparking virtual threads when file descriptors are
83 * ready for I/O. If there are no events then the poller threads park until there
84 * are I/O events to poll. This mode helps to integrate polling with virtual
85 * thread scheduling. The approach is similar to the default scheme in "User-level
86 * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
87 * (https://dl.acm.org/doi/10.1145/3379483).
88 */
89 VTHREAD_POLLERS
90 }
91
92 /**
93 * Initialize a Poller.
94 */
95 protected Poller() {
96 }
97
98 /**
99 * Closes the poller and release resources. This method can only be used to cleanup
100 * when creating a poller group fails.
101 */
102 abstract void close();
103
104 /**
105 * Returns the poller's file descriptor, used when the read and write poller threads
106 * are virtual threads.
107 *
108 * @throws UnsupportedOperationException if not supported
109 */
110 int fdVal() {
111 throw new UnsupportedOperationException();
112 }
113
114 /**
115 * Register the file descriptor. The registration is "one shot", meaning it should
116 * be polled at most once.
117 */
118 abstract void implRegister(int fdVal) throws IOException;
119
120 /**
121 * Deregister the file descriptor.
122 * @param polled true if the file descriptor has already been polled
123 */
134 abstract int poll(int timeout) throws IOException;
135
136 /**
137 * Callback by the poll method when a file descriptor is polled.
138 */
139 final void polled(int fdVal) {
140 wakeup(fdVal);
141 }
142
143 /**
144 * Parks the current thread until a file descriptor is ready for the given op.
145 * @param fdVal the file descriptor
146 * @param event POLLIN or POLLOUT
147 * @param nanos the waiting time or 0 to wait indefinitely
148 * @param supplier supplies a boolean to indicate if the enclosing object is open
149 */
150 static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
151 throws IOException
152 {
153 assert nanos >= 0L;
154 PollerGroup pollerGroup = pollerGroup(Thread.currentThread());
155 if (event == Net.POLLIN) {
156 pollerGroup.readPoller(fdVal).poll(fdVal, nanos, supplier);
157 } else if (event == Net.POLLOUT) {
158 pollerGroup.writePoller(fdVal).poll(fdVal, nanos, supplier);
159 } else {
160 assert false;
161 }
162 }
163
164 /**
165 * Parks the current thread until a Selector's file descriptor is ready.
166 * @param fdVal the Selector's file descriptor
167 * @param nanos the waiting time or 0 to wait indefinitely
168 */
169 static void pollSelector(int fdVal, long nanos) throws IOException {
170 assert nanos >= 0L;
171 PollerGroup pollerGroup = pollerGroup(Thread.currentThread());
172 Poller poller = pollerGroup.masterPoller();
173 if (poller == null) {
174 poller = pollerGroup.readPoller(fdVal);
175 }
176 poller.poll(fdVal, nanos, () -> true);
177 }
178
179 /**
180 * Unpark the given thread so that it stops polling.
181 */
182 static void stopPoll(Thread thread) {
183 LockSupport.unpark(thread);
184 }
185
186 /**
187 * Parks the current thread until a file descriptor is ready.
188 */
189 private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
190 register(fdVal);
191 try {
192 boolean isOpen = supplier.getAsBoolean();
193 if (isOpen) {
194 if (nanos > 0) {
195 LockSupport.parkNanos(nanos);
196 } else {
197 LockSupport.park();
198 }
199 }
200 } finally {
201 deregister(fdVal);
202 }
203 }
244 private void pollerLoop() {
245 owner = Thread.currentThread();
246 try {
247 for (;;) {
248 poll(-1);
249 }
250 } catch (Exception e) {
251 e.printStackTrace();
252 }
253 }
254
255 /**
256 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
257 * descriptor that is polled.
258 *
259 * The sub-poller registers its file descriptor with the master poller to park until
260 * there are events to poll. When unparked, it does non-blocking polls and parks
261 * again when there are no more events. The sub-poller yields after each poll to help
262 * with fairness and to avoid re-registering with the master poller where possible.
263 */
264 private void subPollerLoop(PollerGroup pollerGroup, Poller masterPoller) {
265 assert Thread.currentThread().isVirtual();
266 owner = Thread.currentThread();
267 try {
268 int polled = 0;
269 while (!pollerGroup.isShutdown()) {
270 if (polled == 0) {
271 masterPoller.poll(fdVal(), 0, () -> true); // park
272 } else {
273 Thread.yield();
274 }
275 polled = poll(0);
276 }
277 } catch (Exception e) {
278 e.printStackTrace();
279 }
280 }
281
282 @Override
283 public String toString() {
284 return String.format("%s [registered = %d, owner = %s]",
285 Objects.toIdentityString(this), map.size(), owner);
286 }
287
288 /**
289 * The read/write pollers for a virtual thread scheduler.
290 */
291 private static class PollerGroup {
292 private final Thread.VirtualThreadScheduler scheduler;
293 private final Poller[] readPollers;
294 private final Poller[] writePollers;
295 private final Poller masterPoller;
296 private final Executor executor;
297 private volatile boolean shutdown;
298
299 PollerGroup(Thread.VirtualThreadScheduler scheduler,
300 Poller masterPoller,
301 int readPollerCount,
302 int writePollerCount) throws IOException {
303 boolean subPoller = (POLLER_MODE == Mode.VTHREAD_POLLERS);
304 Executor executor = null;
305 if (subPoller) {
306 String namePrefix;
307 if (scheduler == DEFAULT_SCHEDULER) {
308 namePrefix = "SubPoller-";
309 } else {
310 namePrefix = Objects.toIdentityString(scheduler) + "-SubPoller-";
311 }
312 @SuppressWarnings("restricted")
313 ThreadFactory factory = Thread.ofVirtual()
314 .scheduler(scheduler)
315 .inheritInheritableThreadLocals(false)
316 .name(namePrefix, 0)
317 .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
318 .factory();
319 executor = Executors.newThreadPerTaskExecutor(factory);
320 }
321
322 // read and write pollers (or sub-pollers)
323 Poller[] readPollers = new Poller[readPollerCount];
324 Poller[] writePollers = new Poller[writePollerCount];
325 try {
326 for (int i = 0; i < readPollerCount; i++) {
327 readPollers[i] = PROVIDER.readPoller(subPoller);
328 }
329 for (int i = 0; i < writePollerCount; i++) {
330 writePollers[i] = PROVIDER.writePoller(subPoller);
331 }
332 } catch (Exception e) {
333 closeAll(readPollers);
334 closeAll(writePollers);
335 throw e;
336 }
337
338 this.scheduler = scheduler;
339 this.masterPoller = masterPoller;
340 this.readPollers = readPollers;
341 this.writePollers = writePollers;
342 this.executor = executor;
343 }
344
345 /**
346 * Create and starts the poller group for the default scheduler.
347 */
348 static PollerGroup create() {
349 try {
350 Poller masterPoller = (POLLER_MODE == Mode.VTHREAD_POLLERS)
351 ? PROVIDER.readPoller(false)
352 : null;
353 PollerGroup pollerGroup;
354 try {
355 int rc = pollerCount("jdk.readPollers", PROVIDER.defaultReadPollers(POLLER_MODE));
356 int wc = pollerCount("jdk.writePollers", PROVIDER.defaultWritePollers(POLLER_MODE));
357 pollerGroup = new PollerGroup(DEFAULT_SCHEDULER, masterPoller, rc, wc);
358 } catch (Exception e) {
359 masterPoller.close();
360 throw e;
361 }
362 pollerGroup.start();
363 return pollerGroup;
364 } catch (IOException ioe) {
365 throw new UncheckedIOException(ioe);
366 }
367 }
368
369 /**
370 * Create and starts the poller group for a custom scheduler.
371 */
372 static PollerGroup create(Thread.VirtualThreadScheduler scheduler) {
373 try {
374 Poller masterPoller = DEFAULT_POLLER_GROUP.get().masterPoller();
375 var pollerGroup = new PollerGroup(scheduler, masterPoller, 1, 1);
376 pollerGroup.start();
377 return pollerGroup;
378 } catch (IOException ioe) {
379 throw new UncheckedIOException(ioe);
380 }
381 }
382
383 /**
384 * Start poller threads.
385 */
386 private void start() {
387 if (POLLER_MODE == Mode.VTHREAD_POLLERS) {
388 if (scheduler == DEFAULT_SCHEDULER) {
389 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
390 }
391 Arrays.stream(readPollers).forEach(p -> {
392 executor.execute(() -> p.subPollerLoop(this, masterPoller));
393 });
394 Arrays.stream(writePollers).forEach(p -> {
395 executor.execute(() -> p.subPollerLoop(this, masterPoller));
396 });
397 } else {
398 // Mode.SYSTEM_THREADS
399 Arrays.stream(readPollers).forEach(p -> {
400 startPlatformThread("Read-Poller", p::pollerLoop);
401 });
402 Arrays.stream(writePollers).forEach(p -> {
403 startPlatformThread("Write-Poller", p::pollerLoop);
404 });
405 }
406 }
407
408 /**
409 * Close the given pollers.
410 */
411 private void closeAll(Poller... pollers) {
412 for (Poller poller : pollers) {
413 if (poller != null) {
414 poller.close();
415 }
416 }
417 }
418
419 /**
420 * Invoked during shutdown to unpark all subpoller threads and wait for
421 * them to terminate.
422 */
423 private void shutdownPollers(Poller... pollers) {
424 boolean interrupted = false;
425 for (Poller poller : pollers) {
426 if (poller.owner instanceof Thread owner) {
427 LockSupport.unpark(owner);
428 while (owner.isAlive()) {
429 try {
430 owner.join();
431 } catch (InterruptedException e) {
432 interrupted = true;
433 }
434 }
435 }
436 }
437 if (interrupted) {
438 Thread.currentThread().interrupt();
439 }
440 }
441
442 void shutdown() {
443 if (scheduler == DEFAULT_SCHEDULER || POLLER_MODE == Mode.SYSTEM_THREADS) {
444 throw new UnsupportedOperationException();
445 }
446 shutdown = true;
447 shutdownPollers(readPollers);
448 shutdownPollers(writePollers);
449 }
450
451 /**
452 *
453 * @return
454 */
455 boolean isShutdown() {
456 return shutdown;
457 }
458
459 Poller masterPoller() {
460 return masterPoller;
461 }
462
463 List<Poller> readPollers() {
464 return List.of(readPollers);
465 }
466
467 List<Poller> writePollers() {
468 return List.of(writePollers);
469 }
470
471 /**
472 * Returns the read poller for the given file descriptor.
473 */
474 Poller readPoller(int fdVal) {
475 int index = PROVIDER.fdValToIndex(fdVal, readPollers.length);
476 return readPollers[index];
477 }
478
479 /**
480 * Returns the write poller for the given file descriptor.
481 */
482 Poller writePoller(int fdVal) {
483 int index = PROVIDER.fdValToIndex(fdVal, writePollers.length);
484 return writePollers[index];
485 }
486
487 /**
488 * Reads the given property name to get the poller count. If the property is
489 * set then the value must be a power of 2. Returns 1 if the property is not
490 * set.
491 * @throws IllegalArgumentException if the property is set to a value that
492 * is not a power of 2.
493 */
494 private static int pollerCount(String propName, int defaultCount) {
495 String s = System.getProperty(propName);
496 int count = (s != null) ? Integer.parseInt(s) : defaultCount;
497
498 // check power of 2
499 if (count != Integer.highestOneBit(count)) {
500 String msg = propName + " is set to a value that is not a power of 2";
501 throw new IllegalArgumentException(msg);
502 }
503 return count;
504 }
505
506 /**
507 * Starts a platform thread to run the given task.
508 */
509 private void startPlatformThread(String name, Runnable task) {
510 try {
511 Thread thread = InnocuousThread.newSystemThread(name, task);
512 thread.setDaemon(true);
513 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
514 thread.start();
515 } catch (Exception e) {
516 throw new InternalError(e);
517 }
518 }
519 }
520
521 /**
522 * Returns the poller mode.
523 */
524 private static Mode pollerMode() {
525 String s = System.getProperty("jdk.pollerMode");
526 if (s != null) {
527 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
528 return Mode.SYSTEM_THREADS;
529 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
530 return Mode.VTHREAD_POLLERS;
531 } else {
532 throw new RuntimeException("Can't parse '" + s + "' as polling mode");
533 }
534 } else {
535 return PROVIDER.defaultPollerMode();
536 }
537 }
538
539 /**
540 * Returns the PollerGroup that the given thread uses to poll file descriptors.
541 */
542 private static PollerGroup pollerGroup(Thread thread) {
543 if (POLLER_MODE == Mode.SYSTEM_THREADS) {
544 return DEFAULT_POLLER_GROUP.get();
545 }
546 Thread.VirtualThreadScheduler scheduler;
547 if (thread.isVirtual()) {
548 scheduler = JLA.virtualThreadScheduler(thread);
549 } else {
550 scheduler = DEFAULT_SCHEDULER;
551 }
552 return POLLER_GROUPS.computeIfAbsent(scheduler, _ -> PollerGroup.create(scheduler));
553 }
554
555 /**
556 * Invoked before the given scheduler is shutdown. In VTHREAD_POLLERS mode, this
557 * method will arrange for the sub poller threads to terminate. Does nothing in
558 * SYSTEM_THREADS mode.
559 */
560 public static void beforeShutdown(Executor executor) {
561 if (POLLER_MODE == Mode.VTHREAD_POLLERS) {
562 PollerGroup group = POLLER_GROUPS.remove(executor);
563 if (group != null) {
564 group.shutdown();
565 }
566 }
567 }
568
569 /**
570 * Return the master poller or null if there is no master poller.
571 */
572 public static Poller masterPoller() {
573 return DEFAULT_POLLER_GROUP.get().masterPoller();
574 }
575
576 /**
577 * Return the list of read pollers.
578 */
579 public static List<Poller> readPollers() {
580 return DEFAULT_POLLER_GROUP.get().readPollers();
581 }
582
583 /**
584 * Return the list of write pollers.
585 */
586 public static List<Poller> writePollers() {
587 return DEFAULT_POLLER_GROUP.get().writePollers();
588 }
589
590 }
|