1 /*
  2  * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;
 28 import java.io.UncheckedIOException;
 29 import java.util.Arrays;
 30 import java.util.List;
 31 import java.util.Map;
 32 import java.util.Objects;
 33 import java.util.concurrent.ConcurrentHashMap;
 34 import java.util.concurrent.Executor;
 35 import java.util.concurrent.Executors;
 36 import java.util.concurrent.ThreadFactory;
 37 import java.util.concurrent.locks.LockSupport;
 38 import java.util.function.BooleanSupplier;
 39 import java.util.function.Supplier;
 40 import jdk.internal.access.JavaLangAccess;
 41 import jdk.internal.access.SharedSecrets;
 42 import jdk.internal.misc.InnocuousThread;
 43 import jdk.internal.vm.annotation.Stable;
 44 
 45 /**
 46  * Polls file descriptors. Virtual threads invoke the poll method to park
 47  * until a given file descriptor is ready for I/O.
 48  */
 49 public abstract class Poller {
 50     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 51 
 52     private static final PollerProvider PROVIDER = PollerProvider.provider();
 53 
 54     private static final Executor DEFAULT_SCHEDULER = JLA.virtualThreadDefaultScheduler();
 55 
 56     private static Supplier<Mode> POLLER_MODE = StableValue.supplier(Poller::pollerMode);
 57 
 58     private static Supplier<PollerGroup> DEFAULT_POLLER_GROUP = StableValue.supplier(PollerGroup::create);
 59 
 60     // maps scheduler to PollerGroup, custom schedulers can't be GC'ed at this time
 61     private static final Map<Executor, PollerGroup> POLLER_GROUPS = new ConcurrentHashMap<>();
 62 
 63     // the poller or sub-poller thread
 64     private @Stable Thread owner;
 65 
 66     // maps file descriptors to parked Thread
 67     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
 68 
    /**
     * Poller mode. Selects the kind of threads that poll for I/O events and unpark
     * the threads waiting for file descriptors to become ready.
     */
    enum Mode {
        /**
         * ReadPoller and WritePoller are dedicated platform threads that block waiting
         * for events and unpark virtual threads when file descriptors are ready for I/O.
         */
        SYSTEM_THREADS,

        /**
         * ReadPoller and WritePoller threads are virtual threads that poll for events,
         * yielding between polls and unparking virtual threads when file descriptors are
         * ready for I/O. If there are no events then the poller threads park until there
         * are I/O events to poll. This mode helps to integrate polling with virtual
         * thread scheduling. The approach is similar to the default scheme in "User-level
         * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
         * (https://dl.acm.org/doi/10.1145/3379483).
         */
        VTHREAD_POLLERS
    }
 90 
    /**
     * Initialize a Poller. The poller has no owner thread and no registered file
     * descriptors until one of the polling loops is run and threads start polling.
     */
    protected Poller() {
    }
 96 
    /**
     * Returns the poller's file descriptor, used when the read and write poller threads
     * are virtual threads. The sub-poller loop registers this file descriptor with the
     * master poller in order to park until there are events to poll.
     *
     * This implementation always throws {@code UnsupportedOperationException}.
     *
     * @return the poller's file descriptor
     * @throws UnsupportedOperationException if not supported
     */
    int fdVal() {
        throw new UnsupportedOperationException();
    }
106 
    /**
     * Register the file descriptor. The registration is "one shot", meaning it should
     * be polled at most once.
     *
     * @param fdVal the file descriptor to register
     * @throws IOException if an I/O error occurs
     */
    abstract void implRegister(int fdVal) throws IOException;
112 
    /**
     * Deregister the file descriptor.
     *
     * @param fdVal the file descriptor to deregister
     * @param polled true if the file descriptor has already been polled
     */
    abstract void implDeregister(int fdVal, boolean polled);
118 
    /**
     * Poll for events. The {@link #polled(int)} method is invoked for each
     * polled file descriptor.
     *
     * @param timeout if positive then block for up to {@code timeout} milliseconds,
     *     if zero then don't block, if -1 then block indefinitely
     * @return the number of file descriptors polled
     * @throws IOException if an I/O error occurs
     */
    abstract int poll(int timeout) throws IOException;
128 
    /**
     * Callback by the poll method when a file descriptor is polled. Unparks the
     * thread, if any, that is registered as polling the file descriptor.
     *
     * @param fdVal the file descriptor that was polled
     */
    final void polled(int fdVal) {
        wakeup(fdVal);
    }
135 
136     /**
137      * Parks the current thread until a file descriptor is ready for the given op.
138      * @param fdVal the file descriptor
139      * @param event POLLIN or POLLOUT
140      * @param nanos the waiting time or 0 to wait indefinitely
141      * @param supplier supplies a boolean to indicate if the enclosing object is open
142      */
143     static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
144         throws IOException
145     {
146         assert nanos >= 0L;
147         PollerGroup pollerGroup = PollerGroup.groupFor(Thread.currentThread());
148         if (event == Net.POLLIN) {
149             pollerGroup.readPoller(fdVal).poll(fdVal, nanos, supplier);
150         } else if (event == Net.POLLOUT) {
151             pollerGroup.writePoller(fdVal).poll(fdVal, nanos, supplier);
152         } else {
153             assert false;
154         }
155     }
156 
157     /**
158      * Parks the current thread until a Selector's file descriptor is ready.
159      * @param fdVal the Selector's file descriptor
160      * @param nanos the waiting time or 0 to wait indefinitely
161      */
162     static void pollSelector(int fdVal, long nanos) throws IOException {
163         assert nanos >= 0L;
164         PollerGroup pollerGroup = PollerGroup.groupFor(Thread.currentThread());
165         Poller poller = pollerGroup.masterPoller();
166         if (poller == null) {
167             poller = pollerGroup.readPoller(fdVal);
168         }
169         poller.poll(fdVal, nanos, () -> true);
170     }
171 
    /**
     * Unpark the given thread so that it stops polling.
     *
     * @param thread the thread to unpark
     */
    static void stopPoll(Thread thread) {
        LockSupport.unpark(thread);
    }
178 
179     /**
180      * Parks the current thread until a file descriptor is ready.
181      */
182     private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
183         register(fdVal);
184         try {
185             boolean isOpen = supplier.getAsBoolean();
186             if (isOpen) {
187                 if (nanos > 0) {
188                     LockSupport.parkNanos(nanos);
189                 } else {
190                     LockSupport.park();
191                 }
192             }
193         } finally {
194             deregister(fdVal);
195         }
196     }
197 
198     /**
199      * Registers the file descriptor to be polled at most once when the file descriptor
200      * is ready for I/O.
201      */
202     private void register(int fdVal) throws IOException {
203         Thread previous = map.put(fdVal, Thread.currentThread());
204         assert previous == null;
205         try {
206             implRegister(fdVal);
207         } catch (Throwable t) {
208             map.remove(fdVal);
209             throw t;
210         }
211     }
212 
213     /**
214      * Deregister the file descriptor so that the file descriptor is not polled.
215      */
216     private void deregister(int fdVal) {
217         Thread previous = map.remove(fdVal);
218         boolean polled = (previous == null);
219         assert polled || previous == Thread.currentThread();
220         implDeregister(fdVal, polled);
221     }
222 
223     /**
224      * Unparks any thread that is polling the given file descriptor.
225      */
226     private void wakeup(int fdVal) {
227         Thread t = map.remove(fdVal);
228         if (t != null) {
229             LockSupport.unpark(t);
230         }
231     }
232 
233     /**
234      * Master polling loop. The {@link #polled(int)} method is invoked for each file
235      * descriptor that is polled.
236      */
237     private void pollerLoop() {
238         owner = Thread.currentThread();
239         try {
240             for (;;) {
241                 poll(-1);
242             }
243         } catch (Exception e) {
244             e.printStackTrace();
245         }
246     }
247 
248     /**
249      * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
250      * descriptor that is polled.
251      *
252      * The sub-poller registers its file descriptor with the master poller to park until
253      * there are events to poll. When unparked, it does non-blocking polls and parks
254      * again when there are no more events. The sub-poller yields after each poll to help
255      * with fairness and to avoid re-registering with the master poller where possible.
256      */
257     private void subPollerLoop(Poller masterPoller) {
258         assert Thread.currentThread().isVirtual();
259         owner = Thread.currentThread();
260         try {
261             int polled = 0;
262             for (;;) {
263                 if (polled == 0) {
264                     masterPoller.poll(fdVal(), 0, () -> true);  // park
265                 } else {
266                     Thread.yield();
267                 }
268                 polled = poll(0);
269             }
270         } catch (Exception e) {
271             e.printStackTrace();
272         }
273     }
274 
275     @Override
276     public String toString() {
277         return String.format("%s [registered = %d, owner = %s]",
278             Objects.toIdentityString(this), map.size(), owner);
279     }
280 
281     /**
282      * The read/write pollers.
283      */
284     private static class PollerGroup {
        // the virtual thread scheduler that this group was created for
        private final Executor scheduler;
        // the read pollers, these are sub-pollers in VTHREAD_POLLERS mode
        private final Poller[] readPollers;
        // the write pollers, these are sub-pollers in VTHREAD_POLLERS mode
        private final Poller[] writePollers;
        // the master poller that sub-pollers park on, null in SYSTEM_THREADS mode
        private final Poller masterPoller;
        // starts a virtual thread for each sub-poller, null in SYSTEM_THREADS mode
        private final Executor executor;
290 
291         PollerGroup(Executor scheduler) throws IOException {
292             Mode mode = Poller.POLLER_MODE.get();
293             int readPollerCount, writePollerCount;
294             Poller masterPoller;
295             if (scheduler == DEFAULT_SCHEDULER) {
296                 readPollerCount = pollerCount("jdk.readPollers", PROVIDER.defaultReadPollers(mode));
297                 writePollerCount = pollerCount("jdk.writePollers", PROVIDER.defaultWritePollers(mode));
298                 masterPoller = (mode == Mode.VTHREAD_POLLERS)
299                         ? PROVIDER.readPoller(false)
300                         : null;
301             } else {
302                 readPollerCount = 1;
303                 writePollerCount = 1;
304                 if (mode == Mode.VTHREAD_POLLERS) {
305                     masterPoller = DEFAULT_POLLER_GROUP.get().masterPoller();
306                 } else {
307                     masterPoller = null;
308                 }
309             }
310 
311             Executor executor = null;
312             if (mode == Mode.VTHREAD_POLLERS) {
313                 String namePrefix;
314                 if (scheduler == DEFAULT_SCHEDULER) {
315                     namePrefix = "SubPoller-";
316                 } else {
317                     namePrefix = Objects.toIdentityString(scheduler) + "-SubPoller-";
318                 }
319                 @SuppressWarnings("restricted")
320                 ThreadFactory factory = Thread.ofVirtual()
321                         .scheduler(scheduler)
322                         .inheritInheritableThreadLocals(false)
323                         .name(namePrefix, 0)
324                         .uncaughtExceptionHandler((_, e) -> e.printStackTrace())
325                         .factory();
326                 executor = Executors.newThreadPerTaskExecutor(factory);
327             }
328 
329             // read pollers (or sub-pollers)
330             Poller[] readPollers = new Poller[readPollerCount];
331             for (int i = 0; i < readPollerCount; i++) {
332                 readPollers[i] = PROVIDER.readPoller(mode == Mode.VTHREAD_POLLERS);
333             }
334 
335             // write pollers (or sub-pollers)
336             Poller[] writePollers = new Poller[writePollerCount];
337             for (int i = 0; i < writePollerCount; i++) {
338                 writePollers[i] = PROVIDER.writePoller(mode == Mode.VTHREAD_POLLERS);
339             }
340 
341             this.scheduler = scheduler;
342             this.masterPoller = masterPoller;
343             this.readPollers = readPollers;
344             this.writePollers = writePollers;
345             this.executor = executor;
346         }
347 
348         static PollerGroup create(Executor scheduler) {
349             try {
350                 return new PollerGroup(scheduler).start();
351             } catch (IOException ioe) {
352                 throw new UncheckedIOException(ioe);
353             }
354         }
355 
        /**
         * Creates and starts the PollerGroup for the default scheduler.
         *
         * @return the started poller group
         */
        static PollerGroup create() {
            return create(DEFAULT_SCHEDULER);
        }
359 
360         /**
361          * Start poller threads.
362          */
363         private PollerGroup start() {
364             if (POLLER_MODE.get() == Mode.VTHREAD_POLLERS) {
365                 if (scheduler == DEFAULT_SCHEDULER) {
366                     startPlatformThread("Master-Poller", masterPoller::pollerLoop);
367                 }
368                 Arrays.stream(readPollers).forEach(p -> {
369                     executor.execute(() -> p.subPollerLoop(masterPoller));
370                 });
371                 Arrays.stream(writePollers).forEach(p -> {
372                     executor.execute(() -> p.subPollerLoop(masterPoller));
373                 });
374             } else {
375                 // Mode.SYSTEM_THREADS
376                 Arrays.stream(readPollers).forEach(p -> {
377                     startPlatformThread("Read-Poller", p::pollerLoop);
378                 });
379                 Arrays.stream(writePollers).forEach(p -> {
380                     startPlatformThread("Write-Poller", p::pollerLoop);
381                 });
382             }
383             return this;
384         }
385 
        /**
         * Returns the master poller, or {@code null} if this group does not have a
         * master poller.
         */
        Poller masterPoller() {
            return masterPoller;
        }
389 
        /**
         * Returns an unmodifiable list of this group's read pollers.
         */
        List<Poller> readPollers() {
            return List.of(readPollers);
        }
393 
        /**
         * Returns an unmodifiable list of this group's write pollers.
         */
        List<Poller> writePollers() {
            return List.of(writePollers);
        }
397 
398         /**
399          * Returns the read poller for the given file descriptor.
400          */
401         Poller readPoller(int fdVal) {
402             int index = PROVIDER.fdValToIndex(fdVal, readPollers.length);
403             return readPollers[index];
404         }
405 
406         /**
407          * Returns the write poller for the given file descriptor.
408          */
409         Poller writePoller(int fdVal) {
410             int index = PROVIDER.fdValToIndex(fdVal, writePollers.length);
411             return writePollers[index];
412         }
413 
414         /**
415          * Reads the given property name to get the poller count. If the property is
416          * set then the value must be a power of 2. Returns 1 if the property is not
417          * set.
418          * @throws IllegalArgumentException if the property is set to a value that
419          * is not a power of 2.
420          */
421         private static int pollerCount(String propName, int defaultCount) {
422             String s = System.getProperty(propName);
423             int count = (s != null) ? Integer.parseInt(s) : defaultCount;
424 
425             // check power of 2
426             if (count != Integer.highestOneBit(count)) {
427                 String msg = propName + " is set to a value that is not a power of 2";
428                 throw new IllegalArgumentException(msg);
429             }
430             return count;
431         }
432 
433         /**
434          * Starts a platform thread to run the given task.
435          */
436         private void startPlatformThread(String name, Runnable task) {
437             try {
438                 Thread thread = InnocuousThread.newSystemThread(name, task);
439                 thread.setDaemon(true);
440                 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
441                 thread.start();
442             } catch (Exception e) {
443                 throw new InternalError(e);
444             }
445         }
446 
447         /**
448          * Returns the PollerGroup that the given thread uses to poll file descriptors.
449          */
450         static PollerGroup groupFor(Thread thread) {
451             if (POLLER_MODE.get() == Mode.SYSTEM_THREADS) {
452                 return DEFAULT_POLLER_GROUP.get();
453             }
454             Executor scheduler;
455             if (thread.isVirtual()) {
456                 scheduler = JLA.virtualThreadScheduler(thread);
457             } else {
458                 scheduler = DEFAULT_SCHEDULER;
459             }
460             return POLLER_GROUPS.computeIfAbsent(scheduler, _ -> PollerGroup.create(scheduler));
461         }
462     }
463 
464     /**
465      * Returns the poller mode.
466      */
467     private static Mode pollerMode() {
468         String s = System.getProperty("jdk.pollerMode");
469         if (s != null) {
470             if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
471                 return Mode.SYSTEM_THREADS;
472             } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
473                 return Mode.VTHREAD_POLLERS;
474             } else {
475                 throw new RuntimeException("Can't parse '" + s + "' as polling mode");
476             }
477         } else {
478             return PROVIDER.defaultPollerMode();
479         }
480     }
481 
    /**
     * Return the master poller of the default poller group, or null if there is no
     * master poller.
     */
    public static Poller masterPoller() {
        return DEFAULT_POLLER_GROUP.get().masterPoller();
    }
488 
    /**
     * Return the list of read pollers in the default poller group. The list is
     * unmodifiable.
     */
    public static List<Poller> readPollers() {
        return DEFAULT_POLLER_GROUP.get().readPollers();
    }
495 
    /**
     * Return the list of write pollers in the default poller group. The list is
     * unmodifiable.
     */
    public static List<Poller> writePollers() {
        return DEFAULT_POLLER_GROUP.get().writePollers();
    }
502 }