/*
 * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package sun.nio.ch;

import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.lang.ref.Cleaner.Cleanable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
import jdk.internal.ref.CleanerFactory;
import sun.nio.ch.iouring.IOUring;
import sun.nio.ch.iouring.Cqe;
import sun.nio.ch.iouring.Sqe;
import jdk.internal.ffi.generated.iouring.*;
import static jdk.internal.ffi.generated.iouring.iouring_h.*;

/**
 * Poller implementation based on io_uring.
 *
 * <p> A poller submits IORING_OP_POLL_ADD requests to poll file descriptors for
 * read/write readiness, and optionally IORING_OP_READ/IORING_OP_WRITE requests
 * to do the I/O in the kernel. Threads that wait for a read/write op to complete
 * park until the completion is consumed by {@code tryPoll}, which unparks them.
 *
 * @apiNote This implementation is experimental. There are many design choices, esp.
 * around buffer management, that have to be explored.
 */

public class IoUringPoller extends Poller {
    // automatic arena used to allocate off-heap buffers for read/write ops;
    // segments are reclaimed by GC when no longer reachable
    private static final Arena ARENA = Arena.ofAuto();
    private static final long ADDRESS_SIZE = ValueLayout.ADDRESS.byteSize();
    // capacity of each cached off-heap buffer, caps the bytes per read/write op
    private static final int MAX_BUF_SIZE = 16384;

    // submission queue polling enabled if kernel thread idle time > 0 millis
    private static final int SQPOLL_IDLE_TIME = Integer.getInteger("jdk.io_uring.sqpoll_idle", 0);

    // submission and completion queue sizes
    private static final int DEFAULT_SQ_SIZE = (SQPOLL_IDLE_TIME > 0) ? 64 : 4;
    private static final int SQ_SIZE = Integer.getInteger("jdk.io_uring.sqsize", DEFAULT_SQ_SIZE);
    private static final int CQ_SIZE = Math.max(SQ_SIZE + 1, 1024);

    // max completion events to consume in a blocking poll and non-blocking subpoll
    private static final int MAX_EVENTS_PER_POLL = 64;
    private static final int MAX_EVENTS_PER_SUBPOLL = 8;

    private final int event;            // Net.POLLIN or Net.POLLOUT, fixed at construction
    private final IOUring ring;
    private final EventFD readyEvent;   // completion events posted to CQ ring
    private final EventFD wakeupEvent;  // wakeup event, used for shutdown

    // close action, and cleaner if this is subpoller
    private final Runnable closer;
    private final Cleanable cleaner;

    // used to coordinate access to submission queue
    private final Object submitLock = new Object();

    // maps file descriptor to Thread when cancelling poll
    private final Map<Integer, Thread> cancels = new ConcurrentHashMap<>();

    // the map value for in-progress read/write ops
    private static class Op {
        final Thread thread;    // the parked thread waiting for this op to complete
        volatile int result;    // CQE result; written by poller thread, read after unpark
        Op(Thread thread) {
            this.thread = thread;
        }
        Thread thread() {
            return thread;
        }
        int result() {
            return result;
        }
        void setResult(int result) {
            this.result = result;
        }
    }

    // maps user data (the buffer address) to in-progress read/write ops
    private final Map<Long, Op> ops;

    // per poller cache of memory segments used for read/write ops
    private final BlockingQueue<MemorySegment> memorySegmentCache;

    /**
     * Creates the poller with its own io_uring instance.
     *
     * @param mode the poller mode, only consulted when {@code subPoller} is true
     * @param subPoller true if this is a subpoller that registers an eventfd with
     *        the master poller
     * @param read true to poll file descriptors for POLLIN, false for POLLOUT
     * @param supportIoOps true to support read/write ops (allocates the ops map
     *        and buffer cache)
     * @throws IOException if the io_uring instance or eventfds cannot be created
     */
    IoUringPoller(Poller.Mode mode,
                  boolean subPoller,
                  boolean read,
                  boolean supportIoOps) throws IOException {
        IOUring ring = new IOUring(SQ_SIZE, CQ_SIZE, 0, 0, 0, SQPOLL_IDLE_TIME);
        EventFD wakeupEvent = null;
        EventFD readyEvent = null;

        if (subPoller) {
            try {
                // event to allow registering with master poller
                readyEvent = new EventFD();
                ring.register_eventfd(readyEvent.efd());

                // wakeup event to allow for shutdown
                if (mode == Poller.Mode.POLLER_PER_CARRIER) {
                    wakeupEvent = new EventFD();
                    int efd = wakeupEvent.efd();
                    IOUtil.configureBlocking(efd, false);
                    // poll the wakeup eventfd; its user data is the efd itself,
                    // completions are filtered out in tryPoll
                    submitPollAdd(ring, efd, Net.POLLIN, efd);
                    enter(ring, 1);
                }
            } catch (Throwable e) {
                // release everything acquired so far before propagating
                ring.close();
                if (readyEvent != null) readyEvent.close();
                if (wakeupEvent != null) wakeupEvent.close();
                throw e;
            }
        }

        this.event = (read) ? Net.POLLIN : Net.POLLOUT;
        this.ring = ring;
        this.readyEvent = readyEvent;
        this.wakeupEvent = wakeupEvent;

        // setup if supporting read/write ops
        if (supportIoOps) {
            this.ops = new ConcurrentHashMap<>();
            this.memorySegmentCache = new LinkedTransferQueue<>();
        } else {
            this.ops = null;
            this.memorySegmentCache = null;
        }

        // create action to close io_uring instance, register cleaner if this is a subpoller
        this.closer = closer(ring, readyEvent, wakeupEvent);
        if (subPoller) {
            this.cleaner = CleanerFactory.cleaner().register(this, closer);
        } else {
            this.cleaner = null;
        }
    }

