< prev index next >

src/java.base/share/classes/sun/nio/ch/Poller.java

Print this page

  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;

 28 import java.util.Arrays;
 29 import java.util.List;
 30 import java.util.Map;
 31 import java.util.Objects;
 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.Executor;
 34 import java.util.concurrent.Executors;
 35 import java.util.concurrent.ThreadFactory;
 36 import java.util.concurrent.locks.LockSupport;
 37 import java.util.function.BooleanSupplier;



 38 import jdk.internal.misc.InnocuousThread;
 39 import jdk.internal.vm.annotation.Stable;
 40 
 41 /**
 42  * Polls file descriptors. Virtual threads invoke the poll method to park
 43  * until a given file descriptor is ready for I/O.
 44  */
 45 public abstract class Poller {
 46     private static final Pollers POLLERS;
 47     static {
 48         try {
 49             var pollers = new Pollers();
 50             pollers.start();
 51             POLLERS = pollers;
 52         } catch (IOException ioe) {
 53             throw new ExceptionInInitializerError(ioe);
 54         }
 55     }



 56 
 57     // the poller or sub-poller thread
 58     private @Stable Thread owner;
 59 
 60     // maps file descriptors to parked Thread
 61     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 62 
 63     /**
 64      * Poller mode.
 65      */
 66     enum Mode {
 67         /**
 68          * ReadPoller and WritePoller are dedicated platform threads that block waiting
 69          * for events and unpark virtual threads when file descriptors are ready for I/O.
 70          */
 71         SYSTEM_THREADS,
 72 
 73         /**
 74          * ReadPoller and WritePoller threads are virtual threads that poll for events,
 75          * yielding between polls and unparking virtual threads when file descriptors are
 76          * ready for I/O. If there are no events then the poller threads park until there
 77          * are I/O events to poll. This mode helps to integrate polling with virtual
 78          * thread scheduling. The approach is similar to the default scheme in "User-level
 79          * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
 80          * (https://dl.acm.org/doi/10.1145/3379483).
 81          */
 82         VTHREAD_POLLERS
 83     }
 84 
 85     /**
 86      * Initialize a Poller.
 87      */
 88     protected Poller() {
 89     }
 90 






 91     /**
 92      * Returns the poller's file descriptor, used when the read and write poller threads
 93      * are virtual threads.
 94      *
 95      * @throws UnsupportedOperationException if not supported
 96      */
 97     int fdVal() {
 98         throw new UnsupportedOperationException();
 99     }
100 
101     /**
102      * Register the file descriptor. The registration is "one shot", meaning it should
103      * be polled at most once.
104      */
105     abstract void implRegister(int fdVal) throws IOException;
106 
107     /**
108      * Deregister the file descriptor.
109      * @param polled true if the file descriptor has already been polled
110      */

121     abstract int poll(int timeout) throws IOException;
122 
123     /**
124      * Callback by the poll method when a file descriptor is polled.
125      */
126     final void polled(int fdVal) {
127         wakeup(fdVal);
128     }
129 
130     /**
131      * Parks the current thread until a file descriptor is ready for the given op.
132      * @param fdVal the file descriptor
133      * @param event POLLIN or POLLOUT
134      * @param nanos the waiting time or 0 to wait indefinitely
135      * @param supplier supplies a boolean to indicate if the enclosing object is open
136      */
137     static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
138         throws IOException
139     {
140         assert nanos >= 0L;

141         if (event == Net.POLLIN) {
142             POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
143         } else if (event == Net.POLLOUT) {
144             POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
145         } else {
146             assert false;
147         }
148     }
149 
150     /**
151      * Parks the current thread until a Selector's file descriptor is ready.
152      * @param fdVal the Selector's file descriptor
153      * @param nanos the waiting time or 0 to wait indefinitely
154      */
155     static void pollSelector(int fdVal, long nanos) throws IOException {
156         assert nanos >= 0L;
157         Poller poller = POLLERS.masterPoller();

158         if (poller == null) {
159             poller = POLLERS.readPoller(fdVal);
160         }
161         poller.poll(fdVal, nanos, () -> true);
162     }
163 
164     /**
165      * If there is a thread polling the given file descriptor for the given event then
166      * the thread is unparked.
167      */
168     static void stopPoll(int fdVal, int event) {
169         if (event == Net.POLLIN) {
170             POLLERS.readPoller(fdVal).wakeup(fdVal);
171         } else if (event == Net.POLLOUT) {
172             POLLERS.writePoller(fdVal).wakeup(fdVal);
173         } else {
174             throw new IllegalArgumentException();
175         }
176     }
177 
178     /**
179      * If there are any threads polling the given file descriptor then they are unparked.
180      */
181     static void stopPoll(int fdVal) {
182         stopPoll(fdVal, Net.POLLIN);
183         stopPoll(fdVal, Net.POLLOUT);
184     }
185 
186     /**
187      * Parks the current thread until a file descriptor is ready.
188      */
189     private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
190         register(fdVal);
191         try {
192             boolean isOpen = supplier.getAsBoolean();
193             if (isOpen) {
194                 if (nanos > 0) {
195                     LockSupport.parkNanos(nanos);
196                 } else {
197                     LockSupport.park();
198                 }
199             }
200         } finally {
201             deregister(fdVal);
202         }
203     }

