< prev index next >

src/java.base/share/classes/sun/nio/ch/Poller.java

Print this page

  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;


 28 import java.util.Arrays;
 29 import java.util.List;
 30 import java.util.Map;
 31 import java.util.Objects;
 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.Executor;
 34 import java.util.concurrent.Executors;
 35 import java.util.concurrent.ThreadFactory;
 36 import java.util.concurrent.locks.LockSupport;
 37 import java.util.function.BooleanSupplier;


 38 import jdk.internal.misc.InnocuousThread;



 39 import jdk.internal.vm.annotation.Stable;
 40 
 41 /**
 42  * Polls file descriptors. Virtual threads invoke the poll method to park
 43  * until a given file descriptor is ready for I/O.
 44  */
 45 public abstract class Poller {
 46     private static final Pollers POLLERS;  // used by all static entry points; assigned once by the static initializer


 47     static {
 48         try {
 49             var pollers = new Pollers();
 50             pollers.start();
 51             POLLERS = pollers;  // assigned only after start() has returned






 52         } catch (IOException ioe) {
 53             throw new ExceptionInInitializerError(ioe);  // creating the pollers failed: fail class initialization
 54         }
 55     }
 56 
 57     // the poller or sub-poller thread; set by pollerLoop/subPollerLoop on entry
 58     private @Stable Thread owner;
 59 
 60     // maps file descriptors to parked Thread; an entry is removed by wakeup or deregister
 61     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 62 



 63     /**
 64      * Poller mode. Chosen by the Pollers constructor from the jdk.pollerMode system property, else the provider default.
 65      */
 66     enum Mode {
 67         /**
 68          * ReadPoller and WritePoller are dedicated platform threads that block waiting
 69          * for events and unpark virtual threads when file descriptors are ready for I/O.
 70          */
 71         SYSTEM_THREADS,
 72 
 73         /**
 74          * ReadPoller and WritePoller threads are virtual threads that poll for events,
 75          * yielding between polls and unparking virtual threads when file descriptors are
 76          * ready for I/O. If there are no events then the poller threads park until there
 77          * are I/O events to poll. This mode helps to integrate polling with virtual
 78          * thread scheduling. The approach is similar to the default scheme in "User-level
 79          * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
 80          * (https://dl.acm.org/doi/10.1145/3379483).
 81          */
 82         VTHREAD_POLLERS






 83     }
 84 
 85     /**
 86      * Initialize a Poller. The constructor itself does nothing; subclasses implement the abstract methods.
 87      */
 88     protected Poller() {
 89     }
 90 
 91     /**
 92      * Returns the poller's file descriptor, used when the read and write poller threads
 93      * are virtual threads.
 94      *
































 95      * @throws UnsupportedOperationException if not supported
 96      */
 97     int fdVal() {
 98         throw new UnsupportedOperationException();  // default: not supported; subPollerLoop calls fdVal() to register with the master poller
 99     }
100 






101     /**
102      * Register the file descriptor. The registration is "one shot", meaning it should
103      * be polled at most once. Called by register(int) after the current thread has been added to the map.
104      */
105     abstract void implRegister(int fdVal) throws IOException;
106 
107     /**
108      * Deregister the file descriptor. Called by deregister(int) after it has removed any map entry.
109      * @param polled true if the file descriptor has already been polled (deregister found no map entry)
110      */
111     abstract void implDeregister(int fdVal, boolean polled);
112 
113     /**
114      * Poll for events. The {@link #polled(int)} method is invoked for each
115      * polled file descriptor. pollerLoop calls this with -1, subPollerLoop with 0.
116      *
117      * @param timeout if positive then block for up to {@code timeout} milliseconds,
118      *     if zero then don't block, if -1 then block indefinitely
119      * @return the number of file descriptors polled (subPollerLoop parks again when this is 0)
120      */
121     abstract int poll(int timeout) throws IOException;
122 









123     /**
124      * Callback by the poll method when a file descriptor is polled.
125      */
126     final void polled(int fdVal) {
127         wakeup(fdVal);  // removes the map entry for fdVal and unparks its thread, if there is one



128     }
129 
130     /**
131      * Parks the current thread until a file descriptor is ready for the given op.
132      * @param fdVal the file descriptor
133      * @param event POLLIN or POLLOUT
134      * @param nanos the waiting time or 0 to wait indefinitely
135      * @param supplier supplies a boolean to indicate if the enclosing object is open
136      */
137     static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
138         throws IOException
139     {
140         assert nanos >= 0L;  // only checked when assertions are enabled
141         if (event == Net.POLLIN) {
142             POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
143         } else if (event == Net.POLLOUT) {
144             POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
145         } else {
146             assert false;  // NOTE(review): without -ea an unknown event returns silently; stopPoll throws IllegalArgumentException instead
147         }
148     }
149 
150     /**
151      * Parks the current thread until a Selector's file descriptor is ready.
152      * @param fdVal the Selector's file descriptor
153      * @param nanos the waiting time or 0 to wait indefinitely
154      */
155     static void pollSelector(int fdVal, long nanos) throws IOException {
156         assert nanos >= 0L;  // only checked when assertions are enabled
157         Poller poller = POLLERS.masterPoller();  // null unless the mode is VTHREAD_POLLERS
158         if (poller == null) {
159             poller = POLLERS.readPoller(fdVal);  // no master poller: use the read poller chosen for this fd
160         }
161         poller.poll(fdVal, nanos, () -> true);  // the supplier always reports open
162     }
163 
164     /**
165      * If there is a thread polling the given file descriptor for the given event then
166      * the thread is unparked. Throws IllegalArgumentException if event is neither POLLIN nor POLLOUT.
167      */
168     static void stopPoll(int fdVal, int event) {
169         if (event == Net.POLLIN) {
170             POLLERS.readPoller(fdVal).wakeup(fdVal);
171         } else if (event == Net.POLLOUT) {
172             POLLERS.writePoller(fdVal).wakeup(fdVal);
173         } else {
174             throw new IllegalArgumentException();
175         }
176     }
177 
178     /**
179      * If there are any threads polling the given file descriptor then they are unparked.
180      */
181     static void stopPoll(int fdVal) {
182         stopPoll(fdVal, Net.POLLIN);   // thread registered with the read poller, if any
183         stopPoll(fdVal, Net.POLLOUT);  // thread registered with the write poller, if any
184     }
185 
186     /**
187      * Parks the current thread until a file descriptor is ready. Registers first, parks only if the supplier reports open, and always deregisters.
188      */
189     private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
190         register(fdVal);
191         try {
192             boolean isOpen = supplier.getAsBoolean();  // evaluated after registration; presumably so a racing close is not missed - confirm
193             if (isOpen) {
194                 if (nanos > 0) {
195                     LockSupport.parkNanos(nanos);
196                 } else {
197                     LockSupport.park();  // nanos == 0: wait indefinitely
198                 }
199             }
200         } finally {
201             deregister(fdVal);  // runs whether the thread parked, was skipped, or an exception was thrown
202         }
203     }
204 
205     /**
206      * Registers the file descriptor to be polled at most once when the file descriptor
207      * is ready for I/O. The map entry is removed again if implRegister throws.
208      */
209     private void register(int fdVal) throws IOException {
210         Thread previous = map.put(fdVal, Thread.currentThread());  // recorded before implRegister so wakeup(fdVal) can find the thread
211         assert previous == null;  // expects no existing registration for this fd with this poller
212         try {
213             implRegister(fdVal);
214         } catch (Throwable t) {
215             map.remove(fdVal);  // undo the map entry added above
216             throw t;


217         }
218     }
219 
220     /**
221      * Deregister the file descriptor so that the file descriptor is not polled.
222      */
223     private void deregister(int fdVal) {
224         Thread previous = map.remove(fdVal);
225         boolean polled = (previous == null);  // no entry left means wakeup(fdVal) already removed it
226         assert polled || previous == Thread.currentThread();  // any remaining entry must belong to the caller
227         implDeregister(fdVal, polled);
228     }
229 
230     /**
231      * Unparks any thread that is polling the given file descriptor. The map entry is removed first, so a registration is woken at most once.
232      */
233     private void wakeup(int fdVal) {
234         Thread t = map.remove(fdVal);
235         if (t != null) {
236             LockSupport.unpark(t);
237         }
238     }
239 
240     /**
241      * Master polling loop. The {@link #polled(int)} method is invoked for each file
242      * descriptor that is polled. Records the current thread as owner, then polls forever.
243      */
244     private void pollerLoop() {
245         owner = Thread.currentThread();
246         try {
247             for (;;) {
248                 poll(-1);  // -1: block indefinitely (see poll(int))
249             }
250         } catch (Exception e) {
251             e.printStackTrace();  // NOTE(review): the catch is outside the loop, so the loop ends after any Exception
252         }
253     }
254 
255     /**
256      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
257      * descriptor that is polled.
258      *
259      * The sub-poller registers its file descriptor with the master poller to park until
260      * there are events to poll. When unparked, it does non-blocking polls and parks
261      * again when there are no more events. The sub-poller yields after each poll to help
262      * with fairness and to avoid re-registering with the master poller where possible.
263      */
264     private void subPollerLoop(Poller masterPoller) {
265         assert Thread.currentThread().isVirtual();
266         owner = Thread.currentThread();
267         try {
268             int polled = 0;  // count from the previous poll; 0 initially so the first iteration parks
269             for (;;) {
270                 if (polled == 0) {
271                     masterPoller.poll(fdVal(), 0, () -> true);  // park

272                 } else {
273                     Thread.yield();  // events were polled last time: yield, then poll again without parking
274                 }
275                 polled = poll(0);  // 0: don't block (see poll(int))
276             }
277         } catch (Exception e) {
278             e.printStackTrace();  // NOTE(review): the catch is outside the loop, so the loop ends after any Exception
279         }
280     }
281 
282     /**
283      * Returns the number of I/O operations currently registered with this poller (the size of the fd-to-thread map).
284      */
285     public int registered() {
286         return map.size();
287     }
288 
289     @Override
290     public String toString() {
291         return String.format("%s [registered = %d, owner = %s]",  // identity string, registration count, owner thread
292                 Objects.toIdentityString(this), registered(), owner);
293     }
294 
295     /**
296      * The Pollers used for read and write events.
297      */
298     private static class Pollers {
299         private final PollerProvider provider;
300         private final Poller.Mode pollerMode;
301         private final Poller masterPoller;  // null unless pollerMode is VTHREAD_POLLERS
302         private final Poller[] readPollers;
303         private final Poller[] writePollers;
304 
305         // used by start method so the executor is kept alive; only assigned in VTHREAD_POLLERS mode
306         private Executor executor;





307 
308         /**
309          * Creates the Poller instances based on configuration (system properties jdk.pollerMode, jdk.readPollers, jdk.writePollers).
310          */
311         Pollers() throws IOException {
312             PollerProvider provider = PollerProvider.provider();
313             Poller.Mode mode;
314             String s = System.getProperty("jdk.pollerMode");
315             if (s != null) {
316                 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {  // enum name (any case) or "1"
317                     mode = Mode.SYSTEM_THREADS;
318                 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {  // enum name (any case) or "2"
319                     mode = Mode.VTHREAD_POLLERS;
320                 } else {
321                     throw new RuntimeException("Can't parse '" + s + "' as polling mode");
322                 }
323             } else {
324                 mode = provider.defaultPollerMode();  // property not set
325             }
326 
327             // vthread poller mode needs a master poller
328             Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
329                     ? provider.readPoller(false)
330                     : null;
331 
332             // read pollers (or sub-pollers); the flag passed to readPoller is true only in VTHREAD_POLLERS mode
333             int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
334             Poller[] readPollers = new Poller[readPollerCount];
335             for (int i = 0; i < readPollerCount; i++) {
336                 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
337             }
338 
339             // write pollers (or sub-pollers); the flag passed to writePoller is true only in VTHREAD_POLLERS mode
340             int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));










































341             Poller[] writePollers = new Poller[writePollerCount];
342             for (int i = 0; i < writePollerCount; i++) {
343                 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);









344             }
345 
346             this.provider = provider;
347             this.pollerMode = mode;
348             this.masterPoller = masterPoller;
349             this.readPollers = readPollers;
350             this.writePollers = writePollers;
351         }
352 
353         /**
354          * Starts the Poller threads. The threads used depend on the poller mode chosen by the constructor.
355          */
356         void start() {
357             if (pollerMode == Mode.VTHREAD_POLLERS) {
358                 startPlatformThread("MasterPoller", masterPoller::pollerLoop);  // the master poller runs on a platform thread
359                 ThreadFactory factory = Thread.ofVirtual()
360                         .inheritInheritableThreadLocals(false)
361                         .name("SubPoller-", 0)
362                         .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
363                         .factory();
364                 executor = Executors.newThreadPerTaskExecutor(factory);  // one new virtual thread per submitted sub-poller loop
365                 Arrays.stream(readPollers).forEach(p -> {
366                     executor.execute(() -> p.subPollerLoop(masterPoller));
367                 });
368                 Arrays.stream(writePollers).forEach(p -> {
369                     executor.execute(() -> p.subPollerLoop(masterPoller));
370                 });
371             } else {  // any other mode: each read and write poller gets its own platform thread
372                 Arrays.stream(readPollers).forEach(p -> {
373                     startPlatformThread("Read-Poller", p::pollerLoop);
374                 });
375                 Arrays.stream(writePollers).forEach(p -> {
376                     startPlatformThread("Write-Poller", p::pollerLoop);
377                 });
378             }
379         }
380 
381         /**
382          * Returns the master poller, or null if there is no master poller (the constructor creates one only for VTHREAD_POLLERS).
383          */
384         Poller masterPoller() {
385             return masterPoller;






386         }
387 
388         /**
389          * Returns the read poller for the given file descriptor.
390          */
391         Poller readPoller(int fdVal) {
392             int index = provider.fdValToIndex(fdVal, readPollers.length);  // presumably an index in [0, length) - verify in PollerProvider
393             return readPollers[index];
394         }
395 


































