    /**
     * Returns an action to close the io_uring and release other resources.
     */
    private static Runnable closer(IOUring ring, EventFD readyEvent, EventFD wakeupEvent) {
        return () -> {
            try {
                ring.close();
                if (readyEvent != null) readyEvent.close();
                if (wakeupEvent != null) wakeupEvent.close();
            } catch (IOException _) { }  // best-effort close, nothing useful to do on failure
        };
    }

    @Override
    void close() throws IOException {
        // subpollers have a registered cleaner; invoking it runs the close action
        // at most once, otherwise run the close action directly
        if (cleaner != null) {
            cleaner.clean();
        } else {
            closer.run();
        }
    }

    /**
     * Returns the eventfd that the master poller polls. Only supported when this
     * poller is a subpoller (readyEvent was created in the constructor).
     */
    @Override
    int fdVal() {
        if (readyEvent == null) {
            throw new UnsupportedOperationException();
        }
        return readyEvent.efd();
    }

    @Override
    void pollerPolled() throws IOException {
        // reset the eventfd so further completions are notified to the master poller
        readyEvent.reset();
    }

    /**
     * Submits an IORING_OP_POLL_ADD op to poll a file descriptor for read or write.
     */
    @Override
    void implStartPoll(int fd) throws IOException {
        assert fd > 0; // fd == 0 used for wakeup

        synchronized (submitLock) {
            // fd is the user data for IORING_OP_POLL_ADD request
            submitPollAdd(ring, fd, event, fd);
            enter(1);
        }
    }

    /**
     * Submits an IORING_OP_POLL_REMOVE op, and waits for it to complete, to stop
     * polling a file descriptor. A no-op if already polled.
     */
    @Override
    void implStopPoll(int fd, boolean polled) throws IOException {
        if (!polled && !isShutdown()) {
            // record this thread so tryPoll can unpark it when the remove completes
            cancels.put(fd, Thread.currentThread());

            synchronized (submitLock) {
                // TBD if SQPOLL enabled, need IORING_OP_POLL_ADD to be processed

                // fd was the user data for IORING_OP_POLL_ADD request
                // -fd is the user data for IORING_OP_POLL_REMOVE request
                submitPollRemove(fd, -fd);
                enter(1);
            }

            // park until tryPoll removes the entry (or the poller is shutdown)
            while (cancels.containsKey(fd) && !isShutdown()) {
                LockSupport.park();
            }
        }
    }

    /**
     * Uses IORING_OP_READ op to read bytes into a byte array.
     *
     * @param fd the file descriptor to read from
     * @param b the destination byte array
     * @param off the offset into the array
     * @param len the max number of bytes to read, capped at the buffer size
     * @param nanos the park timeout, or 0 to park indefinitely
     * @param isOpen checked after submit; if the channel closed then don't park
     * @return the number of bytes read, or IOStatus.UNAVAILABLE if the read did
     *         not complete and was cancelled
     * @throws IOException if the read op fails with an errno other than -1 (EOF)
     */
    @Override
    int implRead(int fd, byte[] b, int off, int len, long nanos, BooleanSupplier isOpen) throws IOException {
        // off-heap buffer for read op
        MemorySegment buf = takeMemorySegment();
        len = Math.min(len, (int) buf.byteSize());

        // use the buf address as the user_data
        long udata = buf.address();
        var op = new Op(Thread.currentThread());
        ops.put(udata, op);

        int res = 0;
        try {
            synchronized (submitLock) {
                submitRead(fd, buf, len, -1L, udata);
            }
            if (isOpen.getAsBoolean() && !isShutdown()) {
                if (nanos > 0) {
                    LockSupport.parkNanos(nanos);
                } else {
                    LockSupport.park();
                }
            }
        } finally {
            Op previous = ops.remove(udata);
            assert previous == op;

            res = op.result();
            try {
                if (res > 0) {
                    // copy bytes into the byte array
                    assert res <= len;
                    MemorySegment dst = MemorySegment.ofArray(b);
                    MemorySegment.copy(buf, 0, dst, off, res);
                } else if (res < 0) {
                    // EOF or read failed (tryPoll maps a 0-byte CQE result to -1)
                    if (res != -1) {
                        throw new IOException("IORING_OP_READ failed errno=" + (-res));
                    }
                } else {
                    // TBD if SQPOLL enabled, need the request to be processed before cancel

                    // read did not complete, need to cancel. If the cancel fails then
                    // we can't return the buffer to the cache.
                    cancelOp(fd, udata);
                    res = IOStatus.UNAVAILABLE;
                }
            } finally {
                // res == 0 means the op may still be in flight with the buffer
                // address in the kernel, so the buffer must not be reused
                if (res != 0) {
                    offerMemorySegment(buf);
                }
            }
        }
        return res;
    }

