< prev index next >

src/java.base/share/classes/sun/nio/ch/Poller.java

Print this page

  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;


 28 import java.util.Arrays;
 29 import java.util.List;
 30 import java.util.Map;
 31 import java.util.Objects;

 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.Executor;
 34 import java.util.concurrent.Executors;
 35 import java.util.concurrent.ThreadFactory;
 36 import java.util.concurrent.locks.LockSupport;
 37 import java.util.function.BooleanSupplier;


 38 import jdk.internal.misc.InnocuousThread;



 39 import jdk.internal.vm.annotation.Stable;
 40 
 41 /**
 42  * Polls file descriptors. Virtual threads invoke the poll method to park
 43  * until a given file descriptor is ready for I/O.
 44  */
 45 public abstract class Poller {
 46     private static final Pollers POLLERS;
 47     static {
 48         try {
 49             var pollers = new Pollers();
 50             pollers.start();
 51             POLLERS = pollers;
 52         } catch (IOException ioe) {
 53             throw new ExceptionInInitializerError(ioe);
 54         }
 55     }
 56 
 57     // the poller or sub-poller thread



 58     private @Stable Thread owner;
 59 
 60     // maps file descriptors to parked Thread
 61     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 62 



 63     /**
 64      * Poller mode.
 65      */
 66     enum Mode {
 67         /**
 68          * ReadPoller and WritePoller are dedicated platform threads that block waiting
 69          * for events and unpark virtual threads when file descriptors are ready for I/O.
 70          */
 71         SYSTEM_THREADS,
 72 
 73         /**
 74          * ReadPoller and WritePoller threads are virtual threads that poll for events,
 75          * yielding between polls and unparking virtual threads when file descriptors are
 76          * ready for I/O. If there are no events then the poller threads park until there
 77          * are I/O events to poll. This mode helps to integrate polling with virtual
 78          * thread scheduling. The approach is similar to the default scheme in "User-level
 79          * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
 80          * (https://dl.acm.org/doi/10.1145/3379483).
 81          */
 82         VTHREAD_POLLERS










































 83     }
 84 
 85     /**
 86      * Initialize a Poller.
 87      */
 88     protected Poller() {
 89     }
 90 
 91     /**
 92      * Returns the poller's file descriptor, used when the read and write poller threads
 93      * are virtual threads.
 94      *

























 95      * @throws UnsupportedOperationException if not supported
 96      */
 97     int fdVal() {
 98         throw new UnsupportedOperationException();
 99     }
100 






101     /**
102      * Register the file descriptor. The registration is "one shot", meaning it should
103      * be polled at most once.
104      */
105     abstract void implRegister(int fdVal) throws IOException;
106 
107     /**
108      * Deregister the file descriptor.
109      * @param polled true if the file descriptor has already been polled
110      */
111     abstract void implDeregister(int fdVal, boolean polled);
112 
113     /**
114      * Poll for events. The {@link #polled(int)} method is invoked for each
115      * polled file descriptor.
116      *
117      * @param timeout if positive then block for up to {@code timeout} milliseconds,
118      *     if zero then don't block, if -1 then block indefinitely
119      * @return the number of file descriptors polled
120      */
121     abstract int poll(int timeout) throws IOException;
122 









123     /**
124      * Callback by the poll method when a file descriptor is polled.
125      */
126     final void polled(int fdVal) {
127         wakeup(fdVal);



128     }
129 
130     /**
131      * Parks the current thread until a file descriptor is ready for the given op.
132      * @param fdVal the file descriptor
133      * @param event POLLIN or POLLOUT
134      * @param nanos the waiting time or 0 to wait indefinitely
135      * @param supplier supplies a boolean to indicate if the enclosing object is open
136      */
137     static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
138         throws IOException
139     {
140         assert nanos >= 0L;
141         if (event == Net.POLLIN) {
142             POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
143         } else if (event == Net.POLLOUT) {
144             POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
145         } else {
146             assert false;
147         }
148     }
149 
150     /**
151      * Parks the current thread until a Selector's file descriptor is ready.
152      * @param fdVal the Selector's file descriptor
153      * @param nanos the waiting time or 0 to wait indefinitely
154      */
155     static void pollSelector(int fdVal, long nanos) throws IOException {
156         assert nanos >= 0L;
157         Poller poller = POLLERS.masterPoller();
158         if (poller == null) {
159             poller = POLLERS.readPoller(fdVal);
160         }
161         poller.poll(fdVal, nanos, () -> true);
162     }
163 
164     /**
165      * If there is a thread polling the given file descriptor for the given event then
166      * the thread is unparked.
167      */
168     static void stopPoll(int fdVal, int event) {
169         if (event == Net.POLLIN) {
170             POLLERS.readPoller(fdVal).wakeup(fdVal);
171         } else if (event == Net.POLLOUT) {
172             POLLERS.writePoller(fdVal).wakeup(fdVal);
173         } else {
174             throw new IllegalArgumentException();
175         }
176     }
177 
178     /**
179      * If there are any threads polling the given file descriptor then they are unparked.
180      */
181     static void stopPoll(int fdVal) {
182         stopPoll(fdVal, Net.POLLIN);
183         stopPoll(fdVal, Net.POLLOUT);
184     }
185 
186     /**
187      * Parks the current thread until a file descriptor is ready.
188      */
189     private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
190         register(fdVal);
191         try {
192             boolean isOpen = supplier.getAsBoolean();
193             if (isOpen) {
194                 if (nanos > 0) {
195                     LockSupport.parkNanos(nanos);
196                 } else {
197                     LockSupport.park();
198                 }
199             }
200         } finally {
201             deregister(fdVal);
202         }
203     }
204 
205     /**
206      * Registers the file descriptor to be polled at most once when the file descriptor
207      * is ready for I/O.
208      */
209     private void register(int fdVal) throws IOException {
210         Thread previous = map.put(fdVal, Thread.currentThread());
211         assert previous == null;
212         try {
213             implRegister(fdVal);
214         } catch (Throwable t) {
215             map.remove(fdVal);
216             throw t;


217         }
218     }
219 
220     /**
221      * Deregister the file descriptor so that the file descriptor is not polled.
222      */
223     private void deregister(int fdVal) {
224         Thread previous = map.remove(fdVal);
225         boolean polled = (previous == null);
226         assert polled || previous == Thread.currentThread();
227         implDeregister(fdVal, polled);
228     }
229 
230     /**
231      * Unparks any thread that is polling the given file descriptor.
232      */
233     private void wakeup(int fdVal) {
234         Thread t = map.remove(fdVal);
235         if (t != null) {
236             LockSupport.unpark(t);
237         }
238     }
239 
240     /**
241      * Master polling loop. The {@link #polled(int)} method is invoked for each file
242      * descriptor that is polled.
243      */
244     private void pollerLoop() {
245         owner = Thread.currentThread();
246         try {
247             for (;;) {
248                 poll(-1);
249             }
250         } catch (Exception e) {
251             e.printStackTrace();
252         }
253     }
254 
255     /**
256      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
257      * descriptor that is polled.
258      *
259      * The sub-poller registers its file descriptor with the master poller to park until
260      * there are events to poll. When unparked, it does non-blocking polls and parks
261      * again when there are no more events. The sub-poller yields after each poll to help
262      * with fairness and to avoid re-registering with the master poller where possible.
263      */
264     private void subPollerLoop(Poller masterPoller) {
265         assert Thread.currentThread().isVirtual();
266         owner = Thread.currentThread();
267         try {
268             int polled = 0;
269             for (;;) {
270                 if (polled == 0) {
271                     masterPoller.poll(fdVal(), 0, () -> true);  // park

272                 } else {
273                     Thread.yield();
274                 }
275                 polled = poll(0);
276             }
277         } catch (Exception e) {
278             e.printStackTrace();
279         }
280     }
281 
282     /**
283      * Returns the number of I/O operations currently registered with this poller.
284      */
285     public int registered() {
286         return map.size();
287     }
288 
289     @Override
290     public String toString() {
291         return String.format("%s [registered = %d, owner = %s]",
292                 Objects.toIdentityString(this), registered(), owner);
293     }
294 
295     /**
296      * The Pollers used for read and write events.
297      */
298     private static class Pollers {
299         private final PollerProvider provider;
300         private final Poller.Mode pollerMode;
301         private final Poller masterPoller;
302         private final Poller[] readPollers;
303         private final Poller[] writePollers;
304 
305         // used by start method so that the executor is kept alive
306         private Executor executor;





307 
308         /**
309          * Creates the Poller instances based on configuration.
310          */
311         Pollers() throws IOException {
312             PollerProvider provider = PollerProvider.provider();
313             Poller.Mode mode;
314             String s = System.getProperty("jdk.pollerMode");
315             if (s != null) {
316                 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
317                     mode = Mode.SYSTEM_THREADS;
318                 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
319                     mode = Mode.VTHREAD_POLLERS;
320                 } else {
321                     throw new RuntimeException("Can't parse '" + s + "' as polling mode");





































322                 }
323             } else {
324                 mode = provider.defaultPollerMode();
325             }


326 
327             // vthread poller mode needs a master poller
328             Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
329                     ? provider.readPoller(false)
330                     : null;



331 
332             // read pollers (or sub-pollers)
333             int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));