244     private void pollerLoop() {
245         owner = Thread.currentThread();
246         try {
247             for (;;) {
248                 poll(-1);
249             }
250         } catch (Exception e) {
251             e.printStackTrace();
252         }
253     }
254 
255     /**
256      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
257      * descriptor that is polled.
258      *
259      * The sub-poller registers its file descriptor with the master poller to park until
260      * there are events to poll. When unparked, it does non-blocking polls and parks
261      * again when there are no more events. The sub-poller yields after each poll to help
262      * with fairness and to avoid re-registering with the master poller where possible.
263      */
264     private void subPollerLoop(Poller masterPoller) {
265         assert Thread.currentThread().isVirtual();
266         owner = Thread.currentThread();
267         try {
268             int polled = 0;
269             for (;;) {
270                 if (polled == 0) {
271                     masterPoller.poll(fdVal(), 0, () -> true);  // park
272                 } else {
273                     Thread.yield();
274                 }
275                 polled = poll(0);
276             }
277         } catch (Exception e) {
278             e.printStackTrace();
279         }
280     }
281 
282     /**
283      * Returns the number I/O operations currently registered with this poller.
284      */
285     public int registered() {
286         return map.size();
287     }
288 
289     @Override
290     public String toString() {
291         return String.format("%s [registered = %d, owner = %s]",
292                 Objects.toIdentityString(this), registered(), owner);
293     }
294 
295     /**
296      * The Pollers used for read and write events.
297      */
298     private static class Pollers {
299         private final PollerProvider provider;
300         private final Poller.Mode pollerMode;
301         private final Poller masterPoller;
302         private final Poller[] readPollers;
303         private final Poller[] writePollers;
304 
305         // used by start method to executor is kept alive
306         private Executor executor;
307 
308         /**
309          * Creates the Poller instances based on configuration.
310          */
311         Pollers() throws IOException {
312             PollerProvider provider = PollerProvider.provider();
313             Poller.Mode mode;
314             String s = System.getProperty("jdk.pollerMode");
315             if (s != null) {
316                 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
317                     mode = Mode.SYSTEM_THREADS;
318                 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
319                     mode = Mode.VTHREAD_POLLERS;
320                 } else {
321                     throw new RuntimeException("Can't parse '" + s + "' as polling mode");
322                 }
323             } else {
324                 mode = provider.defaultPollerMode();






325             }
326 
327             // vthread poller mode needs a master poller
328             Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
329                     ? provider.readPoller(false)
330                     : null;
331 
332             // read pollers (or sub-pollers)
333             int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
334             Poller[] readPollers = new Poller[readPollerCount];
335             for (int i = 0; i < readPollerCount; i++) {
336                 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
337             }
338 
339             // write pollers (or sub-pollers)
340             int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));
341             Poller[] writePollers = new Poller[writePollerCount];
342             for (int i = 0; i < writePollerCount; i++) {
343                 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);









344             }
345 
346             this.provider = provider;
347             this.pollerMode = mode;
348             this.masterPoller = masterPoller;
349             this.readPollers = readPollers;
350             this.writePollers = writePollers;

