1 /*
  2  * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;
 28 import java.util.Arrays;
 29 import java.util.Map;
 30 import java.util.concurrent.ConcurrentHashMap;
 31 import java.util.concurrent.Executor;
 32 import java.util.concurrent.Executors;
 33 import java.util.concurrent.ThreadFactory;
 34 import java.util.concurrent.locks.LockSupport;
 35 import java.util.function.BooleanSupplier;
 36 import jdk.internal.misc.InnocuousThread;
 37 import sun.security.action.GetPropertyAction;
 38 
 39 /**
 40  * Polls file descriptors. Virtual threads invoke the poll method to park
 41  * until a given file descriptor is ready for I/O.
 42  */
 43 abstract class Poller {
 44     private static final Pollers POLLERS;
 45     static {
 46         try {
 47             var pollers = new Pollers();
 48             pollers.start();
 49             POLLERS = pollers;
 50         } catch (IOException ioe) {
 51             throw new ExceptionInInitializerError(ioe);
 52         }
 53     }
 54 
 55     // maps file descriptors to parked Thread
 56     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 57 
 58     /**
 59      * Poller mode.
 60      */
 61     enum Mode {
 62         /**
 63          * ReadPoller and WritePoller are dedicated platform threads that block waiting
 64          * for events and unpark virtual threads when file descriptors are ready for I/O.
 65          */
 66         SYSTEM_THREADS,
 67 
 68         /**
 69          * ReadPoller and WritePoller threads are virtual threads that poll for events,
 70          * yielding between polls and unparking virtual threads when file descriptors are
 71          * ready for I/O. If there are no events then the poller threads park until there
 72          * are I/O events to poll. This mode helps to integrate polling with virtual
 73          * thread scheduling. The approach is similar to the default scheme in "User-level
 74          * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
 75          * (https://dl.acm.org/doi/10.1145/3379483).
 76          */
 77         VTHREAD_POLLERS
 78     }
 79 
 80     /**
 81      * Initialize a Poller.
 82      */
 83     protected Poller() {
 84     }
 85 
 86     /**
 87      * Returns the poller's file descriptor, used when the read and write poller threads
 88      * are virtual threads.
 89      *
 90      * @throws UnsupportedOperationException if not supported
 91      */
 92     int fdVal() {
 93         throw new UnsupportedOperationException();
 94     }
 95 
 96     /**
 97      * Register the file descriptor. The registration is "one shot", meaning it should
 98      * be polled at most once.
 99      */
100     abstract void implRegister(int fdVal) throws IOException;
101 
102     /**
103      * Deregister the file descriptor.
104      * @param polled true if the file descriptor has already been polled
105      */
106     abstract void implDeregister(int fdVal, boolean polled);
107 
108     /**
109      * Poll for events. The {@link #polled(int)} method is invoked for each
110      * polled file descriptor.
111      *
112      * @param timeout if positive then block for up to {@code timeout} milliseconds,
113      *     if zero then don't block, if -1 then block indefinitely
114      * @return the number of file descriptors polled
115      */
116     abstract int poll(int timeout) throws IOException;
117 
118     /**
119      * Callback by the poll method when a file descriptor is polled.
120      */
121     final void polled(int fdVal) {
122         wakeup(fdVal);
123     }
124 
125     /**
126      * Parks the current thread until a file descriptor is ready for the given op.
127      * @param fdVal the file descriptor
128      * @param event POLLIN or POLLOUT
129      * @param nanos the waiting time or 0 to wait indefinitely
130      * @param supplier supplies a boolean to indicate if the enclosing object is open
131      */
132     static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
133         throws IOException
134     {
135         assert nanos >= 0L;
136         if (event == Net.POLLIN) {
137             POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
138         } else if (event == Net.POLLOUT) {
139             POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
140         } else {
141             assert false;
142         }
143     }
144 
145     /**
146      * Parks the current thread until a Selector's file descriptor is ready.
147      * @param fdVal the Selector's file descriptor
148      * @param nanos the waiting time or 0 to wait indefinitely
149      */
150     static void pollSelector(int fdVal, long nanos) throws IOException {
151         assert nanos >= 0L;
152         Poller poller = POLLERS.masterPoller();
153         if (poller == null) {
154             poller = POLLERS.readPoller(fdVal);
155         }
156         poller.poll(fdVal, nanos, () -> true);
157     }
158 
159     /**
160      * If there is a thread polling the given file descriptor for the given event then
161      * the thread is unparked.
162      */
163     static void stopPoll(int fdVal, int event) {
164         if (event == Net.POLLIN) {
165             POLLERS.readPoller(fdVal).wakeup(fdVal);
166         } else if (event == Net.POLLOUT) {
167             POLLERS.writePoller(fdVal).wakeup(fdVal);
168         } else {
169             throw new IllegalArgumentException();
170         }
171     }
172 
173     /**
174      * If there are any threads polling the given file descriptor then they are unparked.
175      */
176     static void stopPoll(int fdVal) {
177         stopPoll(fdVal, Net.POLLIN);
178         stopPoll(fdVal, Net.POLLOUT);
179     }
180 
181     /**
182      * Parks the current thread until a file descriptor is ready.
183      */
184     private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
185         register(fdVal);
186         try {
187             boolean isOpen = supplier.getAsBoolean();
188             if (isOpen) {
189                 if (nanos > 0) {
190                     LockSupport.parkNanos(nanos);
191                 } else {
192                     LockSupport.park();
193                 }
194             }
195         } finally {
196             deregister(fdVal);
197         }
198     }
199 
200     /**
201      * Registers the file descriptor to be polled at most once when the file descriptor
202      * is ready for I/O.
203      */
204     private void register(int fdVal) throws IOException {
205         Thread previous = map.put(fdVal, Thread.currentThread());
206         assert previous == null;
207         implRegister(fdVal);
208     }
209 
210     /**
211      * Deregister the file descriptor so that the file descriptor is not polled.
212      */
213     private void deregister(int fdVal) {
214         Thread previous = map.remove(fdVal);
215         boolean polled = (previous == null);
216         assert polled || previous == Thread.currentThread();
217         implDeregister(fdVal, polled);
218     }
219 
220     /**
221      * Unparks any thread that is polling the given file descriptor.
222      */
223     private void wakeup(int fdVal) {
224         Thread t = map.remove(fdVal);
225         if (t != null) {
226             LockSupport.unpark(t);
227         }
228     }
229 
230     /**
231      * Master polling loop. The {@link #polled(int)} method is invoked for each file
232      * descriptor that is polled.
233      */
234     private void pollerLoop() {
235         try {
236             for (;;) {
237                 poll(-1);
238             }
239         } catch (Exception e) {
240             e.printStackTrace();
241         }
242     }
243 
244     /**
245      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
246      * descriptor that is polled.
247      *
248      * The sub-poller registers its file descriptor with the master poller to park until
249      * there are events to poll. When unparked, it does non-blocking polls and parks
250      * again when there are no more events. The sub-poller yields after each poll to help
251      * with fairness and to avoid re-registering with the master poller where possible.
252      */
253     private void subPollerLoop(Poller masterPoller) {
254         assert Thread.currentThread().isVirtual();
255         try {
256             int polled = 0;
257             for (;;) {
258                 if (polled == 0) {
259                     masterPoller.poll(fdVal(), 0, () -> true);  // park
260                 } else {
261                     Thread.yield();
262                 }
263                 polled = poll(0);
264             }
265         } catch (Exception e) {
266             e.printStackTrace();
267         }
268     }
269 
270     /**
271      * The Pollers used for read and write events.
272      */
273     private static class Pollers {
274         private final PollerProvider provider;
275         private final Poller.Mode pollerMode;
276         private final Poller masterPoller;
277         private final Poller[] readPollers;
278         private final Poller[] writePollers;
279 
280         // used by start method to executor is kept alive
281         private Executor executor;
282 
283         /**
284          * Creates the Poller instances based on configuration.
285          */
286         Pollers() throws IOException {
287             PollerProvider provider = PollerProvider.provider();
288             Poller.Mode mode;
289             String s = GetPropertyAction.privilegedGetProperty("jdk.pollerMode");
290             if (s != null) {
291                 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
292                     mode = Mode.SYSTEM_THREADS;
293                 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
294                     mode = Mode.VTHREAD_POLLERS;
295                 } else {
296                     throw new RuntimeException("Can't parse '" + s + "' as polling mode");
297                 }
298             } else {
299                 mode = provider.defaultPollerMode();
300             }
301 
302             // vthread poller mode needs a master poller
303             Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
304                     ? provider.readPoller(false)
305                     : null;
306 
307             // read pollers (or sub-pollers)
308             int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
309             Poller[] readPollers = new Poller[readPollerCount];
310             for (int i = 0; i < readPollerCount; i++) {
311                 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
312             }
313 
314             // write pollers (or sub-pollers)
315             int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));
316             Poller[] writePollers = new Poller[writePollerCount];
317             for (int i = 0; i < writePollerCount; i++) {
318                 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);
319             }
320 
321             this.provider = provider;
322             this.pollerMode = mode;
323             this.masterPoller = masterPoller;
324             this.readPollers = readPollers;
325             this.writePollers = writePollers;
326         }
327 
328         /**
329          * Starts the Poller threads.
330          */
331         void start() {
332             if (pollerMode == Mode.VTHREAD_POLLERS) {
333                 startPlatformThread("MasterPoller", masterPoller::pollerLoop);
334                 ThreadFactory factory = Thread.ofVirtual()
335                         .inheritInheritableThreadLocals(false)
336                         .name("SubPoller-", 0)
337                         .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
338                         .factory();
339                 executor = Executors.newThreadPerTaskExecutor(factory);
340                 Arrays.stream(readPollers).forEach(p -> {
341                     executor.execute(() -> p.subPollerLoop(masterPoller));
342                 });
343                 Arrays.stream(writePollers).forEach(p -> {
344                     executor.execute(() -> p.subPollerLoop(masterPoller));
345                 });
346             } else {
347                 Arrays.stream(readPollers).forEach(p -> {
348                     startPlatformThread("Read-Poller", p::pollerLoop);
349                 });
350                 Arrays.stream(writePollers).forEach(p -> {
351                     startPlatformThread("Write-Poller", p::pollerLoop);
352                 });
353             }
354         }
355 
356         /**
357          * Returns the master poller, or null if there is no master poller.
358          */
359         Poller masterPoller() {
360             return masterPoller;
361         }
362 
363         /**
364          * Returns the read poller for the given file descriptor.
365          */
366         Poller readPoller(int fdVal) {
367             int index = provider.fdValToIndex(fdVal, readPollers.length);
368             return readPollers[index];
369         }
370 
371         /**
372          * Returns the write poller for the given file descriptor.
373          */
374         Poller writePoller(int fdVal) {
375             int index = provider.fdValToIndex(fdVal, writePollers.length);
376             return writePollers[index];
377         }
378 
379         /**
380          * Reads the given property name to get the poller count. If the property is
381          * set then the value must be a power of 2. Returns 1 if the property is not
382          * set.
383          * @throws IllegalArgumentException if the property is set to a value that
384          * is not a power of 2.
385          */
386         private static int pollerCount(String propName, int defaultCount) {
387             String s = GetPropertyAction.privilegedGetProperty(propName);
388             int count = (s != null) ? Integer.parseInt(s) : defaultCount;
389 
390             // check power of 2
391             if (count != Integer.highestOneBit(count)) {
392                 String msg = propName + " is set to a value that is not a power of 2";
393                 throw new IllegalArgumentException(msg);
394             }
395             return count;
396         }
397 
398         /**
399          * Starts a platform thread to run the given task.
400          */
401         private void startPlatformThread(String name, Runnable task) {
402             try {
403                 Thread thread = InnocuousThread.newSystemThread(name, task);
404                 thread.setDaemon(true);
405                 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
406                 thread.start();
407             } catch (Exception e) {
408                 throw new InternalError(e);
409             }
410         }
411     }
412 }