1 /* 2 * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.io.IOException; 28 import java.util.Arrays; 29 import java.util.Map; 30 import java.util.concurrent.ConcurrentHashMap; 31 import java.util.concurrent.Executor; 32 import java.util.concurrent.Executors; 33 import java.util.concurrent.ThreadFactory; 34 import java.util.concurrent.locks.LockSupport; 35 import java.util.function.BooleanSupplier; 36 import jdk.internal.misc.InnocuousThread; 37 import sun.security.action.GetPropertyAction; 38 39 /** 40 * Polls file descriptors. Virtual threads invoke the poll method to park 41 * until a given file descriptor is ready for I/O. 42 */ 43 abstract class Poller { 44 private static final Pollers POLLERS; 45 static { 46 try { 47 var pollers = new Pollers(); 48 pollers.start(); 49 POLLERS = pollers; 50 } catch (IOException ioe) { 51 throw new ExceptionInInitializerError(ioe); 52 } 53 } 54 55 // maps file descriptors to parked Thread 56 private final Map<Integer, Thread> map = new ConcurrentHashMap<>(); 57 58 /** 59 * Poller mode. 60 */ 61 enum Mode { 62 /** 63 * ReadPoller and WritePoller are dedicated platform threads that block waiting 64 * for events and unpark virtual threads when file descriptors are ready for I/O. 65 */ 66 SYSTEM_THREADS, 67 68 /** 69 * ReadPoller and WritePoller threads are virtual threads that poll for events, 70 * yielding between polls and unparking virtual threads when file descriptors are 71 * ready for I/O. If there are no events then the poller threads park until there 72 * are I/O events to poll. This mode helps to integrate polling with virtual 73 * thread scheduling. The approach is similar to the default scheme in "User-level 74 * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020 75 * (https://dl.acm.org/doi/10.1145/3379483). 76 */ 77 VTHREAD_POLLERS 78 } 79 80 /** 81 * Initialize a Poller. 82 */ 83 protected Poller() { 84 } 85 86 /** 87 * Returns the poller's file descriptor, used when the read and write poller threads 88 * are virtual threads. 89 * 90 * @throws UnsupportedOperationException if not supported 91 */ 92 int fdVal() { 93 throw new UnsupportedOperationException(); 94 } 95 96 /** 97 * Register the file descriptor. The registration is "one shot", meaning it should 98 * be polled at most once. 99 */ 100 abstract void implRegister(int fdVal) throws IOException; 101 102 /** 103 * Deregister the file descriptor. 104 * @param polled true if the file descriptor has already been polled 105 */ 106 abstract void implDeregister(int fdVal, boolean polled); 107 108 /** 109 * Poll for events. The {@link #polled(int)} method is invoked for each 110 * polled file descriptor. 111 * 112 * @param timeout if positive then block for up to {@code timeout} milliseconds, 113 * if zero then don't block, if -1 then block indefinitely 114 * @return the number of file descriptors polled 115 */ 116 abstract int poll(int timeout) throws IOException; 117 118 /** 119 * Callback by the poll method when a file descriptor is polled. 120 */ 121 final void polled(int fdVal) { 122 wakeup(fdVal); 123 } 124 125 /** 126 * Parks the current thread until a file descriptor is ready for the given op. 127 * @param fdVal the file descriptor 128 * @param event POLLIN or POLLOUT 129 * @param nanos the waiting time or 0 to wait indefinitely 130 * @param supplier supplies a boolean to indicate if the enclosing object is open 131 */ 132 static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier) 133 throws IOException 134 { 135 assert nanos >= 0L; 136 if (event == Net.POLLIN) { 137 POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier); 138 } else if (event == Net.POLLOUT) { 139 POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier); 140 } else { 141 assert false; 142 } 143 } 144 145 /** 146 * Parks the current thread until a Selector's file descriptor is ready. 147 * @param fdVal the Selector's file descriptor 148 * @param nanos the waiting time or 0 to wait indefinitely 149 */ 150 static void pollSelector(int fdVal, long nanos) throws IOException { 151 assert nanos >= 0L; 152 Poller poller = POLLERS.masterPoller(); 153 if (poller == null) { 154 poller = POLLERS.readPoller(fdVal); 155 } 156 poller.poll(fdVal, nanos, () -> true); 157 } 158 159 /** 160 * If there is a thread polling the given file descriptor for the given event then 161 * the thread is unparked. 162 */ 163 static void stopPoll(int fdVal, int event) { 164 if (event == Net.POLLIN) { 165 POLLERS.readPoller(fdVal).wakeup(fdVal); 166 } else if (event == Net.POLLOUT) { 167 POLLERS.writePoller(fdVal).wakeup(fdVal); 168 } else { 169 throw new IllegalArgumentException(); 170 } 171 } 172 173 /** 174 * If there are any threads polling the given file descriptor then they are unparked. 175 */ 176 static void stopPoll(int fdVal) { 177 stopPoll(fdVal, Net.POLLIN); 178 stopPoll(fdVal, Net.POLLOUT); 179 } 180 181 /** 182 * Parks the current thread until a file descriptor is ready. 183 */ 184 private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException { 185 register(fdVal); 186 try { 187 boolean isOpen = supplier.getAsBoolean(); 188 if (isOpen) { 189 if (nanos > 0) { 190 LockSupport.parkNanos(nanos); 191 } else { 192 LockSupport.park(); 193 } 194 } 195 } finally { 196 deregister(fdVal); 197 } 198 } 199 200 /** 201 * Registers the file descriptor to be polled at most once when the file descriptor 202 * is ready for I/O. 203 */ 204 private void register(int fdVal) throws IOException { 205 Thread previous = map.put(fdVal, Thread.currentThread()); 206 assert previous == null; 207 implRegister(fdVal); 208 } 209 210 /** 211 * Deregister the file descriptor so that the file descriptor is not polled. 212 */ 213 private void deregister(int fdVal) { 214 Thread previous = map.remove(fdVal); 215 boolean polled = (previous == null); 216 assert polled || previous == Thread.currentThread(); 217 implDeregister(fdVal, polled); 218 } 219 220 /** 221 * Unparks any thread that is polling the given file descriptor. 222 */ 223 private void wakeup(int fdVal) { 224 Thread t = map.remove(fdVal); 225 if (t != null) { 226 LockSupport.unpark(t); 227 } 228 } 229 230 /** 231 * Master polling loop. The {@link #polled(int)} method is invoked for each file 232 * descriptor that is polled. 233 */ 234 private void pollerLoop() { 235 try { 236 for (;;) { 237 poll(-1); 238 } 239 } catch (Exception e) { 240 e.printStackTrace(); 241 } 242 } 243 244 /** 245 * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file 246 * descriptor that is polled. 247 * 248 * The sub-poller registers its file descriptor with the master poller to park until 249 * there are events to poll. When unparked, it does non-blocking polls and parks 250 * again when there are no more events. The sub-poller yields after each poll to help 251 * with fairness and to avoid re-registering with the master poller where possible. 252 */ 253 private void subPollerLoop(Poller masterPoller) { 254 assert Thread.currentThread().isVirtual(); 255 try { 256 int polled = 0; 257 for (;;) { 258 if (polled == 0) { 259 masterPoller.poll(fdVal(), 0, () -> true); // park 260 } else { 261 Thread.yield(); 262 } 263 polled = poll(0); 264 } 265 } catch (Exception e) { 266 e.printStackTrace(); 267 } 268 } 269 270 /** 271 * The Pollers used for read and write events. 272 */ 273 private static class Pollers { 274 private final PollerProvider provider; 275 private final Poller.Mode pollerMode; 276 private final Poller masterPoller; 277 private final Poller[] readPollers; 278 private final Poller[] writePollers; 279 280 // used by start method to executor is kept alive 281 private Executor executor; 282 283 /** 284 * Creates the Poller instances based on configuration. 285 */ 286 Pollers() throws IOException { 287 PollerProvider provider = PollerProvider.provider(); 288 Poller.Mode mode; 289 String s = GetPropertyAction.privilegedGetProperty("jdk.pollerMode"); 290 if (s != null) { 291 if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) { 292 mode = Mode.SYSTEM_THREADS; 293 } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) { 294 mode = Mode.VTHREAD_POLLERS; 295 } else { 296 throw new RuntimeException("Can't parse '" + s + "' as polling mode"); 297 } 298 } else { 299 mode = provider.defaultPollerMode(); 300 } 301 302 // vthread poller mode needs a master poller 303 Poller masterPoller = (mode == Mode.VTHREAD_POLLERS) 304 ? provider.readPoller(false) 305 : null; 306 307 // read pollers (or sub-pollers) 308 int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode)); 309 Poller[] readPollers = new Poller[readPollerCount]; 310 for (int i = 0; i < readPollerCount; i++) { 311 readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS); 312 } 313 314 // write pollers (or sub-pollers) 315 int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode)); 316 Poller[] writePollers = new Poller[writePollerCount]; 317 for (int i = 0; i < writePollerCount; i++) { 318 writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS); 319 } 320 321 this.provider = provider; 322 this.pollerMode = mode; 323 this.masterPoller = masterPoller; 324 this.readPollers = readPollers; 325 this.writePollers = writePollers; 326 } 327 328 /** 329 * Starts the Poller threads. 330 */ 331 void start() { 332 if (pollerMode == Mode.VTHREAD_POLLERS) { 333 startPlatformThread("MasterPoller", masterPoller::pollerLoop); 334 ThreadFactory factory = Thread.ofVirtual() 335 .inheritInheritableThreadLocals(false) 336 .name("SubPoller-", 0) 337 .uncaughtExceptionHandler((t, e) -> e.printStackTrace()) 338 .factory(); 339 executor = Executors.newThreadPerTaskExecutor(factory); 340 Arrays.stream(readPollers).forEach(p -> { 341 executor.execute(() -> p.subPollerLoop(masterPoller)); 342 }); 343 Arrays.stream(writePollers).forEach(p -> { 344 executor.execute(() -> p.subPollerLoop(masterPoller)); 345 }); 346 } else { 347 Arrays.stream(readPollers).forEach(p -> { 348 startPlatformThread("Read-Poller", p::pollerLoop); 349 }); 350 Arrays.stream(writePollers).forEach(p -> { 351 startPlatformThread("Write-Poller", p::pollerLoop); 352 }); 353 } 354 } 355 356 /** 357 * Returns the master poller, or null if there is no master poller. 358 */ 359 Poller masterPoller() { 360 return masterPoller; 361 } 362 363 /** 364 * Returns the read poller for the given file descriptor. 365 */ 366 Poller readPoller(int fdVal) { 367 int index = provider.fdValToIndex(fdVal, readPollers.length); 368 return readPollers[index]; 369 } 370 371 /** 372 * Returns the write poller for the given file descriptor. 373 */ 374 Poller writePoller(int fdVal) { 375 int index = provider.fdValToIndex(fdVal, writePollers.length); 376 return writePollers[index]; 377 } 378 379 /** 380 * Reads the given property name to get the poller count. If the property is 381 * set then the value must be a power of 2. Returns 1 if the property is not 382 * set. 383 * @throws IllegalArgumentException if the property is set to a value that 384 * is not a power of 2. 385 */ 386 private static int pollerCount(String propName, int defaultCount) { 387 String s = GetPropertyAction.privilegedGetProperty(propName); 388 int count = (s != null) ? Integer.parseInt(s) : defaultCount; 389 390 // check power of 2 391 if (count != Integer.highestOneBit(count)) { 392 String msg = propName + " is set to a value that is not a power of 2"; 393 throw new IllegalArgumentException(msg); 394 } 395 return count; 396 } 397 398 /** 399 * Starts a platform thread to run the given task. 400 */ 401 private void startPlatformThread(String name, Runnable task) { 402 try { 403 Thread thread = InnocuousThread.newSystemThread(name, task); 404 thread.setDaemon(true); 405 thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace()); 406 thread.start(); 407 } catch (Exception e) { 408 throw new InternalError(e); 409 } 410 } 411 } 412 }