351         }
352 
353         /**
354          * Starts the Poller threads.
355          */
356         void start() {
357             if (pollerMode == Mode.VTHREAD_POLLERS) {
358                 startPlatformThread("MasterPoller", masterPoller::pollerLoop);
359                 ThreadFactory factory = Thread.ofVirtual()
360                         .inheritInheritableThreadLocals(false)
361                         .name("SubPoller-", 0)
362                         .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
363                         .factory();
364                 executor = Executors.newThreadPerTaskExecutor(factory);


































365                 Arrays.stream(readPollers).forEach(p -> {
366                     executor.execute(() -> p.subPollerLoop(masterPoller));
367                 });
368                 Arrays.stream(writePollers).forEach(p -> {
369                     executor.execute(() -> p.subPollerLoop(masterPoller));
370                 });
371             } else {

372                 Arrays.stream(readPollers).forEach(p -> {
373                     startPlatformThread("Read-Poller", p::pollerLoop);
374                 });
375                 Arrays.stream(writePollers).forEach(p -> {
376                     startPlatformThread("Write-Poller", p::pollerLoop);
377                 });
378             }
379         }
380 
381         /**
382          * Returns the master poller, or null if there is no master poller.
383          */
384         Poller masterPoller() {
385             return masterPoller;




386         }
387 
388         /**
389          * Returns the read poller for the given file descriptor.

390          */
391         Poller readPoller(int fdVal) {
392             int index = provider.fdValToIndex(fdVal, readPollers.length);
393             return readPollers[index];














394         }
395 
396         /**
397          * Returns the write poller for the given file descriptor.
398          */
399         Poller writePoller(int fdVal) {
400             int index = provider.fdValToIndex(fdVal, writePollers.length);
401             return writePollers[index];

402         }
403 
404         /**
405          * Return the list of read pollers.

406          */








407         List<Poller> readPollers() {
408             return List.of(readPollers);
409         }
410 
411         /**
412          * Return the list of write pollers.
413          */
414         List<Poller> writePollers() {
415             return List.of(writePollers);
416         }
417 















418 
419         /**
420          * Reads the given property name to get the poller count. If the property is
421          * set then the value must be a power of 2. Returns 1 if the property is not
422          * set.
423          * @throws IllegalArgumentException if the property is set to a value that
424          * is not a power of 2.
425          */
426         private static int pollerCount(String propName, int defaultCount) {
427             String s = System.getProperty(propName);
428             int count = (s != null) ? Integer.parseInt(s) : defaultCount;
429 
430             // check power of 2
431             if (count != Integer.highestOneBit(count)) {
432                 String msg = propName + " is set to a value that is not a power of 2";
433                 throw new IllegalArgumentException(msg);
434             }
435             return count;
436         }
437 
438         /**
439          * Starts a platform thread to run the given task.
440          */
441         private void startPlatformThread(String name, Runnable task) {
442             try {
443                 Thread thread = InnocuousThread.newSystemThread(name, task);
444                 thread.setDaemon(true);
445                 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
446                 thread.start();
447             } catch (Exception e) {
448                 throw new InternalError(e);
449             }
450         }
451     }
452 
















































453     /**
454      * Return the master poller or null if there is no master poller.
455      */
456     public static Poller masterPoller() {
457         return POLLERS.masterPoller();
458     }
459 
460     /**
461      * Return the list of read pollers.
462      */
463     public static List<Poller> readPollers() {
464         return POLLERS.readPollers();
465     }
466 
467     /**
468      * Return the list of write pollers.
469      */
470     public static List<Poller> writePollers() {
471         return POLLERS.writePollers();
472     }

