1 /*
  2  * Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOError;
 28 import java.io.IOException;
 29 import java.util.Map;
 30 import java.util.concurrent.ConcurrentHashMap;
 31 import java.util.stream.Stream;
 32 import jdk.internal.misc.InnocuousThread;
 33 import jdk.internal.access.JavaLangAccess;
 34 import jdk.internal.access.SharedSecrets;
 35 import jdk.internal.misc.VirtualThreads;
 36 import sun.security.action.GetPropertyAction;
 37 
 38 /**
 39  * A Poller of file descriptors. A virtual thread registers the file descriptor
 40  * for a socket with a Poller before parking. The poller unparks the thread when
 41  * the socket is ready for I/O.
 42  */
 43 public abstract class Poller {
 44     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 45     private static final Poller[] READ_POLLERS;
 46     private static final Poller[] WRITE_POLLERS;
 47     private static final int READ_MASK, WRITE_MASK;
 48 
 49     static {
 50         PollerProvider provider = PollerProvider.provider();
 51         try {
 52             Poller[] readPollers = createReadPollers(provider);
 53             READ_POLLERS = readPollers;
 54             READ_MASK = readPollers.length - 1;
 55             Poller[] writePollers = createWritePollers(provider);
 56             WRITE_POLLERS = writePollers;
 57             WRITE_MASK = writePollers.length - 1;
 58         } catch (IOException ioe) {
 59             throw new IOError(ioe);
 60         }
 61     }
 62 
 63     /**
 64      * Create the read poller(s).
 65      */
 66     private static Poller[] createReadPollers(PollerProvider provider) throws IOException {
 67         int readPollerCount = pollerCount("jdk.readPollers");
 68         Poller[] readPollers = new Poller[readPollerCount];
 69         for (int i = 0; i< readPollerCount; i++) {
 70             var poller = provider.readPoller();
 71             poller.startPollerThread("Read-Poller-" + i);
 72             readPollers[i] = poller;
 73         }
 74         return readPollers;
 75     }
 76 
 77     /**
 78      * Create the write poller(s).
 79      */
 80     private static Poller[] createWritePollers(PollerProvider provider) throws IOException {
 81         int writePollerCount = pollerCount("jdk.writePollers");
 82         Poller[] writePollers = new Poller[writePollerCount];
 83         for (int i = 0; i< writePollerCount; i++) {
 84             var poller = provider.writePoller();
 85             poller.startPollerThread("Write-Poller-" + i);
 86             writePollers[i] = poller;
 87         }
 88         return writePollers;
 89     }
 90 
 91     /**
 92      * Reads the given property name to get the poller count. If the property is
 93      * set then the value must be a power of 2. Returns 1 if the property is not
 94      * set.
 95      * @throws IllegalArgumentException if the property is set to a value that
 96      * is not a power of 2.
 97      */
 98     private static int pollerCount(String propName) {
 99         String s = GetPropertyAction.privilegedGetProperty(propName, "1");
100         int count = Integer.parseInt(s);
101 
102         // check power of 2
103         if (count != (1 << log2(count))) {
104             String msg = propName + " is set to a vale that is not a power of 2";
105             throw new IllegalArgumentException(msg);
106         }
107         return count;
108     }
109 
110     private static int log2(int n) {
111         return 31 - Integer.numberOfLeadingZeros(n);
112     }
113 
114     /**
115      * Start a platform thread with the given name to poll file descriptors
116      * registered with this poller.
117      */
118     private void startPollerThread(String name) {
119         Runnable task = () -> {
120             try {
121                 for (;;) {
122                     poll();
123                 }
124             } catch (Exception e) {
125                 e.printStackTrace();
126             }
127         };
128         try {
129             Thread thread = JLA.executeOnCarrierThread(() ->
130                 InnocuousThread.newSystemThread(name, task)
131             );
132             thread.setDaemon(true);
133             thread.start();
134         } catch (Exception e) {
135             throw new InternalError(e);
136         }
137     }
138 
139     /**
140      * Registers the current thread to be unparked when a file descriptor is
141      * ready for I/O.
142      *
143      * @throws IOException if the register fails
144      * @throws IllegalArgumentException if the event is not POLLIN or POLLOUT
145      * @throws IllegalStateException if another thread is already registered
146      *         to be unparked when the file descriptor is ready for this event
147      */
148     static void register(int fdVal, int event) throws IOException {
149         if (event == Net.POLLIN) {
150             readPoller(fdVal).register(fdVal);
151         } else if (event == Net.POLLOUT) {
152             writePoller(fdVal).register(fdVal);
153         } else {
154             throw new IllegalArgumentException("Unknown event " + event);
155         }
156     }
157 
158     /**
159      * Maps the file descriptor value to a read poller.
160      */
161     private static Poller readPoller(int fdVal) {
162         return READ_POLLERS[fdVal & READ_MASK];
163     }
164 
165     /**
166      * Maps the file descriptor value to a write poller.
167      */
168     private static Poller writePoller(int fdVal) {
169         return WRITE_POLLERS[fdVal & WRITE_MASK];
170     }
171 
172     /**
173      * Deregister the current thread so it will not be unparked when a file descriptor
174      * is ready for I/O.
175      *
176      * @throws IllegalArgumentException if the event is not POLLIN or POLLOUT
177      */
178     static void deregister(int fdVal, int event) {
179         if (event == Net.POLLIN) {
180             readPoller(fdVal).deregister(fdVal);
181         } else if (event == Net.POLLOUT) {
182             writePoller(fdVal).deregister(fdVal);
183         } else {
184             throw new IllegalArgumentException("Unknown event " + event);
185         }
186     }
187 
188     /**
189      * Stops polling the file descriptor for the given event and unpark any
190      * strand registered to be unparked when the file descriptor is ready for I/O.
191      */
192     static void stopPoll(int fdVal, int event) {
193         if (event == Net.POLLIN) {
194             readPoller(fdVal).wakeup(fdVal);
195         } else if (event == Net.POLLOUT) {
196             writePoller(fdVal).wakeup(fdVal);
197         } else {
198             throw new IllegalArgumentException();
199         }
200     }
201 
202     /**
203      * Stops polling the file descriptor and unpark any threads that are registered
204      * to be unparked when the file descriptor is ready for I/O.
205      */
206     static void stopPoll(int fdVal) {
207         stopPoll(fdVal, Net.POLLIN);
208         stopPoll(fdVal, Net.POLLOUT);
209     }
210 
211     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
212 
213     protected Poller() { }
214 
215     private void register(int fdVal) throws IOException {
216         assert Thread.currentThread().isVirtual();
217         Thread previous = map.putIfAbsent(fdVal, Thread.currentThread());
218         assert previous == null;
219         implRegister(fdVal);
220     }
221 
222     private void deregister(int fdVal) {
223         Thread previous = map.remove(fdVal);
224         assert previous == null || previous == Thread.currentThread();
225     }
226 
227     private Stream<Thread> registeredThreads() {
228         return map.values().stream();
229     }
230 
231     private void wakeup(int fdVal) {
232         Thread t = map.remove(fdVal);
233         if (t != null) {
234             implDeregister(fdVal);
235             VirtualThreads.unpark(t);
236         }
237     }
238 
239     /**
240      * Called by the polling facility when the file descriptor is polled
241      */
242     final void polled(int fdVal) {
243         Thread t = map.remove(fdVal);
244         if (t != null) {
245             VirtualThreads.unpark(t);
246         }
247     }
248 
249     /**
250      * Poll for events. The {@link #polled(int)} method is invoked for each
251      * polled file descriptor.
252      *
253      * @param timeout if positive then block for up to {@code timeout} milliseconds,
254      *     if zero then don't block, if -1 then block indefinitely
255      */
256     abstract int poll(int timeout) throws IOException;
257 
258     /**
259      * Poll for events, blocks indefinitely.
260      */
261     final int poll() throws IOException {
262         return poll(-1);
263     }
264 
265     /**
266      * Poll for events, non-blocking.
267      */
268     final int pollNow() throws IOException {
269         return poll(0);
270     }
271 
272     /**
273      * Returns the poller's file descriptor, or -1 if none.
274      */
275     int fdVal() {
276         return -1;
277     }
278 
279     /**
280      * Register the file descriptor
281      */
282     abstract void implRegister(int fdVal) throws IOException;
283 
284     /**
285      * Deregister (or disarm) the file descriptor
286      */
287     abstract void implDeregister(int fdVal);
288 
289     /**
290      * Return a stream of all threads blocked waiting for I/O operations.
291      */
292     public static Stream<Thread> blockedThreads() {
293         Stream<Thread> s = Stream.empty();
294         for (int i = 0; i < READ_POLLERS.length; i++) {
295             s = Stream.concat(s, READ_POLLERS[i].registeredThreads());
296         }
297         for (int i = 0; i < WRITE_POLLERS.length; i++) {
298             s = Stream.concat(s, WRITE_POLLERS[i].registeredThreads());
299         }
300         return s;
301     }
302 }