    /**
     * Uses IORING_OP_WRITE op to write bytes from a byte array.
     *
     * @param fd the file descriptor to write to
     * @param b the source byte array
     * @param off the offset into the array
     * @param len the max number of bytes to write, capped at the buffer size
     * @param isOpen checked after submit; if the channel closed then don't park
     * @return the number of bytes written, or IOStatus.UNAVAILABLE if the write
     *         did not complete and was cancelled
     * @throws IOException if the write op fails
     */
    @Override
    int implWrite(int fd, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
        // off-heap buffer for write op
        MemorySegment buf = takeMemorySegment();
        len = Math.min(len, (int) buf.byteSize());

        // copy the bytes from the byte array into the buffer
        MemorySegment src = MemorySegment.ofArray(b);
        MemorySegment.copy(src, off, buf, 0, len);

        // use the buffer address as the user_data
        long udata = buf.address();
        var op = new Op(Thread.currentThread());
        ops.put(udata, op);

        int res = 0;
        try {
            synchronized (submitLock) {
                submitWrite(fd, buf, len, -1L, udata);
            }
            if (isOpen.getAsBoolean() && !isShutdown()) {
                LockSupport.park();
            }
        } finally {
            Op previous = ops.remove(udata);
            assert previous == op;

            res = op.result();
            try {
                if (res > 0) {
                    assert res <= len;
                } else if (res < 0) {
                    throw new IOException("IORING_OP_WRITE failed errno=" + (-res));
                } else {
                    // TBD if SQPOLL enabled, need the request to be processed before cancel

                    // write did not complete, need to cancel. If the cancel fails then
                    // we can't return the buffer to the cache.
                    cancelOp(fd, udata);
                    res = IOStatus.UNAVAILABLE;
                }
            } finally {
                // res == 0 means the op may still be in flight with the buffer
                // address in the kernel, so the buffer must not be reused
                if (res != 0) {
                    offerMemorySegment(buf);
                }
            }
        }
        return res;
    }

    @Override
    void wakeupPoller() throws IOException {
        // only supported when the wakeup eventfd was created (POLLER_PER_CARRIER subpoller)
        if (wakeupEvent == null) {
            throw new UnsupportedOperationException();
        }

        // causes subpoller to wakeup
        wakeupEvent.set();
    }

    /**
     * Polls for completions, blocking when {@code timeout} is -1, non-blocking
     * when 0. Timed polls (timeout > 0) are not supported by this Poller.
     */
    @Override
    int poll(int timeout) throws IOException {
        if (timeout > 0) {
            // timed polls not supported by this Poller
            throw new UnsupportedOperationException();
        }
        boolean block = (timeout == -1);
        int max = block ? MAX_EVENTS_PER_POLL : MAX_EVENTS_PER_SUBPOLL;
        int polled = tryPoll(max);
        if (polled > 0 || !block) {
            return polled;
        } else {
            int ret = ring.enter(0, 1, 0); // wait for at least one completion
            if (ret < 0) {
                throw new IOException("io_uring_enter failed, " +
                    IOUring.getError(ret));
            }
            return tryPoll(max);
        }
    }

    /**
     * Poll or handle completions up to the given max without blocking. This method also
     * handles the completion of any cancelled operations.
     * @return the number of sockets polled and I/O operations completed
     */
    private int tryPoll(int max) {
        int polled = 0;
        Cqe cqe;
        while (polled < max && ((cqe = ring.pollCompletion()) != null)) {
            long udata = cqe.user_data();

            // handle read/write ops (user data is the buffer address)
            if (ops != null) {
                Op op = ops.get(udata);
                if (op != null) {
                    int res = cqe.res();
                    op.setResult((res != 0) ? res : -1); // map 0 to -1 at EOF
                    LockSupport.unpark(op.thread());
                    polled++;
                    continue;
                }
            }

            // handle poll and cancel ops, user data is fd or -fd
            int fd = (int) udata;
            if (fd > 0 && (wakeupEvent == null || fd != wakeupEvent.efd())) {
                // poll done
                polled(fd);
                polled++;
            } else if (fd < 0) {
                // cancel done, unpark the thread waiting in implStopPoll/cancelOp
                Thread t = cancels.remove(-fd);
                if (t != null) {
                    LockSupport.unpark(t);
                }
            }
        }
        return polled;
    }