396         /**
397          * Returns the write poller for the given file descriptor.
398          */
399         Poller writePoller(int fdVal) {
400             int index = provider.fdValToIndex(fdVal, writePollers.length);  // presumably an index in [0, length) - verify in PollerProvider
401             return writePollers[index];









































































402         }
403 
404         /**
405          * Return the list of read pollers. List.of makes an unmodifiable list from the array.
406          */
407         List<Poller> readPollers() {
408             return List.of(readPollers);










































































409         }
410 
411         /**
412          * Return the list of write pollers. List.of makes an unmodifiable list from the array.

413          */
414         List<Poller> writePollers() {
415             return List.of(writePollers);
















416         }
417 


















































418 
419         /**
420          * Reads the given property name to get the poller count. If the property is
421          * set then the value must be a power of 2. Returns defaultCount if the property is not
422          * set. The count, from either source, must be a positive power of 2.
423          * @throws IllegalArgumentException if the property is set to a value that
424          * is not a power of 2 (NumberFormatException, a subclass, if it is not an int).
425          */
426         private static int pollerCount(String propName, int defaultCount) {
427             String s = System.getProperty(propName);
428             int count = (s != null) ? Integer.parseInt(s) : defaultCount;
429 
430             // check power of 2; 0 and Integer.MIN_VALUE equal their own highestOneBit, so reject non-positive values explicitly
431             if (count <= 0 || count != Integer.highestOneBit(count)) {
432                 String msg = propName + " is set to a value that is not a power of 2";
433                 throw new IllegalArgumentException(msg);
434             }
435             return count;
436         }
437 
438         /**
439          * Starts a platform thread to run the given task. The thread is a daemon; any Exception while creating or starting it becomes an InternalError.
440          */
441         private void startPlatformThread(String name, Runnable task) {
442             try {
443                 Thread thread = InnocuousThread.newSystemThread(name, task);
444                 thread.setDaemon(true);
445                 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());  // uncaught exceptions are printed, not rethrown
446                 thread.start();
447             } catch (Exception e) {
448                 throw new InternalError(e);
449             }
450         }






































































