1 /*
  2  * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;
 28 import java.util.Arrays;
 29 import java.util.List;
 30 import java.util.Map;
 31 import java.util.Objects;
 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.Executor;
 34 import java.util.concurrent.Executors;
 35 import java.util.concurrent.ThreadFactory;
 36 import java.util.concurrent.locks.LockSupport;
 37 import java.util.function.BooleanSupplier;
 38 import jdk.internal.misc.InnocuousThread;
 39 import jdk.internal.vm.annotation.Stable;
 40 
 41 /**
 42  * Polls file descriptors. Virtual threads invoke the poll method to park
 43  * until a given file descriptor is ready for I/O.
 44  */
 45 public abstract class Poller {
 46     private static final Pollers POLLERS;
 47     static {
 48         try {
 49             var pollers = new Pollers();
 50             pollers.start();
 51             POLLERS = pollers;
 52         } catch (IOException ioe) {
 53             throw new ExceptionInInitializerError(ioe);
 54         }
 55     }
 56 
 57     // the poller or sub-poller thread
 58     private @Stable Thread owner;
 59 
 60     // maps file descriptors to parked Thread
 61     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 62 
 63     /**
 64      * Poller mode.
 65      */
 66     enum Mode {
 67         /**
 68          * ReadPoller and WritePoller are dedicated platform threads that block waiting
 69          * for events and unpark virtual threads when file descriptors are ready for I/O.
 70          */
 71         SYSTEM_THREADS,
 72 
 73         /**
 74          * ReadPoller and WritePoller threads are virtual threads that poll for events,
 75          * yielding between polls and unparking virtual threads when file descriptors are
 76          * ready for I/O. If there are no events then the poller threads park until there
 77          * are I/O events to poll. This mode helps to integrate polling with virtual
 78          * thread scheduling. The approach is similar to the default scheme in "User-level
 79          * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
 80          * (https://dl.acm.org/doi/10.1145/3379483).
 81          */
 82         VTHREAD_POLLERS
 83     }
 84 
 85     /**
 86      * Initialize a Poller.
 87      */
 88     protected Poller() {
 89     }
 90 
 91     /**
 92      * Returns the poller's file descriptor, used when the read and write poller threads
 93      * are virtual threads.
 94      *
 95      * @throws UnsupportedOperationException if not supported
 96      */
 97     int fdVal() {
 98         throw new UnsupportedOperationException();
 99     }
100 
101     /**
102      * Register the file descriptor. The registration is "one shot", meaning it should
103      * be polled at most once.
104      */
105     abstract void implRegister(int fdVal) throws IOException;
106 
107     /**
108      * Deregister the file descriptor.
109      * @param polled true if the file descriptor has already been polled
110      */
111     abstract void implDeregister(int fdVal, boolean polled);
112 
113     /**
114      * Poll for events. The {@link #polled(int)} method is invoked for each
115      * polled file descriptor.
116      *
117      * @param timeout if positive then block for up to {@code timeout} milliseconds,
118      *     if zero then don't block, if -1 then block indefinitely
119      * @return the number of file descriptors polled
120      */
121     abstract int poll(int timeout) throws IOException;
122 
123     /**
124      * Callback by the poll method when a file descriptor is polled.
125      */
126     final void polled(int fdVal) {
127         wakeup(fdVal);
128     }
129 
130     /**
131      * Parks the current thread until a file descriptor is ready for the given op.
132      * @param fdVal the file descriptor
133      * @param event POLLIN or POLLOUT
134      * @param nanos the waiting time or 0 to wait indefinitely
135      * @param supplier supplies a boolean to indicate if the enclosing object is open
136      */
137     static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
138         throws IOException
139     {
140         assert nanos >= 0L;
141         if (event == Net.POLLIN) {
142             POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
143         } else if (event == Net.POLLOUT) {
144             POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
145         } else {
146             assert false;
147         }
148     }
149 
150     /**
151      * Parks the current thread until a Selector's file descriptor is ready.
152      * @param fdVal the Selector's file descriptor
153      * @param nanos the waiting time or 0 to wait indefinitely
154      */
155     static void pollSelector(int fdVal, long nanos) throws IOException {
156         assert nanos >= 0L;
157         Poller poller = POLLERS.masterPoller();
158         if (poller == null) {
159             poller = POLLERS.readPoller(fdVal);
160         }
161         poller.poll(fdVal, nanos, () -> true);
162     }
163 
164     /**
165      * If there is a thread polling the given file descriptor for the given event then
166      * the thread is unparked.
167      */
168     static void stopPoll(int fdVal, int event) {
169         if (event == Net.POLLIN) {
170             POLLERS.readPoller(fdVal).wakeup(fdVal);
171         } else if (event == Net.POLLOUT) {
172             POLLERS.writePoller(fdVal).wakeup(fdVal);
173         } else {
174             throw new IllegalArgumentException();
175         }
176     }
177 
178     /**
179      * If there are any threads polling the given file descriptor then they are unparked.
180      */
181     static void stopPoll(int fdVal) {
182         stopPoll(fdVal, Net.POLLIN);
183         stopPoll(fdVal, Net.POLLOUT);
184     }
185 
186     /**
187      * Parks the current thread until a file descriptor is ready.
188      */
189     private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
190         register(fdVal);
191         try {
192             boolean isOpen = supplier.getAsBoolean();
193             if (isOpen) {
194                 if (nanos > 0) {
195                     LockSupport.parkNanos(nanos);
196                 } else {
197                     LockSupport.park();
198                 }
199             }
200         } finally {
201             deregister(fdVal);
202         }
203     }
204 
205     /**
206      * Registers the file descriptor to be polled at most once when the file descriptor
207      * is ready for I/O.
208      */
209     private void register(int fdVal) throws IOException {
210         Thread previous = map.put(fdVal, Thread.currentThread());
211         assert previous == null;
212         try {
213             implRegister(fdVal);
214         } catch (Throwable t) {
215             map.remove(fdVal);
216             throw t;
217         }
218     }
219 
220     /**
221      * Deregister the file descriptor so that the file descriptor is not polled.
222      */
223     private void deregister(int fdVal) {
224         Thread previous = map.remove(fdVal);
225         boolean polled = (previous == null);
226         assert polled || previous == Thread.currentThread();
227         implDeregister(fdVal, polled);
228     }
229 
230     /**
231      * Unparks any thread that is polling the given file descriptor.
232      */
233     private void wakeup(int fdVal) {
234         Thread t = map.remove(fdVal);
235         if (t != null) {
236             LockSupport.unpark(t);
237         }
238     }
239 
240     /**
241      * Master polling loop. The {@link #polled(int)} method is invoked for each file
242      * descriptor that is polled.
243      */
244     private void pollerLoop() {
245         owner = Thread.currentThread();
246         try {
247             for (;;) {
248                 poll(-1);
249             }
250         } catch (Exception e) {
251             e.printStackTrace();
252         }
253     }
254 
255     /**
256      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
257      * descriptor that is polled.
258      *
259      * The sub-poller registers its file descriptor with the master poller to park until
260      * there are events to poll. When unparked, it does non-blocking polls and parks
261      * again when there are no more events. The sub-poller yields after each poll to help
262      * with fairness and to avoid re-registering with the master poller where possible.
263      */
264     private void subPollerLoop(Poller masterPoller) {
265         assert Thread.currentThread().isVirtual();
266         owner = Thread.currentThread();
267         try {
268             int polled = 0;
269             for (;;) {
270                 if (polled == 0) {
271                     masterPoller.poll(fdVal(), 0, () -> true);  // park
272                 } else {
273                     Thread.yield();
274                 }
275                 polled = poll(0);
276             }
277         } catch (Exception e) {
278             e.printStackTrace();
279         }
280     }
281 
282     /**
283      * Returns the number I/O operations currently registered with this poller.
284      */
285     public int registered() {
286         return map.size();
287     }
288 
289     @Override
290     public String toString() {
291         return String.format("%s [registered = %d, owner = %s]",
292                 Objects.toIdentityString(this), registered(), owner);
293     }
294 
295     /**
296      * The Pollers used for read and write events.
297      */
298     private static class Pollers {
299         private final PollerProvider provider;
300         private final Poller.Mode pollerMode;
301         private final Poller masterPoller;
302         private final Poller[] readPollers;
303         private final Poller[] writePollers;
304 
305         // used by start method to executor is kept alive
306         private Executor executor;
307 
308         /**
309          * Creates the Poller instances based on configuration.
310          */
311         Pollers() throws IOException {
312             PollerProvider provider = PollerProvider.provider();
313             Poller.Mode mode;
314             String s = System.getProperty("jdk.pollerMode");
315             if (s != null) {
316                 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
317                     mode = Mode.SYSTEM_THREADS;
318                 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
319                     mode = Mode.VTHREAD_POLLERS;
320                 } else {
321                     throw new RuntimeException("Can't parse '" + s + "' as polling mode");
322                 }
323             } else {
324                 mode = provider.defaultPollerMode();
325             }
326 
327             // vthread poller mode needs a master poller
328             Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
329                     ? provider.readPoller(false)
330                     : null;
331 
332             // read pollers (or sub-pollers)
333             int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
334             Poller[] readPollers = new Poller[readPollerCount];
335             for (int i = 0; i < readPollerCount; i++) {
336                 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
337             }
338 
339             // write pollers (or sub-pollers)
340             int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));
341             Poller[] writePollers = new Poller[writePollerCount];
342             for (int i = 0; i < writePollerCount; i++) {
343                 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);
344             }
345 
346             this.provider = provider;
347             this.pollerMode = mode;
348             this.masterPoller = masterPoller;
349             this.readPollers = readPollers;
350             this.writePollers = writePollers;
351         }
352 
353         /**
354          * Starts the Poller threads.
355          */
356         void start() {
357             if (pollerMode == Mode.VTHREAD_POLLERS) {
358                 startPlatformThread("MasterPoller", masterPoller::pollerLoop);
359                 ThreadFactory factory = Thread.ofVirtual()
360                         .inheritInheritableThreadLocals(false)
361                         .name("SubPoller-", 0)
362                         .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
363                         .factory();
364                 executor = Executors.newThreadPerTaskExecutor(factory);
365                 Arrays.stream(readPollers).forEach(p -> {
366                     executor.execute(() -> p.subPollerLoop(masterPoller));
367                 });
368                 Arrays.stream(writePollers).forEach(p -> {
369                     executor.execute(() -> p.subPollerLoop(masterPoller));
370                 });
371             } else {
372                 Arrays.stream(readPollers).forEach(p -> {
373                     startPlatformThread("Read-Poller", p::pollerLoop);
374                 });
375                 Arrays.stream(writePollers).forEach(p -> {
376                     startPlatformThread("Write-Poller", p::pollerLoop);
377                 });
378             }
379         }
380 
381         /**
382          * Returns the master poller, or null if there is no master poller.
383          */
384         Poller masterPoller() {
385             return masterPoller;
386         }
387 
388         /**
389          * Returns the read poller for the given file descriptor.
390          */
391         Poller readPoller(int fdVal) {
392             int index = provider.fdValToIndex(fdVal, readPollers.length);
393             return readPollers[index];
394         }
395 
396         /**
397          * Returns the write poller for the given file descriptor.
398          */
399         Poller writePoller(int fdVal) {
400             int index = provider.fdValToIndex(fdVal, writePollers.length);
401             return writePollers[index];
402         }
403 
404         /**
405          * Return the list of read pollers.
406          */
407         List<Poller> readPollers() {
408             return List.of(readPollers);
409         }
410 
411         /**
412          * Return the list of write pollers.
413          */
414         List<Poller> writePollers() {
415             return List.of(writePollers);
416         }
417 
418 
419         /**
420          * Reads the given property name to get the poller count. If the property is
421          * set then the value must be a power of 2. Returns 1 if the property is not
422          * set.
423          * @throws IllegalArgumentException if the property is set to a value that
424          * is not a power of 2.
425          */
426         private static int pollerCount(String propName, int defaultCount) {
427             String s = System.getProperty(propName);
428             int count = (s != null) ? Integer.parseInt(s) : defaultCount;
429 
430             // check power of 2
431             if (count != Integer.highestOneBit(count)) {
432                 String msg = propName + " is set to a value that is not a power of 2";
433                 throw new IllegalArgumentException(msg);
434             }
435             return count;
436         }
437 
438         /**
439          * Starts a platform thread to run the given task.
440          */
441         private void startPlatformThread(String name, Runnable task) {
442             try {
443                 Thread thread = InnocuousThread.newSystemThread(name, task);
444                 thread.setDaemon(true);
445                 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
446                 thread.start();
447             } catch (Exception e) {
448                 throw new InternalError(e);
449             }
450         }
451     }
452 
453     /**
454      * Return the master poller or null if there is no master poller.
455      */
456     public static Poller masterPoller() {
457         return POLLERS.masterPoller();
458     }
459 
460     /**
461      * Return the list of read pollers.
462      */
463     public static List<Poller> readPollers() {
464         return POLLERS.readPollers();
465     }
466 
467     /**
468      * Return the list of write pollers.
469      */
470     public static List<Poller> writePollers() {
471         return POLLERS.writePollers();
472     }
473 }