334             Poller[] readPollers = new Poller[readPollerCount];
335             for (int i = 0; i < readPollerCount; i++) {
336                 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);










337             }
338 
339             // write pollers (or sub-pollers)
340             int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));

































































341             Poller[] writePollers = new Poller[writePollerCount];
342             for (int i = 0; i < writePollerCount; i++) {
343                 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);











344             }
345 
346             this.provider = provider;
347             this.pollerMode = mode;
348             this.masterPoller = masterPoller;
349             this.readPollers = readPollers;
350             this.writePollers = writePollers;







351         }
352 
353         /**
354          * Starts the Poller threads.
355          */
356         void start() {
357             if (pollerMode == Mode.VTHREAD_POLLERS) {
358                 startPlatformThread("MasterPoller", masterPoller::pollerLoop);
359                 ThreadFactory factory = Thread.ofVirtual()
360                         .inheritInheritableThreadLocals(false)
361                         .name("SubPoller-", 0)
362                         .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
363                         .factory();
364                 executor = Executors.newThreadPerTaskExecutor(factory);
365                 Arrays.stream(readPollers).forEach(p -> {
366                     executor.execute(() -> p.subPollerLoop(masterPoller));
367                 });
368                 Arrays.stream(writePollers).forEach(p -> {
369                     executor.execute(() -> p.subPollerLoop(masterPoller));
370                 });
371             } else {
372                 Arrays.stream(readPollers).forEach(p -> {
373                     startPlatformThread("Read-Poller", p::pollerLoop);
374                 });
375                 Arrays.stream(writePollers).forEach(p -> {
376                     startPlatformThread("Write-Poller", p::pollerLoop);
377                 });
378             }
379         }
380 
381         /**
382          * Returns the master poller, or null if there is no master poller.
383          */





