451     }
452 
453     /**
454      * Return the master poller or null if there is no master poller. Delegates to POLLERS.
455      */
456     public static Poller masterPoller() {
457         return POLLERS.masterPoller();
458     }
459 
460     /**
461      * Return the list of read pollers. Delegates to POLLERS.
462      */
463     public static List<Poller> readPollers() {
464         return POLLERS.readPollers();
465     }
466 
467     /**
468      * Return the list of write pollers. Delegates to POLLERS.
469      */
470     public static List<Poller> writePollers() {
471         return POLLERS.writePollers();
472     }
473 }

  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;
 28 import java.io.UncheckedIOException;
 29 import java.lang.ref.Reference;
 30 import java.util.Arrays;
 31 import java.util.List;
 32 import java.util.Map;
 33 import java.util.Objects;
 34 import java.util.concurrent.ConcurrentHashMap;
 35 import java.util.concurrent.ExecutorService;
 36 import java.util.concurrent.Executors;
 37 import java.util.concurrent.ThreadFactory;
 38 import java.util.concurrent.locks.LockSupport;
 39 import java.util.function.BooleanSupplier;
 40 import jdk.internal.access.JavaLangAccess;
 41 import jdk.internal.access.SharedSecrets;
 42 import jdk.internal.misc.InnocuousThread;
 43 import jdk.internal.misc.TerminatingThreadLocal;
 44 import jdk.internal.vm.Continuation;
 45 import jdk.internal.vm.ContinuationSupport;
 46 import jdk.internal.vm.annotation.Stable;
 47 
 48 /**
 49  * Polls file descriptors. Virtual threads invoke the poll method to park
 50  * until a given file descriptor is ready for I/O.
 51  */
 52 public abstract class Poller {
 53     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 54 
 55     private static final PollerGroup POLLER_GROUP;  // the static poll and pollSelector methods delegate to this group
 56     static {
 57         try {
 58             PollerProvider provider = PollerProvider.provider();
 59             Mode mode = pollerMode(provider.defaultPollerMode());  // pollerMode is not shown in this part of the listing
 60             PollerGroup group = switch (mode) {  // one PollerGroup implementation per Mode constant
 61                 case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider);
 62                 case VTHREAD_POLLERS -> new VirtualThreadsPollerGroup(provider);
 63                 case PER_CARRIER -> new PerCarrierPollerGroup(provider);
 64             };
 65             group.start();
 66             POLLER_GROUP = group;  // assigned only after start() has returned
 67         } catch (IOException ioe) {
 68             throw new ExceptionInInitializerError(ioe);
 69         }
 70     }
 71 
 72     // the poller or sub-poller thread; set by setOwner()
 73     private @Stable Thread owner;
 74 
 75     // maps file descriptors to parked Thread; an entry is removed by polled or deregister
 76     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 77 
 78     // shutdown if supported by poller group; set by setShutdown(), read through isShutdown()
 79     private volatile boolean shutdown;
 80 
 81     /**
 82      * Poller mode. The static initializer creates a different PollerGroup for each mode.
 83      */
 84     enum Mode {
 85         /**
 86          * ReadPoller and WritePoller are dedicated platform threads that block waiting
 87          * for events and unpark virtual threads when file descriptors are ready for I/O.
 88          */
 89         SYSTEM_THREADS,
 90 
 91         /**
 92          * ReadPoller and WritePoller threads are virtual threads that poll for events,
 93          * yielding between polls and unparking virtual threads when file descriptors are
 94          * ready for I/O. If there are no events then the poller threads park until there
 95          * are I/O events to poll. This mode helps to integrate polling with virtual
 96          * thread scheduling. The approach is similar to the default scheme in "User-level
 97          * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
 98          * (https://dl.acm.org/doi/10.1145/3379483).
 99          */
100         VTHREAD_POLLERS,
101 
102         /**
103          * A dedicated ReadPoller is created for each carrier thread.
104          * There is one system-wide (carrier agnostic) WritePoller.
105          */
106         PER_CARRIER
107     }
108 
109     /**
110      * Initialize a Poller. The constructor itself does nothing; subclasses implement the abstract methods.
111      */
112     protected Poller() {
113     }
114 
115     /**
116      * Closes the poller and releases resources. This method can only be used to clean up
117      * when creating a poller group fails.
118      */
119     abstract void close() throws IOException;
120 
121     /**
122      * Returns the poller's thread owner, or null if setOwner() has not been called yet.
123      */
124     private Thread owner() {
125         return owner;
126     }
127 
128     /**
129      * Sets the poller's thread owner to the current thread. Called on entry to pollerLoop and subPollerLoop.
130      */
131     private void setOwner() {
132         owner = Thread.currentThread();
133     }
134 
135     /**
136      * Returns true if this poller is marked for shutdown. Reads the volatile shutdown flag.
137      */
138     final boolean isShutdown() {
139         return shutdown;
140     }
141 
142     /**
143      * Marks this poller for shutdown. The polling loops test the flag before each iteration.
144      */
145     private void setShutdown() {
146         shutdown = true;
147     }
148 
149     /**
150      * Returns the poller's file descriptor to use when polling with the master poller (see subPollerLoop).
151      * @throws UnsupportedOperationException if not supported (this default always throws)
152      */
153     int fdVal() {
154         throw new UnsupportedOperationException();
155     }
156 
157     /**
158      * Invoked when this poller's file descriptor is polled by the master poller. Does nothing by default.
159      */
160     void pollerPolled() throws IOException {
161     }
162 
163     /**
164      * Register the file descriptor. The registration is "one shot", meaning it should
165      * be polled at most once. Called by register(int) after the current thread has been added to the map.
166      */
167     abstract void implRegister(int fdVal) throws IOException;
168 
169     /**
170      * Deregister the file descriptor. Called by deregister(int) after it has removed any map entry.
171      * @param polled true if the file descriptor has already been polled (deregister found no map entry)
172      */
173     abstract void implDeregister(int fdVal, boolean polled) throws IOException;
174 
175     /**
176      * Poll for events. The {@link #polled(int)} method is invoked for each
177      * polled file descriptor. pollerLoop calls this with -1, subPollerLoop with 0.
178      *
179      * @param timeout if positive then block for up to {@code timeout} milliseconds,
180      *     if zero then don't block, if -1 then block indefinitely
181      * @return >0 if file descriptors are polled, 0 if no file descriptor polled
182      */
183     abstract int poll(int timeout) throws IOException;
184 
185     /**
186      * Wakeup the poller thread if blocked in poll. This default implementation always throws.
187      *
188      * @throws UnsupportedOperationException if not supported
189      */
190     void wakeupPoller() throws IOException {
191         throw new UnsupportedOperationException();
192     }
193 
194     /**
195      * Callback by the poll method when a file descriptor is polled. Unparks the thread registered for the file descriptor, if any.
196      */
197     final void polled(int fdVal) {
198         Thread t = map.remove(fdVal);  // removed first, so a registration is woken at most once
199         if (t != null) {
200             LockSupport.unpark(t);
201         }
202     }
203 
204     /**
205      * Parks the current thread until a file descriptor is ready for the given op. Delegates to the poller group.
206      * @param fdVal the file descriptor
207      * @param event POLLIN or POLLOUT
208      * @param nanos the waiting time or 0 to wait indefinitely
209      * @param isOpen supplies a boolean to indicate if the enclosing object is open
210      */
211     static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
212         POLLER_GROUP.poll(fdVal, event, nanos, isOpen);