473 }

  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;
 28 import java.io.UncheckedIOException;
 29 import java.util.Arrays;
 30 import java.util.List;
 31 import java.util.Map;
 32 import java.util.Objects;
 33 import java.util.concurrent.ConcurrentHashMap;
 34 import java.util.concurrent.Executor;
 35 import java.util.concurrent.Executors;
 36 import java.util.concurrent.ThreadFactory;
 37 import java.util.concurrent.locks.LockSupport;
 38 import java.util.function.BooleanSupplier;
 39 import java.util.function.Supplier;
 40 import jdk.internal.access.JavaLangAccess;
 41 import jdk.internal.access.SharedSecrets;
 42 import jdk.internal.misc.InnocuousThread;
 43 import jdk.internal.vm.annotation.Stable;
 44 
 45 /**
 46  * Polls file descriptors. Virtual threads invoke the poll method to park
 47  * until a given file descriptor is ready for I/O.
 48  */
 49 public abstract class Poller {
 50     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 51 
 52     private static final PollerProvider PROVIDER = PollerProvider.provider();
 53 
 54     private static final Mode POLLER_MODE = pollerMode();
 55 
 56     private static final Thread.VirtualThreadScheduler DEFAULT_SCHEDULER = JLA.defaultVirtualThreadScheduler();
 57 
 58     // poller group for default scheduler
 59     private static final Supplier<PollerGroup> DEFAULT_POLLER_GROUP = StableValue.supplier(PollerGroup::create);
 60 
 61     // maps scheduler to PollerGroup, custom schedulers can't be GC'ed at this time
 62     private static final Map<Thread.VirtualThreadScheduler, PollerGroup> POLLER_GROUPS = new ConcurrentHashMap<>();
 63 
 64     // the poller or sub-poller thread
 65     private @Stable Thread owner;
 66 
 67     // maps file descriptors to parked Thread
 68     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 69 
 70     /**
 71      * Poller mode.
 72      */
 73     enum Mode {
 74         /**
 75          * ReadPoller and WritePoller are dedicated platform threads that block waiting
 76          * for events and unpark virtual threads when file descriptors are ready for I/O.
 77          */
 78         SYSTEM_THREADS,
 79 
 80         /**
 81          * ReadPoller and WritePoller threads are virtual threads that poll for events,
 82          * yielding between polls and unparking virtual threads when file descriptors are
 83          * ready for I/O. If there are no events then the poller threads park until there
 84          * are I/O events to poll. This mode helps to integrate polling with virtual
 85          * thread scheduling. The approach is similar to the default scheme in "User-level
 86          * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
 87          * (https://dl.acm.org/doi/10.1145/3379483).
 88          */
 89         VTHREAD_POLLERS
 90     }
 91 
 92     /**
 93      * Initialize a Poller.
 94      */
 95     protected Poller() {
 96     }
 97 
 98     /**
 99      * Closes the poller and release resources. This method can only be used to cleanup
100      * when creating a poller group fails.
101      */
102     abstract void close();
103 
104     /**
105      * Returns the poller's file descriptor, used when the read and write poller threads
106      * are virtual threads.
107      *
108      * @throws UnsupportedOperationException if not supported
109      */
110     int fdVal() {
111         throw new UnsupportedOperationException();
112     }
113 
114     /**
115      * Register the file descriptor. The registration is "one shot", meaning it should
116      * be polled at most once.
117      */
118     abstract void implRegister(int fdVal) throws IOException;
119 
120     /**
121      * Deregister the file descriptor.
122      * @param polled true if the file descriptor has already been polled
123      */

134     abstract int poll(int timeout) throws IOException;
135 
136     /**
137      * Callback by the poll method when a file descriptor is polled.
138      */
139     final void polled(int fdVal) {
140         wakeup(fdVal);
141     }
142 
143     /**
144      * Parks the current thread until a file descriptor is ready for the given op.
145      * @param fdVal the file descriptor
146      * @param event POLLIN or POLLOUT
147      * @param nanos the waiting time or 0 to wait indefinitely
148      * @param supplier supplies a boolean to indicate if the enclosing object is open
149      */
150     static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
151         throws IOException
152     {
153         assert nanos >= 0L;
154         PollerGroup pollerGroup = pollerGroup(Thread.currentThread());
155         if (event == Net.POLLIN) {
156             pollerGroup.readPoller(fdVal).poll(fdVal, nanos, supplier);
157         } else if (event == Net.POLLOUT) {
158             pollerGroup.writePoller(fdVal).poll(fdVal, nanos, supplier);
159         } else {
160             assert false;
161         }
162     }
163 
164     /**
165      * Parks the current thread until a Selector's file descriptor is ready.
166      * @param fdVal the Selector's file descriptor
167      * @param nanos the waiting time or 0 to wait indefinitely
168      */
169     static void pollSelector(int fdVal, long nanos) throws IOException {
170         assert nanos >= 0L;
171         PollerGroup pollerGroup = pollerGroup(Thread.currentThread());
172         Poller poller = pollerGroup.masterPoller();
173         if (poller == null) {
174             poller = pollerGroup.readPoller(fdVal);
175         }
176         poller.poll(fdVal, nanos, () -> true);
177     }
178 
179     /**
180      * Unpark the given thread so that it stops polling.














181      */
182     static void stopPoll(Thread thread) {
183         LockSupport.unpark(thread);

184     }
185 
186     /**
187      * Parks the current thread until a file descriptor is ready.
188      */
189     private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
190         register(fdVal);
191         try {
192             boolean isOpen = supplier.getAsBoolean();
193             if (isOpen) {
194                 if (nanos > 0) {
195                     LockSupport.parkNanos(nanos);
196                 } else {
197                     LockSupport.park();
198                 }
199             }
200         } finally {
201             deregister(fdVal);
202         }
203     }