384         Poller masterPoller() {
385             return masterPoller;
386         }
387 
388         /**
389          * Returns the read poller for the given file descriptor.
390          */
391         Poller readPoller(int fdVal) {
392             int index = provider.fdValToIndex(fdVal, readPollers.length);
393             return readPollers[index];


394         }
























395 
396         /**
397          * Returns the write poller for the given file descriptor.
398          */
399         Poller writePoller(int fdVal) {
400             int index = provider.fdValToIndex(fdVal, writePollers.length);



























401             return writePollers[index];
402         }
403 
404         /**
405          * Return the list of read pollers.
406          */
407         List<Poller> readPollers() {
408             return List.of(readPollers);
















409         }
410 
411         /**
412          * Return the list of write pollers.
413          */
414         List<Poller> writePollers() {
415             return List.of(writePollers);
































416         }
417 




418 
419         /**
420          * Reads the given property name to get the poller count. If the property is
421          * set then the value must be a power of 2. Returns {@code defaultCount} if
422          * the property is not set.
423          * @throws IllegalArgumentException if the property is set to a value that
424          * is not a power of 2.
425          */
426         private static int pollerCount(String propName, int defaultCount) {
427             String s = System.getProperty(propName);
428             int count = (s != null) ? Integer.parseInt(s) : defaultCount;
429 
430             // check power of 2
431             if (count != Integer.highestOneBit(count)) {
432                 String msg = propName + " is set to a value that is not a power of 2";
433                 throw new IllegalArgumentException(msg);


434             }
435             return count;
436         }
437 
438         /**
439          * Starts a platform thread to run the given task.
440          */
441         private void startPlatformThread(String name, Runnable task) {

442             try {
443                 Thread thread = InnocuousThread.newSystemThread(name, task);
444                 thread.setDaemon(true);
445                 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
446                 thread.start();
447             } catch (Exception e) {
448                 throw new InternalError(e);
449             }
450         }


