    /**
     * Invoke io_uring_enter to submit the SQE entries
     */
    private static void enter(IOUring ring, int n) throws IOException {
        if (SQPOLL_IDLE_TIME > 0) {
            // SQPOLL enabled: the kernel SQ thread consumes the entries;
            // pollingEnter presumably wakes it if idle — no explicit submit count
            ring.pollingEnter();
        } else {
            int ret = ring.enter(n, 0, 0);
            if (ret < 0) {
                throw new IOException("io_uring_enter failed, " +
                    IOUring.getError(ret));
            }
            assert ret == n;  // all n entries submitted
        }
    }

    private void enter(int n) throws IOException {
        enter(ring, n);
    }

    /**
     * Submit IORING_OP_POLL_ADD operation.
     */
    private static void submitPollAdd(IOUring ring,
                                      int fd,
                                      int events,
                                      long udata) throws IOException {
        Sqe sqe = new Sqe()
            .opcode(IORING_OP_POLL_ADD())
            .fd(fd)
            .user_data(udata)
            .poll_events(events);
        ring.submit(sqe);
    }

    private void submitPollAdd(int fd, int events, long udata) throws IOException {
        submitPollAdd(ring, fd, events, udata);
    }

    /**
     * Submit IORING_OP_POLL_REMOVE operation.
     * @param req_udata the user data to identify the original POLL_ADD
     * @param udata the user data for the POLL_REMOVE op
     */
    private void submitPollRemove(long req_udata, long udata) throws IOException {
        // POLL_REMOVE identifies the target request by its user data in the addr field
        @SuppressWarnings("restricted")
        MemorySegment address = MemorySegment.ofAddress(req_udata).reinterpret(ADDRESS_SIZE);
        Sqe sqe = new Sqe()
            .opcode(IORING_OP_POLL_REMOVE())
            .addr(address)
            .user_data(udata);
        ring.submit(sqe);
    }

    /**
     * Submit IORING_OP_READ operation.
     */
    private void submitRead(int fd, MemorySegment buf, int len, long pos, long udata) throws IOException {
        Sqe sqe = new Sqe()
            .opcode(IORING_OP_READ())
            .fd(fd)
            .addr(buf)
            .len(len)
            .off(pos) // file position or -1L
            .user_data(udata);
        ring.submit(sqe);
        enter(1);
    }

    /**
     * Submit IORING_OP_WRITE operation.
     */
    private void submitWrite(int fd, MemorySegment buf, int len, long pos, long udata) throws IOException {
        Sqe sqe = new Sqe()
            .opcode(IORING_OP_WRITE())
            .fd(fd)
            .addr(buf)
            .len(len)
            .off(pos) // file position or -1L
            .user_data(udata);
        ring.submit(sqe);
        enter(1);
    }

    /**
     * Cancels an operation submitted with the given user_data, waiting for the
     * cancel op to complete.
     */
    private void cancelOp(int fd, long req_udata) throws IOException {
        // ASYNC_CANCEL identifies the target request by its user data in the addr field
        @SuppressWarnings("restricted")
        MemorySegment address = MemorySegment.ofAddress(req_udata).reinterpret(ADDRESS_SIZE);
        cancels.put(fd, Thread.currentThread());
        synchronized (submitLock) {
            Sqe sqe = new Sqe()
                .opcode(IORING_OP_ASYNC_CANCEL())
                .addr(address)
                .user_data(-fd); // user data for IORING_OP_ASYNC_CANCEL
            ring.submit(sqe);
            enter(1);
        }
        // park until tryPoll sees the cancel completion and removes the entry
        while (cancels.containsKey(fd) && !isShutdown()) {
            LockSupport.park();
        }
    }

    /**
     * Takes a buffer from the cache, allocating a new one if the cache is empty.
     */
    private MemorySegment takeMemorySegment() {
        MemorySegment buf = memorySegmentCache.poll();
        if (buf == null) {
            buf = ARENA.allocate(MAX_BUF_SIZE);
        }
        return buf;
    }

    /**
     * Returns a buffer to the cache for reuse.
     */
    private void offerMemorySegment(MemorySegment buf) {
        memorySegmentCache.offer(buf);
    }
}