213     }
214 
215     /**
216      * Parks the current thread until a Selector's file descriptor is ready. Delegates to the poller group.
217      * @param fdVal the Selector's file descriptor
218      * @param nanos the waiting time or 0 to wait indefinitely
219      */
220     static void pollSelector(int fdVal, long nanos) throws IOException {
221         POLLER_GROUP.pollSelector(fdVal, nanos);



















222     }
223 
224     /**
225      * Unpark the given thread so that it stops polling. No poller map is consulted here; the thread is unparked directly.
226      */
227     static void stopPoll(Thread thread) {
228         LockSupport.unpark(thread);

229     }
230 
231     /**
232      * Parks the current thread until a file descriptor is ready. Registers first, parks only if open and not shut down, and always deregisters.
233      */
234     private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
235         register(fdVal);
236         try {
237             if (isOpen.getAsBoolean() && !isShutdown()) {  // skip parking if the object is closed or this poller is marked for shutdown

238                 if (nanos > 0) {
239                     LockSupport.parkNanos(nanos);
240                 } else {
241                     LockSupport.park();  // nanos == 0: wait indefinitely
242                 }
243             }
244         } finally {
245             deregister(fdVal);  // runs whether the thread parked, was skipped, or an exception was thrown
246         }
247     }
248 
249     /**
250      * Registers the file descriptor to be polled at most once when the file descriptor
251      * is ready for I/O. The map entry is removed again if implRegister throws.
252      */
253     private void register(int fdVal) throws IOException {
254         Thread previous = map.put(fdVal, Thread.currentThread());  // recorded before implRegister so polled(fdVal) can find the thread
255         assert previous == null;  // expects no existing registration for this fd with this poller
256         try {
257             implRegister(fdVal);
258         } catch (Throwable t) {
259             map.remove(fdVal);  // undo the map entry added above
260             throw t;
261         } finally {
262             Reference.reachabilityFence(this);  // keeps this poller strongly reachable until implRegister has completed
263         }
264     }
265 
266     /**
267      * Deregister the file descriptor so that the file descriptor is not polled.
268      */
269     private void deregister(int fdVal) throws IOException {
270         Thread previous = map.remove(fdVal);
271         boolean polled = (previous == null);  // no entry left means polled(fdVal) already removed it
272         assert polled || previous == Thread.currentThread();  // any remaining entry must belong to the caller
273         try {
274             implDeregister(fdVal, polled);
275         } finally {
276             Reference.reachabilityFence(this);  // keeps this poller strongly reachable until implDeregister has completed






277         }
278     }
279 
280     /**
281      * Master polling loop. The {@link #polled(int)} method is invoked for each file
282      * descriptor that is polled. Runs until this poller is marked for shutdown.
283      */
284     private void pollerLoop() {
285         setOwner();
286         try {
287             while (!isShutdown()) {
288                 poll(-1);  // -1: block indefinitely (see poll(int))
289             }
290         } catch (Exception e) {
291             e.printStackTrace();  // NOTE(review): the catch is outside the loop, so the loop ends after any Exception
292         }
293     }
294 
295     /**
296      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
297      * descriptor that is polled.
298      *
299      * The sub-poller registers its file descriptor with the master poller to park until
300      * there are events to poll. When unparked, it does non-blocking polls and parks
301      * again when there are no more events. The sub-poller yields after each poll to help
302      * with fairness and to avoid re-registering with the master poller where possible.
303      */
304     private void subPollerLoop(Poller masterPoller) {
305         assert Thread.currentThread().isVirtual();
306         setOwner();
307         try {
308             int polled = 0;  // result of the previous poll; 0 initially so the first iteration parks
309             while (!isShutdown()) {
310                 if (polled == 0) {
311                     masterPoller.poll(fdVal(), 0, () -> true);  // park
312                     pollerPolled();  // hook invoked after the master poller wait returns
313                 } else {
314                     Thread.yield();  // events were polled last time: yield, then poll again without parking
315                 }
316                 polled = poll(0);  // 0: don't block (see poll(int))
317             }
318         } catch (Exception e) {
319             e.printStackTrace();  // NOTE(review): the catch is outside the loop, so the loop ends after any Exception
320         }
321     }
322 
323     /**
324      * Unparks all threads waiting on a file descriptor registered with this poller. The map entries are left in place.
325      */
326     private void wakeupAll() {
327         map.values().forEach(LockSupport::unpark);
328     }
329 
330     @Override
331     public String toString() {
332         return String.format("%s [registered = %d, owner = %s]",  // identity string, map size, owner thread
333             Objects.toIdentityString(this), map.size(), owner);
334     }
335 
336     /**
337      * A group of poller threads that support virtual threads polling file descriptors. The static initializer creates one subclass instance per Mode.
338      */
339     private static abstract class PollerGroup {
340         private final PollerProvider provider;  // the provider passed to the constructor




341 
342         PollerGroup(PollerProvider provider) {
343             this.provider = provider;
344         }
345 
346         PollerProvider provider() {
347             return provider;
348         }
349 
350         /**
351          * Starts the poller group and any system-wide poller threads. Called once from Poller's static initializer.
352          */
353         abstract void start();














354 
355         /**
356          * Parks the current thread until a file descriptor is ready for the given op.
357          */
358         abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
359 
360        /**
361         * Parks the current thread until a Selector's file descriptor is ready. By default this is a POLLIN poll with an always-open supplier.
362         */
363         void pollSelector(int fdVal, long nanos) throws IOException {
364             poll(fdVal, Net.POLLIN, nanos, () -> true);
365         }
366 
367         /**
368          * Starts a platform thread to run the given task. The thread is a daemon and prints uncaught exceptions.
369          */
370         protected final void startPlatformThread(String name, Runnable task) {
371             Thread thread = InnocuousThread.newSystemThread(name, task);
372             thread.setDaemon(true);
373             thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
374             thread.start();
375         }
376 
377         /**
378          * Return the master poller for default scheduler, or null if no master poller.
379          */
380         abstract Poller defaultMasterPoller();
381 
382         /**
383          * Return the read pollers for the default scheduler.
384          */
385         abstract List<Poller> defaultReadPollers();
386 
387         /**
388          * Return the write pollers for the default scheduler.
389          */
390         abstract List<Poller> defaultWritePollers();
391     }
392 
393     /**
394      * The poller group for the SYSTEM_THREADS polling mode. The read and write pollers
395      * are system-wide (scheduler agnostic) platform threads.
396      */
397     private static class SystemThreadsPollerGroup extends PollerGroup {
398         // system-wide read and write pollers
399         private final Poller[] readPollers;
400         private final Poller[] writePollers;
401 
402         SystemThreadsPollerGroup(PollerProvider provider) throws IOException {
403             super(provider);
404 
405             int readPollerCount = pollerCount("jdk.readPollers",
406                     provider.defaultReadPollers(Mode.SYSTEM_THREADS));
407             int writePollerCount = pollerCount("jdk.writePollers",
408                     provider.defaultWritePollers(Mode.SYSTEM_THREADS));
409 
410             Poller[] readPollers = new Poller[readPollerCount];
411             Poller[] writePollers = new Poller[writePollerCount];
412             try {
413                 for (int i = 0; i < readPollerCount; i++) {
414                     readPollers[i] = provider.readPoller(false);
415                 }
416                 for (int i = 0; i < writePollerCount; i++) {
417                     writePollers[i] = provider.writePoller(false);
418                 }
419             } catch (Throwable e) {
420                 closeAll(readPollers);
421                 closeAll(writePollers);
422                 throw e;
423             }
424 



425             this.readPollers = readPollers;
426             this.writePollers = writePollers;
427         }
428 
429         /**
430          * Close the given pollers.
431          */
432         private static void closeAll(Poller... pollers) {
433             for (Poller poller : pollers) {
434                 if (poller != null) {
435                     try {
436                         poller.close();
437                     } catch (IOException _) { }
438                 }















439             }
440         }
441 
442         /**
443          * Start poller threads.
444          */
445         @Override
446         void start() {
447             Arrays.stream(readPollers).forEach(p -> {
448                 startPlatformThread("Read-Poller", p::pollerLoop);
449             });
450             Arrays.stream(writePollers).forEach(p -> {
451                 startPlatformThread("Write-Poller", p::pollerLoop);
452             });
453         }
454 
455         private Poller readPoller(int fdVal) {
456             int index = provider().fdValToIndex(fdVal, readPollers.length);



457             return readPollers[index];
458         }
459 
460         private Poller writePoller(int fdVal) {
461             int index = provider().fdValToIndex(fdVal, writePollers.length);
462             return writePollers[index];
463         }
464 
465         @Override
466         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
467             Poller poller = (event == Net.POLLIN)
468                     ? readPoller(fdVal)
469                     : writePoller(fdVal);
470             poller.poll(fdVal, nanos, isOpen);
471         }
472 
473         @Override
474         Poller defaultMasterPoller() {
475             return null;
476         }
477 
478         @Override
479         List<Poller> defaultReadPollers() {
480             return List.of(readPollers);
481         }
482 
483         @Override
484         List<Poller> defaultWritePollers() {
485             return List.of(writePollers);
486         }
487     }
488 
489     /**
490      * The poller group for the VTHREAD_POLLERS poller mode. The read and write pollers
491      * are scheduler-specific virtual threads. The default scheduler may have many read
492      * and write pollers. Custom schedulers have one read poller and one write poller.
493      * When read and write pollers must block then they register with a system-wide
494      * (scheduler agnostic) "master poller" that runs in a dedicated platform thread.
495      */
496     private static class VirtualThreadsPollerGroup extends PollerGroup {
497         private static final Thread.VirtualThreadScheduler DEFAULT_SCHEDULER = JLA.defaultVirtualThreadScheduler();
498 
499         // system-wide master poller
500         private final Poller masterPoller;
501 
502         // number of read and write pollers for default scheduler
503         private final int defaultReadPollerCount;
504         private final int defaultWritePollerCount;
505 
506         // maps scheduler to a set of read and write pollers
507         private record Pollers(ExecutorService executor, Poller[] readPollers, Poller[] writePollers) { }
508         private final Map<Thread.VirtualThreadScheduler, Pollers> POLLERS = new ConcurrentHashMap<>();
509 
510         VirtualThreadsPollerGroup(PollerProvider provider) throws IOException {
511             super(provider);
512 
513             var mode = Mode.VTHREAD_POLLERS;
514             this.defaultReadPollerCount = pollerCount("jdk.readPollers",
515                     provider.defaultReadPollers(mode));
516             this.defaultWritePollerCount = pollerCount("jdk.writePollers",
517                     provider.defaultWritePollers(mode));
518             this.masterPoller = provider.readPoller(false);
519         }
520 
521         @Override
522         void start() {
523             startPlatformThread("Master-Poller", masterPoller::pollerLoop);
524         }
525 
526         /**
527          * Create the read and write pollers for the given scheduler.
528          */
529         private Pollers createPollers(Thread.VirtualThreadScheduler scheduler) {
530             int readPollerCount;
531             int writePollerCount;
532             if (scheduler == DEFAULT_SCHEDULER) {
533                 readPollerCount = defaultReadPollerCount;
534                 writePollerCount = defaultWritePollerCount;
535             } else {
536                 readPollerCount = 1;
537                 writePollerCount = 1;
538             }
539 
540             Poller[] readPollers = new Poller[readPollerCount];
541             Poller[] writePollers = new Poller[writePollerCount];
542             try {
543                 for (int i = 0; i < readPollerCount; i++) {
544                     readPollers[i] = provider().readPoller(true);
545                 }
546                 for (int i = 0; i < writePollerCount; i++) {
547                     writePollers[i] = provider().writePoller(true);
548                 }
549             } catch (IOException ioe) {
550                 closeAll(readPollers);
551                 closeAll(writePollers);
552                 throw new UncheckedIOException(ioe);
553             }
554 
555             @SuppressWarnings("restricted")
556             ThreadFactory factory = Thread.ofVirtual()
557                     .scheduler(scheduler)
558                     .inheritInheritableThreadLocals(false)
559                     .name("SubPoller-", 0)
560                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
561                     .factory();
562             ExecutorService executor = Executors.newThreadPerTaskExecutor(factory);
563 
564             Arrays.stream(readPollers).forEach(p -> {
565                 executor.execute(() -> p.subPollerLoop(masterPoller));
566             });
567             Arrays.stream(writePollers).forEach(p -> {
568                 executor.execute(() -> p.subPollerLoop(masterPoller));
569             });
570 
571             return new Pollers(executor, readPollers, writePollers);
572         }
573 
574         @Override
575         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
576             Thread.VirtualThreadScheduler scheduler;
577             if (Thread.currentThread().isVirtual()) {
578                 scheduler = JLA.virtualThreadScheduler(Thread.currentThread());
579             } else {
580                 scheduler = DEFAULT_SCHEDULER;
581             }
582             Pollers pollers;
583             try {
584                 pollers = POLLERS.computeIfAbsent(scheduler, _ -> createPollers(scheduler));
585             } catch (UncheckedIOException uioe) {
586                 throw uioe.getCause();
587             }
588 
589             Poller poller;
590             if (event == Net.POLLIN) {
591                 Poller[] readPollers = pollers.readPollers();
592                 int index = provider().fdValToIndex(fdVal, readPollers.length);
593                 poller = readPollers[index];
594             } else {
595                 Poller[] writePollers = pollers.writePollers();
596                 int index = provider().fdValToIndex(fdVal, writePollers.length);
597                 poller = writePollers[index];
598             }
599             poller.poll(fdVal, nanos, isOpen);
600         }
601 
602         @Override
603         void pollSelector(int fdVal, long nanos) throws IOException {
604             masterPoller.poll(fdVal, nanos, () -> true);
605         }
606 
607         /**
608          * Close the given pollers.
609          */
610         private static void closeAll(Poller... pollers) {
611             for (Poller poller : pollers) {
612                 if (poller != null) {
613                     try {
614                         poller.close();
615                     } catch (IOException _) { }
616                 }
617             }
618         }
619 
620         @Override
621         Poller defaultMasterPoller() {
622             return masterPoller;
623         }
624 
625         @Override
626         List<Poller> defaultReadPollers() {
627             Pollers pollers = POLLERS.get(DEFAULT_SCHEDULER);
628             return (pollers != null) ? List.of(pollers.readPollers()) : List.of();
629         }
630 
631         @Override
632         List<Poller> defaultWritePollers() {
633             Pollers pollers = POLLERS.get(DEFAULT_SCHEDULER);
634             return (pollers != null) ? List.of(pollers.writePollers()) : List.of();
635         }
636     }
637 
638     /**
639      * The poller group for the PER_CARRIER polling mode. A dedicated read poller is
640      * created for each carrier thread. When a virtual thread polls a file descriptor
641      * for POLLIN, then it will use (almost always, not guaranteed) to see the read
642      * poller for its carrier. The read poller terminates if carrier thread terminates.
643      * There is one system-wide (carrier agnostic) write poller.
644      */
645     private static class PerCarrierPollerGroup extends PollerGroup {
646         private static final boolean USE_SUBPOLLER;
647         static {
648             String s = System.getProperty("jdk.useSubPoller");
649             USE_SUBPOLLER = (s == null) || s.isEmpty() || Boolean.parseBoolean(s);
650         }
651 
652         private static final TerminatingThreadLocal<PerCarrierPollerGroup> POLLER_GROUP =
653             new TerminatingThreadLocal<>() {
654                 @Override
655                 protected void threadTerminated(PerCarrierPollerGroup pollerGroup) {
656                     pollerGroup.carrierTerminated();
657                 }
658             };
659 
660         // -XX:-VMContinuations or called from platform thread
661         private static final Thread PLACEHOLDER = new Thread();
662 
663         // maps carrier thread to its read poller
664         private final Map<Thread, Poller> READ_POLLERS = new ConcurrentHashMap<>();
665 
666         private final Poller writePoller;
667         private final Poller masterPoller;
668 
669         PerCarrierPollerGroup(PollerProvider provider) throws IOException {
670             super(provider);
671 
672             this.writePoller = provider.writePoller(false);
673             if (USE_SUBPOLLER) {
674                 this.masterPoller = provider.readPoller(false);
675             } else {
676                 this.masterPoller = null;
677             }
678         }
679 
680         @Override
681         void start() {
682             startPlatformThread("Write-Poller", writePoller::pollerLoop);
683             if (masterPoller != null) {
684                 startPlatformThread("Master-Poller", masterPoller::pollerLoop);
685             }
686         }
687 
688         /**
689          * Starts a read poller with the given name.
690          * @throws UncheckedIOException if an I/O error occurs
691          */
692         private Poller startReadPoller(String name) {
693             try {
694                 Poller readPoller = provider().readPoller(USE_SUBPOLLER);
695                 if (USE_SUBPOLLER) {
696                     var scheduler = JLA.virtualThreadScheduler(Thread.currentThread());
697                     @SuppressWarnings("restricted")
698                     var _ = Thread.ofVirtual().scheduler(scheduler)
699                             .inheritInheritableThreadLocals(false)
700                             .name(name)
701                             .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
702                             .start(() -> subPollerLoop(readPoller));
703                 } else {
704                     startPlatformThread(name, () -> pollLoop(readPoller));
705                 }
706                 return readPoller;
707             } catch (IOException ioe) {
708                 throw new UncheckedIOException(ioe);
709             }
710         }
711 
712         @Override
713         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
714             if (event == Net.POLLOUT) {
715                 writePoller.poll(fdVal, nanos, isOpen);
716                 return;
717             }
718 
719             assert event == Net.POLLIN;
720             if (Thread.currentThread().isVirtual() && ContinuationSupport.isSupported()) {
721                 Poller readPoller;
722                 // get read poller for this carrier
723                 Continuation.pin();
724                 try {
725                     Thread carrier = JLA.currentCarrierThread();
726                     try {
727                         readPoller = READ_POLLERS.computeIfAbsent(carrier, _ -> {
728                             String name = carrier.getName() + "-Read-Poller";
729                             return startReadPoller(name);
730                         });
731                     } catch (UncheckedIOException uioe) {
732                         throw uioe.getCause();
733                     }
734                     POLLER_GROUP.set(this);
735                 } finally {
736                     Continuation.unpin();
737                 }
738                 // may execute on a different carrier
739                 readPoller.poll(fdVal, nanos, isOpen);
740                 return;
741             }
742 
743             // -XX:-VMContinuations or called from platform thread
744             if (masterPoller != null) {
745                 masterPoller.poll(fdVal, nanos, isOpen);
746             } else {
747                 Poller readPoller;
748                 try {
749                     readPoller = READ_POLLERS.computeIfAbsent(PLACEHOLDER,
750                             _ -> startReadPoller("Read-Poller"));
751                 } catch (UncheckedIOException uioe) {
752                     throw uioe.getCause();
753                 }
754                 readPoller.poll(fdVal, nanos, isOpen);
755             }
756         }
757 
758         @Override
759         void pollSelector(int fdVal, long nanos) throws IOException {
760             masterPoller.poll(fdVal, nanos, () -> true);
761         }
762 
763         /**
764          * Read-poller polling loop.




765          */
766         private void pollLoop(Poller readPoller) {
767             try {
768                 readPoller.pollerLoop();
769             } finally {
770                 // wakeup all threads waiting on file descriptors registered with the
771                 // read poller, these I/O operation will migrate to another carrier.
772                 readPoller.wakeupAll();

773             }

774         }
775 
776         /**
777          * Read-poll sub-poller polling loop.
778          */
779         private void subPollerLoop(Poller readPoller) {
780             try {
781                 readPoller.subPollerLoop(masterPoller);
782             } finally {
783                 // wakeup all threads waiting on file descriptors registered with the
784                 // read poller, these I/O operation will migrate to another carrier.
785                 readPoller.wakeupAll();

786             }
787         }
788 
789         /**
790          * Invoked by the carrier thread before it terminates.
791          */
792         private void carrierTerminated() {
793             Poller readPoller = READ_POLLERS.remove(Thread.currentThread());
794             if (readPoller != null) {
795                 readPoller.setShutdown();
796                 try {
797                     readPoller.wakeupPoller();
798                 } catch (Throwable e) {
799                     e.printStackTrace();
800                 }
801             }
802         }
803 
804         @Override
805         Poller defaultMasterPoller() {
806             return masterPoller;
807         }
808 
809         @Override
810         List<Poller> defaultReadPollers() {
811             // return all read pollers for now.
812             return READ_POLLERS.values().stream().toList();
813         }
814 
815         @Override
816         List<Poller> defaultWritePollers() {
817             return List.of(writePoller);
818         }
819     }
820 
821     /**
822      * Returns the poller mode.
823      */
824     private static Mode pollerMode(Mode defaultPollerMode) {
825         String s = System.getProperty("jdk.pollerMode");
826         if (s != null) {
827             if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
828                 return Mode.SYSTEM_THREADS;
829             } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
830                 return Mode.VTHREAD_POLLERS;
831             } else if (s.equalsIgnoreCase(Mode.PER_CARRIER.name()) || s.equals("3")) {
832                 return Mode.PER_CARRIER;
833             } else {
834                 throw new RuntimeException("Can't parse '" + s + "' as polling mode");
835             }
836         } else {
837             return defaultPollerMode;
838         }
839     }
840 
841     /**
842      * Reads the given property name to get the poller count. If the property is
843      * set then the value must be a power of 2. Returns 1 if the property is not
844      * set.
845      * @throws IllegalArgumentException if the property is set to a value that
846      * is not a power of 2.
847      */
848     private static int pollerCount(String propName, int defaultCount) {
849         String s = System.getProperty(propName);
850         int count = (s != null) ? Integer.parseInt(s) : defaultCount;
851 
852         // check power of 2
853         if (count != Integer.highestOneBit(count)) {
854             String msg = propName + " is set to a value that is not a power of 2";
855             throw new IllegalArgumentException(msg);
856         }
857         return count;
858     }
859 
860     /**
861      * Return the master poller or null if there is no master poller.
862      */
863     public static Poller masterPoller() {
864         return POLLER_GROUP.defaultMasterPoller();
865     }
866 
867     /**
868      * Return the list of read pollers.
869      */
870     public static List<Poller> readPollers() {
871         return POLLER_GROUP.defaultReadPollers();
872     }
873 
874     /**
875      * Return the list of write pollers.
876      */
877     public static List<Poller> writePollers() {
878         return POLLER_GROUP.defaultWritePollers();
879     }
880 }
< prev index next >