451     }
452 
453     /**
454      * Return the master poller or null if there is no master poller.
455      */
456     public static Poller masterPoller() {
457         return POLLERS.masterPoller();
458     }
459 
460     /**
461      * Return the list of read pollers.
462      */
463     public static List<Poller> readPollers() {
464         return POLLERS.readPollers();
465     }
466 
467     /**
468      * Return the list of write pollers.
469      */
470     public static List<Poller> writePollers() {
471         return POLLERS.writePollers();
472     }
473 }

  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;
 28 import java.io.UncheckedIOException;
 29 import java.lang.ref.Reference;
 30 import java.util.Arrays;
 31 import java.util.List;
 32 import java.util.Map;
 33 import java.util.Objects;
 34 import java.util.Set;
 35 import java.util.concurrent.ConcurrentHashMap;
 36 import java.util.concurrent.Executor;
 37 import java.util.concurrent.Executors;
 38 import java.util.concurrent.ThreadFactory;
 39 import java.util.concurrent.locks.LockSupport;
 40 import java.util.function.BooleanSupplier;
 41 import jdk.internal.access.JavaLangAccess;
 42 import jdk.internal.access.SharedSecrets;
 43 import jdk.internal.misc.InnocuousThread;
 44 import jdk.internal.misc.TerminatingThreadLocal;
 45 import jdk.internal.vm.Continuation;
 46 import jdk.internal.vm.ContinuationSupport;
 47 import jdk.internal.vm.annotation.Stable;
 48 
 49 /**
 50  * Polls file descriptors. Virtual threads invoke the poll method to park
 51  * until a given file descriptor is ready for I/O.
 52  */
 53 public abstract class Poller {
 54     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();









 55 
 56     // the poller group for the I/O pollers and poller threads
 57     private static final PollerGroup POLLER_GROUP = createPollerGroup();
 58 
 59     // the poller or sub-poller thread (used for observability only)
 60     private @Stable Thread owner;
 61 
 62     // maps file descriptors to parked Thread
 63     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 64 
 65     // shutdown (if supported by poller group)
 66     private volatile boolean shutdown;
 67 
 68     /**
 69      * Poller mode.
 70      */
 71     enum Mode {
 72         /**
 73          * Read and write pollers are platform threads that block waiting for events and
 74          * unpark virtual threads when file descriptors are ready for I/O.
 75          */
 76         SYSTEM_THREADS,
 77 
 78         /**
 79          * Read and write pollers are virtual threads that poll for events, yielding
 80          * between polls and unparking virtual threads when file descriptors are
 81          * ready for I/O. If there are no events then the poller threads park until there
 82          * are I/O events to poll. This mode helps to integrate polling with virtual
 83          * thread scheduling. The approach is similar to the default scheme in "User-level
 84          * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
 85          * (https://dl.acm.org/doi/10.1145/3379483).
 86          */
 87         VTHREAD_POLLERS,
 88 
 89         /**
 90          * Read pollers are per-carrier virtual threads that poll for events, yielding
 91          * between polls and unparking virtual threads when file descriptors are ready
 92          * for I/O. If there are no events then the poller threads park until there
 93          * are I/O events to poll. The write poller is a system-wide platform thread.
 94          */
 95         POLLER_PER_CARRIER
 96     }
 97 
 98     /**
 99      * Create and return the PollerGroup.
100      */
101     private static PollerGroup createPollerGroup() {
102         try {
103             PollerProvider provider;
104             if (System.getProperty("jdk.pollerMode") instanceof String s) {
105                 Mode mode = switch (s) {
106                     case "1" -> Mode.SYSTEM_THREADS;
107                     case "2" -> Mode.VTHREAD_POLLERS;
108                     case "3" -> Mode.POLLER_PER_CARRIER;
109                     default -> {
110                         throw new RuntimeException(s + " is not a valid polling mode");
111                     }
112                 };
113                 provider = PollerProvider.createProvider(mode);
114             } else {
115                 provider = PollerProvider.createProvider();
116             }
117 
118             int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers());
119             int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers());
120             PollerGroup group = switch (provider.pollerMode()) {
121                 case SYSTEM_THREADS     -> new SystemThreadsPollerGroup(provider, readPollers, writePollers);
122                 case VTHREAD_POLLERS    -> new VThreadsPollerGroup(provider, readPollers, writePollers);
123                 case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers);
124             };
125             group.start();
126             return group;
127         } catch (IOException ioe) {
128             throw new UncheckedIOException(ioe);
129         }
130     }
131 
132     /**
133      * Initialize a Poller.
134      */
135     protected Poller() {
136     }
137 
138     /**
139      * Closes the poller and releases resources. This method can only be used to clean up
140      * when creating a poller group fails.
141      */
142     abstract void close() throws IOException;
143 
144     /**
145      * Sets the poller's thread owner.
146      */
147     private void setOwner() {
148         owner = Thread.currentThread();
149     }
150 
151     /**
152      * Returns true if this poller is marked for shutdown.
153      */
154     boolean isShutdown() {
155         return shutdown;
156     }
157 
158     /**
159      * Marks this poller for shutdown.
160      */
161     private void setShutdown() {
162         shutdown = true;
163     }
164 
165     /**
166      * Returns the poller's file descriptor to use when polling with the master poller.
167      * @throws UnsupportedOperationException if not supported
168      */
169     int fdVal() {
170         throw new UnsupportedOperationException();
171     }
172 
173     /**
174      * Invoked when this poller's file descriptor is polled by the master poller.
175      */
176     void pollerPolled() throws IOException {
177     }
178 
179     /**
180      * Register the file descriptor. The registration is "one shot", meaning it should
181      * be polled at most once.
182      */
183     abstract void implRegister(int fdVal) throws IOException;
184 
185     /**
186      * Deregister the file descriptor.
187      * @param polled true if the file descriptor has already been polled
188      */
189     abstract void implDeregister(int fdVal, boolean polled) throws IOException;
190 
191     /**
192      * Poll for events. The {@link #polled(int)} method is invoked for each
193      * polled file descriptor.
194      *
195      * @param timeout if positive then block for up to {@code timeout} milliseconds,
196      *     if zero then don't block, if -1 then block indefinitely
197      * @return >0 if file descriptors are polled, 0 if no file descriptor polled
198      */
199     abstract int poll(int timeout) throws IOException;
200 
201     /**
202      * Wakeup the poller thread if blocked in poll.
203      *
204      * @throws UnsupportedOperationException if not supported
205      */
206     void wakeupPoller() throws IOException {
207         throw new UnsupportedOperationException();
208     }
209 
210     /**
211      * Callback by the poll method when a file descriptor is polled.
212      */
213     final void polled(int fdVal) {
214         Thread t = map.remove(fdVal);
215         if (t != null) {
216             LockSupport.unpark(t);
217         }
218     }
219 
220     /**
221      * Parks the current thread until a file descriptor is ready for the given op.
222      * @param fdVal the file descriptor
223      * @param event POLLIN or POLLOUT
224      * @param nanos the waiting time or 0 to wait indefinitely
225      * @param isOpen supplies a boolean to indicate if the enclosing object is open
226      */
227     public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
228         POLLER_GROUP.poll(fdVal, event, nanos, isOpen);