244     private void pollerLoop() {
245         owner = Thread.currentThread();
246         try {
247             for (;;) {
248                 poll(-1);
249             }
250         } catch (Exception e) {
251             e.printStackTrace();
252         }
253     }
254 
255     /**
256      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
257      * descriptor that is polled.
258      *
259      * The sub-poller registers its file descriptor with the master poller to park until
260      * there are events to poll. When unparked, it does non-blocking polls and parks
261      * again when there are no more events. The sub-poller yields after each poll to help
262      * with fairness and to avoid re-registering with the master poller where possible.
263      */
264     private void subPollerLoop(PollerGroup pollerGroup, Poller masterPoller) {
265         assert Thread.currentThread().isVirtual();
266         owner = Thread.currentThread();
267         try {
268             int polled = 0;
269             while (!pollerGroup.isShutdown()) {
270                 if (polled == 0) {
271                     masterPoller.poll(fdVal(), 0, () -> true);  // park
272                 } else {
273                     Thread.yield();
274                 }
275                 polled = poll(0);
276             }
277         } catch (Exception e) {
278             e.printStackTrace();
279         }
280     }
281 







282     @Override
283     public String toString() {
284         return String.format("%s [registered = %d, owner = %s]",
285             Objects.toIdentityString(this), map.size(), owner);
286     }
287 
288     /**
289      * The read/write pollers for a virtual thread scheduler.
290      */
291     private static class PollerGroup {
292         private final Thread.VirtualThreadScheduler scheduler;


293         private final Poller[] readPollers;
294         private final Poller[] writePollers;
295         private final Poller masterPoller;
296         private final Executor executor;
297         private volatile boolean shutdown;
298 
299         PollerGroup(Thread.VirtualThreadScheduler scheduler,
300                     Poller masterPoller,
301                     int readPollerCount,
302                     int writePollerCount) throws IOException {
303             boolean subPoller = (POLLER_MODE == Mode.VTHREAD_POLLERS);
304             Executor executor = null;
305             if (subPoller) {
306                 String namePrefix;
307                 if (scheduler == DEFAULT_SCHEDULER) {
308                     namePrefix = "SubPoller-";


309                 } else {
310                     namePrefix = Objects.toIdentityString(scheduler) + "-SubPoller-";
311                 }
312                 @SuppressWarnings("restricted")
313                 ThreadFactory factory = Thread.ofVirtual()
314                         .scheduler(scheduler)
315                         .inheritInheritableThreadLocals(false)
316                         .name(namePrefix, 0)
317                         .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
318                         .factory();
319                 executor = Executors.newThreadPerTaskExecutor(factory);
320             }
321 
322             // read and write pollers (or sub-pollers)






323             Poller[] readPollers = new Poller[readPollerCount];






324             Poller[] writePollers = new Poller[writePollerCount];
325             try {
326                 for (int i = 0; i < readPollerCount; i++) {
327                     readPollers[i] = PROVIDER.readPoller(subPoller);
328                 }
329                 for (int i = 0; i < writePollerCount; i++) {
330                     writePollers[i] = PROVIDER.writePoller(subPoller);
331                 }
332             } catch (Exception e) {
333                 closeAll(readPollers);
334                 closeAll(writePollers);
335                 throw e;
336             }
337 
338             this.scheduler = scheduler;

339             this.masterPoller = masterPoller;
340             this.readPollers = readPollers;
341             this.writePollers = writePollers;
342             this.executor = executor;
343         }
344 
345         /**
346          * Create and starts the poller group for the default scheduler.
347          */
348         static PollerGroup create() {
349             try {
350                 Poller masterPoller = (POLLER_MODE == Mode.VTHREAD_POLLERS)
351                         ? PROVIDER.readPoller(false)
352                         : null;
353                 PollerGroup pollerGroup;
354                 try {
355                     int rc = pollerCount("jdk.readPollers", PROVIDER.defaultReadPollers(POLLER_MODE));
356                     int wc = pollerCount("jdk.writePollers", PROVIDER.defaultWritePollers(POLLER_MODE));
357                     pollerGroup = new PollerGroup(DEFAULT_SCHEDULER, masterPoller, rc, wc);
358                 } catch (Exception e) {
359                     masterPoller.close();
360                     throw e;
361                 }
362                 pollerGroup.start();
363                 return pollerGroup;
364             } catch (IOException ioe) {
365                 throw new UncheckedIOException(ioe);
366             }
367         }
368 
369         /**
370          * Create and starts the poller group for a custom scheduler.
371          */
372         static PollerGroup create(Thread.VirtualThreadScheduler scheduler) {
373             try {
374                 Poller masterPoller = DEFAULT_POLLER_GROUP.get().masterPoller();
375                 var pollerGroup = new PollerGroup(scheduler, masterPoller, 1, 1);
376                 pollerGroup.start();
377                 return pollerGroup;
378             } catch (IOException ioe) {
379                 throw new UncheckedIOException(ioe);
380             }
381         }
382 
383         /**
384          * Start poller threads.
385          */
386         private void start() {
387             if (POLLER_MODE == Mode.VTHREAD_POLLERS) {
388                 if (scheduler == DEFAULT_SCHEDULER) {
389                     startPlatformThread("Master-Poller", masterPoller::pollerLoop);
390                 }
391                 Arrays.stream(readPollers).forEach(p -> {
392                     executor.execute(() -> p.subPollerLoop(this, masterPoller));
393                 });
394                 Arrays.stream(writePollers).forEach(p -> {
395                     executor.execute(() -> p.subPollerLoop(this, masterPoller));
396                 });
397             } else {
398                 // Mode.SYSTEM_THREADS
399                 Arrays.stream(readPollers).forEach(p -> {
400                     startPlatformThread("Read-Poller", p::pollerLoop);
401                 });
402                 Arrays.stream(writePollers).forEach(p -> {
403                     startPlatformThread("Write-Poller", p::pollerLoop);
404                 });
405             }
406         }
407 
408         /**
409          * Close the given pollers.
410          */
411         private void closeAll(Poller... pollers) {
412             for (Poller poller : pollers) {
413                 if (poller != null) {
414                     poller.close();
415                 }
416             }
417         }
418 
419         /**
420          * Invoked during shutdown to unpark all subpoller threads and wait for
421          * them to terminate.
422          */
423         private void shutdownPollers(Poller... pollers) {
424             boolean interrupted = false;
425             for (Poller poller : pollers) {
426                 if (poller.owner instanceof Thread owner) {
427                     LockSupport.unpark(owner);
428                     while (owner.isAlive()) {
429                         try {
430                             owner.join();
431                         } catch (InterruptedException e) {
432                             interrupted = true;
433                         }
434                     }
435                 }
436             }
437             if (interrupted) {
438                 Thread.currentThread().interrupt();
439             }
440         }
441 
442         void shutdown() {
443             if (scheduler == DEFAULT_SCHEDULER || POLLER_MODE == Mode.SYSTEM_THREADS) {
444                 throw new UnsupportedOperationException();
445             }
446             shutdown = true;
447             shutdownPollers(readPollers);
448             shutdownPollers(writePollers);
449         }
450 
451         /**
452          *
453          * @return
454          */
455         boolean isShutdown() {
456             return shutdown;
457         }
458 
459         Poller masterPoller() {
460             return masterPoller;
461         }
462 
463         List<Poller> readPollers() {
464             return List.of(readPollers);
465         }
466 



467         List<Poller> writePollers() {
468             return List.of(writePollers);
469         }
470 
471         /**
472          * Returns the read poller for the given file descriptor.
473          */
474         Poller readPoller(int fdVal) {
475             int index = PROVIDER.fdValToIndex(fdVal, readPollers.length);
476             return readPollers[index];
477         }
478 
479         /**
480          * Returns the write poller for the given file descriptor.
481          */
482         Poller writePoller(int fdVal) {
483             int index = PROVIDER.fdValToIndex(fdVal, writePollers.length);
484             return writePollers[index];
485         }
486 
487         /**
488          * Reads the given property name to get the poller count. If the property is
489          * set then the value must be a power of 2. Returns 1 if the property is not
490          * set.
491          * @throws IllegalArgumentException if the property is set to a value that
492          * is not a power of 2.
493          */
494         private static int pollerCount(String propName, int defaultCount) {
495             String s = System.getProperty(propName);
496             int count = (s != null) ? Integer.parseInt(s) : defaultCount;
497 
498             // check power of 2
499             if (count != Integer.highestOneBit(count)) {
500                 String msg = propName + " is set to a value that is not a power of 2";
501                 throw new IllegalArgumentException(msg);
502             }
503             return count;
504         }
505 
506         /**
507          * Starts a platform thread to run the given task.
508          */
509         private void startPlatformThread(String name, Runnable task) {
510             try {
511                 Thread thread = InnocuousThread.newSystemThread(name, task);
512                 thread.setDaemon(true);
513                 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
514                 thread.start();
515             } catch (Exception e) {
516                 throw new InternalError(e);
517             }
518         }
519     }
520 
521     /**
522      * Returns the poller mode.
523      */
524     private static Mode pollerMode() {
525         String s = System.getProperty("jdk.pollerMode");
526         if (s != null) {
527             if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
528                 return Mode.SYSTEM_THREADS;
529             } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
530                 return Mode.VTHREAD_POLLERS;
531             } else {
532                 throw new RuntimeException("Can't parse '" + s + "' as polling mode");
533             }
534         } else {
535             return PROVIDER.defaultPollerMode();
536         }
537     }
538 
539     /**
540      * Returns the PollerGroup that the given thread uses to poll file descriptors.
541      */
542     private static PollerGroup pollerGroup(Thread thread) {
543         if (POLLER_MODE == Mode.SYSTEM_THREADS) {
544             return DEFAULT_POLLER_GROUP.get();
545         }
546         Thread.VirtualThreadScheduler scheduler;
547         if (thread.isVirtual()) {
548             scheduler = JLA.virtualThreadScheduler(thread);
549         } else {
550             scheduler = DEFAULT_SCHEDULER;
551         }
552         return POLLER_GROUPS.computeIfAbsent(scheduler, _ -> PollerGroup.create(scheduler));
553     }
554 
555     /**
556      * Invoked before the given scheduler is shutdown. In VTHREAD_POLLERS mode, this
557      * method will arrange for the sub poller threads to terminate. Does nothing in
558      * SYSTEM_THREADS mode.
559      */
560     public static void beforeShutdown(Executor executor) {
561         if (POLLER_MODE == Mode.VTHREAD_POLLERS) {
562             PollerGroup group = POLLER_GROUPS.remove(executor);
563             if (group != null) {
564                 group.shutdown();
565             }
566         }
567     }
568 
569     /**
570      * Return the master poller or null if there is no master poller.
571      */
572     public static Poller masterPoller() {
573         return DEFAULT_POLLER_GROUP.get().masterPoller();
574     }
575 
576     /**
577      * Return the list of read pollers.
578      */
579     public static List<Poller> readPollers() {
580         return DEFAULT_POLLER_GROUP.get().readPollers();
581     }
582 
583     /**
584      * Return the list of write pollers.
585      */
586     public static List<Poller> writePollers() {
587         return DEFAULT_POLLER_GROUP.get().writePollers();
588     }
589 
590 }
< prev index next >