1 /*
  2  * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;
 28 import java.util.Arrays;
 29 import java.util.List;
 30 import java.util.Map;
 31 import java.util.Objects;
 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.Executor;
 34 import java.util.concurrent.Executors;
 35 import java.util.concurrent.ThreadFactory;
 36 import java.util.concurrent.locks.LockSupport;
 37 import java.util.function.BooleanSupplier;
 38 import jdk.internal.misc.InnocuousThread;
 39 import sun.security.action.GetPropertyAction;
 40 
 41 /**
 42  * Polls file descriptors. Virtual threads invoke the poll method to park
 43  * until a given file descriptor is ready for I/O.
 44  */
 45 public abstract class Poller {
 46     private static final Pollers POLLERS;
 47     static {
 48         try {
 49             var pollers = new Pollers();
 50             pollers.start();
 51             POLLERS = pollers;
 52         } catch (IOException ioe) {
 53             throw new ExceptionInInitializerError(ioe);
 54         }
 55     }
 56 
 57     // maps file descriptors to parked Thread
 58     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 59 
 60     /**
 61      * Poller mode.
 62      */
 63     enum Mode {
 64         /**
 65          * ReadPoller and WritePoller are dedicated platform threads that block waiting
 66          * for events and unpark virtual threads when file descriptors are ready for I/O.
 67          */
 68         SYSTEM_THREADS,
 69 
 70         /**
 71          * ReadPoller and WritePoller threads are virtual threads that poll for events,
 72          * yielding between polls and unparking virtual threads when file descriptors are
 73          * ready for I/O. If there are no events then the poller threads park until there
 74          * are I/O events to poll. This mode helps to integrate polling with virtual
 75          * thread scheduling. The approach is similar to the default scheme in "User-level
 76          * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
 77          * (https://dl.acm.org/doi/10.1145/3379483).
 78          */
 79         VTHREAD_POLLERS
 80     }
 81 
 82     /**
 83      * Initialize a Poller.
 84      */
 85     protected Poller() {
 86     }
 87 
 88     /**
 89      * Returns the poller's file descriptor, used when the read and write poller threads
 90      * are virtual threads.
 91      *
 92      * @throws UnsupportedOperationException if not supported
 93      */
 94     int fdVal() {
 95         throw new UnsupportedOperationException();
 96     }
 97 
 98     /**
 99      * Register the file descriptor. The registration is "one shot", meaning it should
100      * be polled at most once.
101      */
102     abstract void implRegister(int fdVal) throws IOException;
103 
104     /**
105      * Deregister the file descriptor.
106      * @param polled true if the file descriptor has already been polled
107      */
108     abstract void implDeregister(int fdVal, boolean polled);
109 
110     /**
111      * Poll for events. The {@link #polled(int)} method is invoked for each
112      * polled file descriptor.
113      *
114      * @param timeout if positive then block for up to {@code timeout} milliseconds,
115      *     if zero then don't block, if -1 then block indefinitely
116      * @return the number of file descriptors polled
117      */
118     abstract int poll(int timeout) throws IOException;
119 
120     /**
121      * Callback by the poll method when a file descriptor is polled.
122      */
123     final void polled(int fdVal) {
124         wakeup(fdVal);
125     }
126 
127     /**
128      * Parks the current thread until a file descriptor is ready for the given op.
129      * @param fdVal the file descriptor
130      * @param event POLLIN or POLLOUT
131      * @param nanos the waiting time or 0 to wait indefinitely
132      * @param supplier supplies a boolean to indicate if the enclosing object is open
133      */
134     static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
135         throws IOException
136     {
137         assert nanos >= 0L;
138         if (event == Net.POLLIN) {
139             POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
140         } else if (event == Net.POLLOUT) {
141             POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
142         } else {
143             assert false;
144         }
145     }
146 
147     /**
148      * Parks the current thread until a Selector's file descriptor is ready.
149      * @param fdVal the Selector's file descriptor
150      * @param nanos the waiting time or 0 to wait indefinitely
151      */
152     static void pollSelector(int fdVal, long nanos) throws IOException {
153         assert nanos >= 0L;
154         Poller poller = POLLERS.masterPoller();
155         if (poller == null) {
156             poller = POLLERS.readPoller(fdVal);
157         }
158         poller.poll(fdVal, nanos, () -> true);
159     }
160 
161     /**
162      * If there is a thread polling the given file descriptor for the given event then
163      * the thread is unparked.
164      */
165     static void stopPoll(int fdVal, int event) {
166         if (event == Net.POLLIN) {
167             POLLERS.readPoller(fdVal).wakeup(fdVal);
168         } else if (event == Net.POLLOUT) {
169             POLLERS.writePoller(fdVal).wakeup(fdVal);
170         } else {
171             throw new IllegalArgumentException();
172         }
173     }
174 
175     /**
176      * If there are any threads polling the given file descriptor then they are unparked.
177      */
178     static void stopPoll(int fdVal) {
179         stopPoll(fdVal, Net.POLLIN);
180         stopPoll(fdVal, Net.POLLOUT);
181     }
182 
183     /**
184      * Parks the current thread until a file descriptor is ready.
185      */
186     private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
187         register(fdVal);
188         try {
189             boolean isOpen = supplier.getAsBoolean();
190             if (isOpen) {
191                 if (nanos > 0) {
192                     LockSupport.parkNanos(nanos);
193                 } else {
194                     LockSupport.park();
195                 }
196             }
197         } finally {
198             deregister(fdVal);
199         }
200     }
201 
202     /**
203      * Registers the file descriptor to be polled at most once when the file descriptor
204      * is ready for I/O.
205      */
206     private void register(int fdVal) throws IOException {
207         Thread previous = map.put(fdVal, Thread.currentThread());
208         assert previous == null;
209         try {
210             implRegister(fdVal);
211         } catch (Throwable t) {
212             map.remove(fdVal);
213             throw t;
214         }
215     }
216 
217     /**
218      * Deregister the file descriptor so that the file descriptor is not polled.
219      */
220     private void deregister(int fdVal) {
221         Thread previous = map.remove(fdVal);
222         boolean polled = (previous == null);
223         assert polled || previous == Thread.currentThread();
224         implDeregister(fdVal, polled);
225     }
226 
227     /**
228      * Unparks any thread that is polling the given file descriptor.
229      */
230     private void wakeup(int fdVal) {
231         Thread t = map.remove(fdVal);
232         if (t != null) {
233             LockSupport.unpark(t);
234         }
235     }
236 
237     /**
238      * Master polling loop. The {@link #polled(int)} method is invoked for each file
239      * descriptor that is polled.
240      */
241     private void pollerLoop() {
242         try {
243             for (;;) {
244                 poll(-1);
245             }
246         } catch (Exception e) {
247             e.printStackTrace();
248         }
249     }
250 
251     /**
252      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
253      * descriptor that is polled.
254      *
255      * The sub-poller registers its file descriptor with the master poller to park until
256      * there are events to poll. When unparked, it does non-blocking polls and parks
257      * again when there are no more events. The sub-poller yields after each poll to help
258      * with fairness and to avoid re-registering with the master poller where possible.
259      */
260     private void subPollerLoop(Poller masterPoller) {
261         assert Thread.currentThread().isVirtual();
262         try {
263             int polled = 0;
264             for (;;) {
265                 if (polled == 0) {
266                     masterPoller.poll(fdVal(), 0, () -> true);  // park
267                 } else {
268                     Thread.yield();
269                 }
270                 polled = poll(0);
271             }
272         } catch (Exception e) {
273             e.printStackTrace();
274         }
275     }
276 
277     /**
278      * Returns the number I/O operations currently registered with this poller.
279      */
280     public int registered() {
281         return map.size();
282     }
283 
284     @Override
285     public String toString() {
286         return Objects.toIdentityString(this) + " [registered = " + registered() + "]";
287     }
288 
289     /**
290      * The Pollers used for read and write events.
291      */
292     private static class Pollers {
293         private final PollerProvider provider;
294         private final Poller.Mode pollerMode;
295         private final Poller masterPoller;
296         private final Poller[] readPollers;
297         private final Poller[] writePollers;
298 
299         // used by start method to executor is kept alive
300         private Executor executor;
301 
302         /**
303          * Creates the Poller instances based on configuration.
304          */
305         Pollers() throws IOException {
306             PollerProvider provider = PollerProvider.provider();
307             Poller.Mode mode;
308             String s = GetPropertyAction.privilegedGetProperty("jdk.pollerMode");
309             if (s != null) {
310                 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
311                     mode = Mode.SYSTEM_THREADS;
312                 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
313                     mode = Mode.VTHREAD_POLLERS;
314                 } else {
315                     throw new RuntimeException("Can't parse '" + s + "' as polling mode");
316                 }
317             } else {
318                 mode = provider.defaultPollerMode();
319             }
320 
321             // vthread poller mode needs a master poller
322             Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
323                     ? provider.readPoller(false)
324                     : null;
325 
326             // read pollers (or sub-pollers)
327             int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
328             Poller[] readPollers = new Poller[readPollerCount];
329             for (int i = 0; i < readPollerCount; i++) {
330                 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
331             }
332 
333             // write pollers (or sub-pollers)
334             int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));
335             Poller[] writePollers = new Poller[writePollerCount];
336             for (int i = 0; i < writePollerCount; i++) {
337                 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);
338             }
339 
340             this.provider = provider;
341             this.pollerMode = mode;
342             this.masterPoller = masterPoller;
343             this.readPollers = readPollers;
344             this.writePollers = writePollers;
345         }
346 
347         /**
348          * Starts the Poller threads.
349          */
350         void start() {
351             if (pollerMode == Mode.VTHREAD_POLLERS) {
352                 startPlatformThread("MasterPoller", masterPoller::pollerLoop);
353                 ThreadFactory factory = Thread.ofVirtual()
354                         .inheritInheritableThreadLocals(false)
355                         .name("SubPoller-", 0)
356                         .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
357                         .factory();
358                 executor = Executors.newThreadPerTaskExecutor(factory);
359                 Arrays.stream(readPollers).forEach(p -> {
360                     executor.execute(() -> p.subPollerLoop(masterPoller));
361                 });
362                 Arrays.stream(writePollers).forEach(p -> {
363                     executor.execute(() -> p.subPollerLoop(masterPoller));
364                 });
365             } else {
366                 Arrays.stream(readPollers).forEach(p -> {
367                     startPlatformThread("Read-Poller", p::pollerLoop);
368                 });
369                 Arrays.stream(writePollers).forEach(p -> {
370                     startPlatformThread("Write-Poller", p::pollerLoop);
371                 });
372             }
373         }
374 
375         /**
376          * Returns the master poller, or null if there is no master poller.
377          */
378         Poller masterPoller() {
379             return masterPoller;
380         }
381 
382         /**
383          * Returns the read poller for the given file descriptor.
384          */
385         Poller readPoller(int fdVal) {
386             int index = provider.fdValToIndex(fdVal, readPollers.length);
387             return readPollers[index];
388         }
389 
390         /**
391          * Returns the write poller for the given file descriptor.
392          */
393         Poller writePoller(int fdVal) {
394             int index = provider.fdValToIndex(fdVal, writePollers.length);
395             return writePollers[index];
396         }
397 
398         /**
399          * Return the list of read pollers.
400          */
401         List<Poller> readPollers() {
402             return List.of(readPollers);
403         }
404 
405         /**
406          * Return the list of write pollers.
407          */
408         List<Poller> writePollers() {
409             return List.of(writePollers);
410         }
411 
412 
413         /**
414          * Reads the given property name to get the poller count. If the property is
415          * set then the value must be a power of 2. Returns 1 if the property is not
416          * set.
417          * @throws IllegalArgumentException if the property is set to a value that
418          * is not a power of 2.
419          */
420         private static int pollerCount(String propName, int defaultCount) {
421             String s = GetPropertyAction.privilegedGetProperty(propName);
422             int count = (s != null) ? Integer.parseInt(s) : defaultCount;
423 
424             // check power of 2
425             if (count != Integer.highestOneBit(count)) {
426                 String msg = propName + " is set to a value that is not a power of 2";
427                 throw new IllegalArgumentException(msg);
428             }
429             return count;
430         }
431 
432         /**
433          * Starts a platform thread to run the given task.
434          */
435         private void startPlatformThread(String name, Runnable task) {
436             try {
437                 Thread thread = InnocuousThread.newSystemThread(name, task);
438                 thread.setDaemon(true);
439                 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
440                 thread.start();
441             } catch (Exception e) {
442                 throw new InternalError(e);
443             }
444         }
445     }
446 
447     /**
448      * Return the master poller or null if there is no master poller.
449      */
450     public static Poller masterPoller() {
451         return POLLERS.masterPoller();
452     }
453 
454     /**
455      * Return the list of read pollers.
456      */
457     public static List<Poller> readPollers() {
458         return POLLERS.readPollers();
459     }
460 
461     /**
462      * Return the list of write pollers.
463      */
464     public static List<Poller> writePollers() {
465         return POLLERS.writePollers();
466     }
467 }