229     }
230 
231     /**
232      * Parks the current thread until a Selector's file descriptor is ready.
233      * @param fdVal the Selector's file descriptor
234      * @param nanos the waiting time or 0 to wait indefinitely
235      */
236     public static void pollSelector(int fdVal, long nanos) throws IOException {
237         POLLER_GROUP.pollSelector(fdVal, nanos);



















238     }
239 
240     /**
241      * Unpark the given thread so that it stops polling.
242      */
243     public static void stopPoll(Thread thread) {
244         LockSupport.unpark(thread);

245     }
246 
247     /**
248      * Parks the current thread until a file descriptor is ready.
249      */
250     private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
251         register(fdVal);
252         try {
253             if (isOpen.getAsBoolean() && !isShutdown()) {

254                 if (nanos > 0) {
255                     LockSupport.parkNanos(nanos);
256                 } else {
257                     LockSupport.park();
258                 }
259             }
260         } finally {
261             deregister(fdVal);
262         }
263     }
264 
265     /**
266      * Registers the file descriptor to be polled at most once when the file descriptor
267      * is ready for I/O.
268      */
269     private void register(int fdVal) throws IOException {
270         Thread previous = map.put(fdVal, Thread.currentThread());
271         assert previous == null;
272         try {
273             implRegister(fdVal);
274         } catch (Throwable t) {
275             map.remove(fdVal);
276             throw t;
277         } finally {
278             Reference.reachabilityFence(this);
279         }
280     }
281 
282     /**
283      * Deregister the file descriptor so that the file descriptor is not polled.
284      */
285     private void deregister(int fdVal) throws IOException {
286         Thread previous = map.remove(fdVal);
287         boolean polled = (previous == null);
288         assert polled || previous == Thread.currentThread();
289         try {
290             implDeregister(fdVal, polled);
291         } finally {
292             Reference.reachabilityFence(this);






293         }
294     }
295 
296     /**
297      * Master polling loop. The {@link #polled(int)} method is invoked for each file
298      * descriptor that is polled.
299      */
300     private void pollerLoop() {
301         setOwner();
302         try {
303             while (!isShutdown()) {
304                 poll(-1);
305             }
306         } catch (Exception e) {
307             e.printStackTrace();
308         }
309     }
310 
311     /**
312      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
313      * descriptor that is polled.
314      *
315      * The sub-poller registers its file descriptor with the master poller to park until
316      * there are events to poll. When unparked, it does non-blocking polls and parks
317      * again when there are no more events. The sub-poller yields after each poll to help
318      * with fairness and to avoid re-registering with the master poller where possible.
319      */
320     private void subPollerLoop(Poller masterPoller) {
321         assert Thread.currentThread().isVirtual();
322         setOwner();
323         try {
324             int polled = 0;
325             while (!isShutdown()) {
326                 if (polled == 0) {
327                     masterPoller.poll(fdVal(), 0, () -> true);  // park
328                     pollerPolled();
329                 } else {
330                     Thread.yield();
331                 }
332                 polled = poll(0);
333             }
334         } catch (Exception e) {
335             e.printStackTrace();
336         }
337     }
338 
339     /**
340      * Unparks all threads waiting on a file descriptor registered with this poller.
341      */
342     private void wakeupAll() {
343         map.values().forEach(LockSupport::unpark);
344     }
345 
346     @Override
347     public String toString() {
348         return String.format("%s [registered = %d, owner = %s]",
349             Objects.toIdentityString(this), map.size(), owner);
350     }
351 
352     /**
353      * A group of poller threads that support virtual threads polling file descriptors.
354      */
355     private static abstract class PollerGroup {
356         private final PollerProvider provider;




357 
358         PollerGroup(PollerProvider provider) {
359             this.provider = provider;
360         }
361 
362         final PollerProvider provider() {
363             return provider;
364         }
365 
366         /**
367          * Starts the poller group and any system-wide poller threads.
368          */
369         abstract void start();
370 
371         /**
372          * Parks the current thread until a file descriptor is ready for the given op.
373          */
374         abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
375 
376         /**
377          * Parks the current thread until a Selector's file descriptor is ready.
378          */
379         void pollSelector(int fdVal, long nanos) throws IOException {
380             poll(fdVal, Net.POLLIN, nanos, () -> true);
381         }
382 
383         /**
384          * Starts a platform thread to run the given task.
385          */
386         protected final void startPlatformThread(String name, Runnable task) {
387             Thread thread = InnocuousThread.newSystemThread(name, task);
388             thread.setDaemon(true);
389             thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
390             thread.start();
391         }
392 
393         /**
394          * Return the master poller, or null if no master poller.
395          */
396         abstract Poller masterPoller();
397 
398         /**
399          * Return the read pollers.
400          */
401         abstract List<Poller> readPollers();
402 
403         /**
404          * Return the write pollers.
405          */
406         abstract List<Poller> writePollers();
407 
408         /**
409          * Close the given pollers.
410          */
411         static void closeAll(Poller... pollers) {
412             for (Poller poller : pollers) {
413                 if (poller != null) {
414                     try {
415                         poller.close();
416                     } catch (IOException _) { }
417                 }


418             }
419         }
420     }
421 
422     /**
423      * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
424      */
425     private static class SystemThreadsPollerGroup extends PollerGroup {
426         // system-wide read and write pollers
427         private final Poller[] readPollers;
428         private final Poller[] writePollers;
429 
430         SystemThreadsPollerGroup(PollerProvider provider,
431                                  int readPollerCount,
432                                  int writePollerCount) throws IOException {
433             super(provider);
434             Poller[] readPollers = new Poller[readPollerCount];
435             Poller[] writePollers = new Poller[writePollerCount];
436             try {
437                 for (int i = 0; i < readPollerCount; i++) {
438                     readPollers[i] = provider.readPoller(false);
439                 }
440                 for (int i = 0; i < writePollerCount; i++) {
441                     writePollers[i] = provider.writePoller(false);
442                 }
443             } catch (Throwable e) {
444                 closeAll(readPollers);
445                 closeAll(writePollers);
446                 throw e;
447             }
448 
449             this.readPollers = readPollers;
450             this.writePollers = writePollers;
451         }
452 
453         @Override
454         void start() {
455             Arrays.stream(readPollers).forEach(p -> {
456                 startPlatformThread("Read-Poller", p::pollerLoop);
457             });
458             Arrays.stream(writePollers).forEach(p -> {
459                 startPlatformThread("Write-Poller", p::pollerLoop);
460             });
461         }
462 
463         private Poller readPoller(int fdVal) {
464             int index = provider().fdValToIndex(fdVal, readPollers.length);
465             return readPollers[index];
466         }
467 
468         private Poller writePoller(int fdVal) {
469             int index = provider().fdValToIndex(fdVal, writePollers.length);
470             return writePollers[index];
471         }
472 
473         @Override
474         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
475             Poller poller = (event == Net.POLLIN)
476                     ? readPoller(fdVal)
477                     : writePoller(fdVal);
478             poller.poll(fdVal, nanos, isOpen);
479         }
480 
481         @Override
482         Poller masterPoller() {
483             return null;
484         }
485 
486         @Override
487         List<Poller> readPollers() {
488             return List.of(readPollers);
489         }
490 
491         @Override
492         List<Poller> writePollers() {
493             return List.of(writePollers);
494         }
495     }
496 
497     /**
498      * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
499      * When read and write pollers need to block then they register with a system-wide
500      * "master poller" that runs in a dedicated platform thread.
501      */
502     private static class VThreadsPollerGroup extends PollerGroup {
503         private final Poller masterPoller;
504         private final Poller[] readPollers;
505         private final Poller[] writePollers;
506 
507         // keep virtual thread pollers alive
508         private final Executor executor;
509 
510         VThreadsPollerGroup(PollerProvider provider,
511                             int readPollerCount,
512                             int writePollerCount) throws IOException {
513             super(provider);
514             Poller masterPoller = provider.readPoller(false);
515             Poller[] readPollers = new Poller[readPollerCount];
516             Poller[] writePollers = new Poller[writePollerCount];
517 
518             try {
519                 for (int i = 0; i < readPollerCount; i++) {
520                     readPollers[i] = provider.readPoller(true);
521                 }
522                 for (int i = 0; i < writePollerCount; i++) {
523                     writePollers[i] = provider.writePoller(true);
524                 }
525             } catch (Throwable e) {
526                 masterPoller.close();
527                 closeAll(readPollers);
528                 closeAll(writePollers);
529                 throw e;
530             }
531 


532             this.masterPoller = masterPoller;
533             this.readPollers = readPollers;
534             this.writePollers = writePollers;
535 
536             ThreadFactory factory = Thread.ofVirtual()
537                     .inheritInheritableThreadLocals(false)
538                     .name("SubPoller-", 0)
539                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
540                     .factory();
541             this.executor = Executors.newThreadPerTaskExecutor(factory);
542         }
543 
544         @Override


545         void start() {
546             startPlatformThread("Master-Poller", masterPoller::pollerLoop);
547             Arrays.stream(readPollers).forEach(p -> {
548                 executor.execute(() -> p.subPollerLoop(masterPoller));
549             });
550             Arrays.stream(writePollers).forEach(p -> {
551                 executor.execute(() -> p.subPollerLoop(masterPoller));
552             });















553         }
554 
555         private Poller readPoller(int fdVal) {
556             int index = provider().fdValToIndex(fdVal, readPollers.length);
557             return readPollers[index];
558         }
559 
560         private Poller writePoller(int fdVal) {
561             int index = provider().fdValToIndex(fdVal, writePollers.length);
562             return writePollers[index];
563         }
564 
565         @Override
566         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
567             Poller poller = (event == Net.POLLIN)
568                     ? readPoller(fdVal)
569                     : writePoller(fdVal);
570             poller.poll(fdVal, nanos, isOpen);
571         }
572 
573         @Override
574         void pollSelector(int fdVal, long nanos) throws IOException {
575             masterPoller.poll(fdVal, nanos, () -> true);
576         }
577 
578         @Override
579         Poller masterPoller() {
580             return masterPoller;
581         }
582 
583         @Override
584         List<Poller> readPollers() {
585             return List.of(readPollers);
586         }
587 
588         @Override
589         List<Poller> writePollers() {
590             return List.of(writePollers);
591         }
592     }
593 
594     /**
595      * POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread.
596      * When a virtual thread polls a file descriptor for POLLIN, then it will use (almost
597      * always, not guaranteed) the read poller for its carrier. When a read poller needs
598      * to block then it registers with a system-wide "master poller" that runs in a
599      * dedicated platform thread. The read poller terminates if the carrier terminates.
600      * The write pollers are system-wide platform threads (usually one).
601      */
602     private static class PollerPerCarrierPollerGroup extends PollerGroup {
603         private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { }
604         private static final TerminatingThreadLocal<CarrierPoller> CARRIER_POLLER =
605             new TerminatingThreadLocal<>() {
606                 @Override
607                 protected void threadTerminated(CarrierPoller carrierPoller) {
608                     Poller readPoller = carrierPoller.readPoller();
609                     carrierPoller.group().carrierTerminated(readPoller);
610                 }
611             };
612 
613         private final Poller masterPoller;
614         private final Set<Poller> readPollers;
615         private final Poller[] writePollers;
616 
617         /**
618          * Create a PollerPerCarrierPollerGroup with the given number of write pollers.
619          */
620         PollerPerCarrierPollerGroup(PollerProvider provider,
621                                     int writePollerCount) throws IOException {
622             super(provider);
623             Poller masterPoller = provider.readPoller(false);
624             Poller[] writePollers = new Poller[writePollerCount];
625             try {
626                 for (int i = 0; i < writePollerCount; i++) {
627                     writePollers[i] = provider.writePoller(false);
628                 }
629             } catch (Throwable e) {
630                 masterPoller.close();
631                 closeAll(writePollers);
632                 throw e;
633             }
634             this.masterPoller = masterPoller;
635             this.readPollers = ConcurrentHashMap.newKeySet();;
636             this.writePollers = writePollers;
637         }
638 
639         @Override
640         void start() {
641             startPlatformThread("Master-Poller", masterPoller::pollerLoop);
642             Arrays.stream(writePollers).forEach(p -> {
643                 startPlatformThread("Write-Poller", p::pollerLoop);
644             });
645         }
646 
647         private Poller writePoller(int fdVal) {
648             int index = provider().fdValToIndex(fdVal, writePollers.length);
649             return writePollers[index];
650         }
651 
652         /**
653          * Starts a read sub-poller in a virtual thread.
654          */
655         private Poller startReadPoller() throws IOException {
656             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
657 
658             // create read sub-poller
659             Poller readPoller = provider().readPoller(true);
660             readPollers.add(readPoller);
661 
662             // start virtual thread to execute sub-polling loop
663             Thread carrier = JLA.currentCarrierThread();
664             var scheduler = JLA.virtualThreadScheduler(Thread.currentThread());
665             @SuppressWarnings("restricted")
666             var _ = Thread.ofVirtual()
667                     .scheduler(scheduler)
668                     .inheritInheritableThreadLocals(false)
669                     .name(carrier.getName() + "-Read-Poller")
670                     .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
671                     .start(() -> subPollerLoop(readPoller));
672             return readPoller;
673         }
674 
675         /**
676          * Returns the read poller for the current carrier, starting it if required.
677          */
678         private Poller readPoller() throws IOException {
679             assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
680             Continuation.pin();
681             try {
682                 CarrierPoller carrierPoller = CARRIER_POLLER.get();
683                 if (carrierPoller != null) {
684                     return carrierPoller.readPoller();
685                 } else {
686                     // first poll on this carrier will start poller
687                     Poller readPoller = startReadPoller();
688                     CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
689                     return readPoller;
690                 }
691             } finally {
692                 Continuation.unpin();
693             }
694         }
695 
696         @Override
697         void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
698             // for POLLIN, get the read poller for this carrier
699             if (event == Net.POLLIN
700                     && Thread.currentThread().isVirtual()
701                     && ContinuationSupport.isSupported()) {
702                 readPoller().poll(fdVal, nanos, isOpen);
703                 return;
704             }
705 
706             // -XX:-VMContinuations or POLLIN from platform thread does master poller
707             if (event == Net.POLLIN) {
708                 masterPoller.poll(fdVal, nanos, isOpen);
709             } else {
710                 writePoller(fdVal).poll(fdVal, nanos, isOpen);
711             }
712         }
713 
714         @Override
715         void pollSelector(int fdVal, long nanos) throws IOException {
716             masterPoller.poll(fdVal, nanos, () -> true);
717         }
718 
719         /**
720          * Sub-poller polling loop.




721          */
722         private void subPollerLoop(Poller readPoller) {
723             try {
724                 readPoller.subPollerLoop(masterPoller);
725             } finally {
726                 // wakeup all threads waiting on file descriptors registered with the
727                 // read poller, these I/O operation will migrate to another carrier.
728                 readPoller.wakeupAll();
729 
730                 // remove from serviceability view
731                 readPollers.remove(readPoller);
732             }

733         }
734 
735         /**
736          * Invoked by the carrier thread before it terminates.
737          */
738         private void carrierTerminated(Poller readPoller) {
739             readPoller.setShutdown();
740             try {
741                 readPoller.wakeupPoller();
742             } catch (Throwable e) {
743                 e.printStackTrace();



744             }
745         }
746 
747         @Override
748         Poller masterPoller() {
749             return masterPoller;
750         }
751 
752         @Override
753         List<Poller> readPollers() {
754             return readPollers.stream().toList();
755         }
756 
757         @Override
758         List<Poller> writePollers() {
759             return List.of(writePollers);
760         }
761     }
762 
763     /**
764      * Reads the given property name to get the poller count. If the property is
765      * set then the value must be a power of 2. Returns 1 if the property is not
766      * set.
767      * @throws IllegalArgumentException if the property is set to a value that
768      * is not a power of 2.
769      */
770     private static int pollerCount(String propName, int defaultCount) {
771         String s = System.getProperty(propName);
772         int count = (s != null) ? Integer.parseInt(s) : defaultCount;
773 
774         // check power of 2
775         if (count != Integer.highestOneBit(count)) {
776             String msg = propName + " is set to a value that is not a power of 2";
777             throw new IllegalArgumentException(msg);
778         }
779         return count;
780     }
781 
782     /**
783      * Return the master poller or null if there is no master poller.
784      */
785     public static Poller masterPoller() {
786         return POLLER_GROUP.masterPoller();
787     }
788 
789     /**
790      * Return the list of read pollers.
791      */
792     public static List<Poller> readPollers() {
793         return POLLER_GROUP.readPollers();
794     }
795 
796     /**
797      * Return the list of write pollers.
798      */
799     public static List<Poller> writePollers() {
800         return POLLER_GROUP.writePollers();
801     }
802 }
< prev index next >