/*
 * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch.iouring;

import jdk.internal.ffi.generated.iouring.*;

import java.io.IOException;
import java.lang.foreign.*;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.time.Duration;

import static java.lang.foreign.ValueLayout.JAVA_BYTE;
import static sun.nio.ch.iouring.Util.strerror;
import static sun.nio.ch.iouring.Util.locateHandleFromLib;
import static sun.nio.ch.iouring.Util.locateStdHandle;
import static sun.nio.ch.iouring.Util.INT_POINTER;
import static jdk.internal.ffi.generated.iouring.iouring_h.*;
import static jdk.internal.ffi.generated.iouring.iouring_h_1.IORING_REGISTER_EVENTFD;
import static jdk.internal.ffi.generated.iouring.iouring_h_1.IORING_UNREGISTER_EVENTFD;

/**
 * Low-level, asynchronous interface to a Linux io_uring. Requests are
 * submitted through the {@link #submit(Sqe)} method. Completion events can
 * be awaited by calling {@link #enter(int, int, int)}. Completions,
 * represented by {@link Cqe}, are then obtained by calling
 * {@link #pollCompletion()}. Completions are linked to submissions by the
 * {@link Cqe#user_data()} field of the {@code Cqe}, which contains the
 * same 64-bit (long) value that was supplied in the submitted {@link Sqe}.
 * <p>
 * Some IOUring operations work with kernel-registered direct ByteBuffers.
 * When creating an IOUring instance, a number of these buffers can be
 * created in a pool. Registered buffers are not used with regular
 * IOUring read/write operations.
 */
@SuppressWarnings("restricted")
public class IOUring {
    private static final Arena arena = Arena.ofAuto();

    private static final boolean TRACE = System
            .getProperty("jdk.io_uring.trace", "false")
            .equalsIgnoreCase("true");
    private final SubmissionQueue sq;
    private final CompletionQueue cq;
    private final int fd;           // The ring fd
    private int epollfd = -1;       // The epoll(7) fd, if created
    private static final int INT_SIZE = (int)ValueLayout.JAVA_INT.byteSize();

    private final Arena autoArena = Arena.ofAuto();

    private final KMappedBuffers mappedBuffers;
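
    /*
     * A minimal usage sketch of the submit/enter/pollCompletion cycle
     * described in the class comment (illustrative only; the Sqe setters not
     * used elsewhere in this file, e.g. user_data(long), and the
     * IORING_OP_NOP() binding are assumptions about the Sqe builder and the
     * generated header bindings):
     *
     *   IOUring ring = new IOUring(32);
     *   Sqe nop = new Sqe()
     *           .opcode(IORING_OP_NOP())    // no-op request, for illustration
     *           .fd(-1)
     *           .user_data(42L);            // echoed back in the Cqe
     *   ring.submit(nop);
     *   ring.enter(1, 1, 0);                // submit 1, wait for 1 completion
     *   Cqe cqe = ring.pollCompletion();
     *   assert cqe.user_data() == 42L;
     *   ring.close();
     */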

    /**
     * Creates an IOUring and initializes the ring structures. {@code entries}
     * (or the next higher power of 2) is the size of the Submission Queue.
     * Currently, the Completion Queue will be double the size of the
     * Submission Queue.
     *
     * @param entries the requested number of submission queue entries
     * @throws IOException if an I/O error occurs
     */
    public IOUring(int entries) throws IOException {
        this(entries, 0, 0);
    }

    /**
     * Creates an IOUring and initializes the ring structures.
     *
     * @param sq_entries the number of submission queue entries to allocate
     * @param cq_entries the number of completion queue entries to allocate
     * @param flags io_uring_params flags
     * @throws IOException if an I/O error occurs
     */
    public IOUring(int sq_entries, int cq_entries, int flags) throws IOException {
        this(sq_entries, cq_entries, flags, 0, -1, 0);
    }

    /**
     * Creates an IOUring, initializes the ring structures, and allocates a
     * number of direct {@link ByteBuffer}s which are additionally mapped
     * into the kernel address space.
     *
     * @param sq_entries the number of submission queue entries to allocate
     * @param cq_entries the number of completion queue entries to allocate
     * @param flags io_uring_params flags
     * @param nmappedBuffers number of mapped direct ByteBuffers to create
     * @param mappedBufsize size of each buffer in bytes
     * @param poll_idle_time the number of milliseconds the kernel polling
     *                       thread may remain idle. {@code 0} disables polling.
     * @throws IOException if an I/O error occurs
     */
    public IOUring(int sq_entries,
                   int cq_entries,
                   int flags,
                   int nmappedBuffers,
                   int mappedBufsize,
                   int poll_idle_time) throws IOException {
        if (TRACE)
            System.out.printf("IOUring poll_idle_time = %d\n",
                    poll_idle_time);

        MemorySegment params_seg = getSegmentFor(io_uring_params.$LAYOUT());

        if (cq_entries > 0) {
            io_uring_params.cq_entries(params_seg, cq_entries);
            flags |= IORING_SETUP_CQSIZE();
        }

        boolean polling = false;

        if (poll_idle_time > 0) {
            io_uring_params.sq_thread_idle(params_seg, poll_idle_time);
            flags |= IORING_SETUP_SQPOLL();
            polling = true;
        }

        if (flags != 0) {
            io_uring_params.flags(params_seg, flags);
        }

        // call setup
        fd = io_uring_setup(sq_entries, params_seg);
        if (fd < 0) {
            throw new IOException(errorString(fd));
        }

        if (poll_idle_time > 0) {
            int i = io_uring_params.sq_thread_idle(params_seg);
            if (TRACE) System.out.printf("poll_idle_time = %d\n", i);
        }
        mappedBuffers = new KMappedBuffers(nmappedBuffers, mappedBufsize);
        if (nmappedBuffers > 0) {
            mappedBuffers.register(fd);
        }
        // Offsets segments
        MemorySegment cq_off_seg = io_uring_params.cq_off(params_seg);
        MemorySegment sq_off_seg = io_uring_params.sq_off(params_seg);

        // Offsets to the cqe array and the sqe index array
        int cq_off_cqes = io_cqring_offsets.cqes(cq_off_seg);
        int sq_off_array = io_sqring_offsets.array(sq_off_seg);
        int sq_off_flags = io_sqring_offsets.flags(sq_off_seg);

        // Actual number of entries in each queue
        sq_entries = io_uring_params.sq_entries(params_seg);
        cq_entries = io_uring_params.cq_entries(params_seg);

        int sq_size = sq_off_array + sq_entries * INT_SIZE;
        int cq_size = cq_off_cqes + cq_entries * (int)io_uring_cqe.sizeof();

        boolean singleMmap = (io_uring_params.features(params_seg)
                & IORING_FEAT_SINGLE_MMAP()) != 0;

        if (singleMmap) {
            if (cq_size > sq_size)
                sq_size = cq_size;
            cq_size = sq_size;
        }
        var sqe_seg = mmap(sq_size, fd, IORING_OFF_SQ_RING());

        MemorySegment cqes_seg;
        if (singleMmap) {
            cqes_seg = sqe_seg;
        } else {
            cqes_seg = mmap(cq_size, fd, IORING_OFF_CQ_RING());
        }

        // Ring masks
        int sq_mask = sqe_seg.get(ValueLayout.JAVA_INT,
                io_sqring_offsets.ring_mask(sq_off_seg));
        int cq_mask = cqes_seg.get(ValueLayout.JAVA_INT,
                io_cqring_offsets.ring_mask(cq_off_seg));

        var sqes = mmap(sq_entries * io_uring_sqe.sizeof(),
                fd, IORING_OFF_SQES());

        cq = new CompletionQueue(cqes_seg.asSlice(cq_off_cqes),
                cqes_seg.asSlice(io_cqring_offsets.head(cq_off_seg)),
                cqes_seg.asSlice(io_cqring_offsets.tail(cq_off_seg)),
                cq_mask);

        sq = new SubmissionQueue(sqe_seg.asSlice(sq_off_array),
                sqe_seg.asSlice(io_sqring_offsets.head(sq_off_seg)),
                sqe_seg.asSlice(io_sqring_offsets.tail(sq_off_seg)),
                sq_mask, sqe_seg.asSlice(sq_off_flags), polling,
                sqes);
        if (TRACE)
            System.out.printf("IOUring: ringfd: %d\n", fd);
    }
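
    /*
     * A sketch of constructing a ring that uses a kernel polling thread and
     * a pool of registered buffers (illustrative only; the entry counts,
     * buffer size and idle time are arbitrary values chosen for the example):
     *
     *   // 256 SQ entries, default CQ sizing, no extra setup flags,
     *   // 16 registered 8 KiB buffers, kernel poller may idle for 50 ms
     *   IOUring ring = new IOUring(256, 0, 0, 16, 8192, 50);
     *
     * With poll_idle_time > 0 the ring is created with IORING_SETUP_SQPOLL,
     * and pollingEnter() should be used on the submission side instead of
     * enter() (see below).
     */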

    public void close() throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();
        try {
            ret = (int)close_fn.invokeExact(ctx.errnoCaptureSegment(),
                    ringFd());
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        ctx.throwIOExceptionOnError(ret);
    }

    public int eventfd() throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();
        try {
            ret = (int)eventfd_fn.invokeExact(ctx.errnoCaptureSegment(),
                    0, 0);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        ctx.throwIOExceptionOnError(ret);
        return ret;
    }

    private int initEpoll() throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();
        try {
            // epoll_create takes a single (ignored) size argument
            ret = (int)epoll_create_fn.invokeExact(ctx.errnoCaptureSegment(),
                    1);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        ctx.throwIOExceptionOnError(ret);
        return ret;
    }

    public void register_eventfd(int efd) throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();
        MemorySegment fdseg =
                arena.allocateFrom(ValueLayout.JAVA_INT, efd);

        try {
            ret = (int)evregister_fn
                    .invokeExact(
                            ctx.errnoCaptureSegment(),
                            NR_io_uring_register,
                            fd, IORING_REGISTER_EVENTFD(),
                            fdseg, 1
                    );
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        ctx.throwIOExceptionOnError(ret);
    }

    public void unregister_eventfd() throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();

        try {
            ret = (int)evregister_fn
                    .invokeExact(
                            ctx.errnoCaptureSegment(),
                            NR_io_uring_register,
                            fd, IORING_UNREGISTER_EVENTFD(),
                            MemorySegment.NULL, 0
                    );
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        ctx.throwIOExceptionOnError(ret);
    }
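
    /*
     * A sketch of using an eventfd to get completion notifications, based on
     * the eventfd()/register_eventfd() methods above (illustrative only; how
     * the eventfd is subsequently watched, e.g. from an epoll or selector
     * loop, is outside the scope of this class):
     *
     *   IOUring ring = new IOUring(32);
     *   int efd = ring.eventfd();          // create an eventfd(2)
     *   ring.register_eventfd(efd);        // kernel signals efd on completions
     *   // ... watch efd for readability, then drain with pollCompletion() ...
     *   ring.unregister_eventfd();
     */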

    /**
     * Asynchronously submits an Sqe to this IOUring. Can be called
     * multiple times before enter().
     *
     * @param sqe the submission queue entry to add
     * @throws IOException if the submission queue is full and waiting for
     *         space fails
     */
    public void submit(Sqe sqe) throws IOException {
        if (!sq.submit(sqe)) {
            enter(0, 0, IORING_ENTER_SQ_WAIT());
            if (!sq.submit(sqe)) {
                throw new IOException("Submission Queue full: wait failed");
            }
        }
        if (TRACE)
            System.out.printf("submit: %s \n", sqe);
    }

    /**
     * Notifies the kernel of entries on the Submission Queue and waits for a
     * number of responses (completion events). If this returns normally
     * with a value {@code n > 0}, then {@code n} requests have been accepted
     * by the kernel. A normal return also means that the requested number of
     * completion events have been received; {@link #pollCompletion()} can be
     * called {@code nreceive} times to obtain the results.
     *
     * @param nsubmit number of requests to submit
     * @param nreceive block until this number of events has been received
     * @param flags flags to pass to io_uring_enter
     *
     * @return a negative value if an error occurred; otherwise the number of
     *         Sqes successfully submitted
     */
    public int enter(int nsubmit, int nreceive, int flags) throws IOException {
        if (TRACE) System.out.printf("enter([fd:%d] %d, %d, %d) called\n",
                this.fd, nsubmit, nreceive, flags);

        if (nreceive > 0) {
            flags |= IORING_ENTER_GETEVENTS();
        }
        int res = io_uring_enter(this.fd, nsubmit, nreceive, flags);
        if (TRACE) System.out.printf("enter [fd:%d] returns %d\n",
                this.fd, res);
        return res;
    }

    /**
     * In polling mode, use this instead of enter() on the submission side
     * to check whether the kernel poller needs to be woken up. It checks if
     * the kernel polling thread has gone idle, and if so wakes it up.
     */
    public void pollingEnter() throws IOException {
        if (TRACE) System.out.printf("pollingEnter([fd:%d]) called\n", this.fd);
        if (!sq.polling())
            throw new IllegalStateException("IOUring not in polling mode");

        if ((sq.getSQFlags() & IORING_SQ_NEED_WAKEUP()) > 0) {
            if (TRACE) System.out.println("pollingEnter: waking up kernel");
            enter(0, 0, IORING_ENTER_SQ_WAKEUP());
        }
        if (TRACE) System.out.printf("pollingEnter [fd:%d] return\n", this.fd);
    }
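
    /*
     * A sketch of the submission path in SQPOLL mode, contrasting with the
     * enter() based flow (illustrative only; buildSqe() stands in for
     * whatever code populates the Sqe):
     *
     *   ring.submit(buildSqe());   // place the request in the SQ ring
     *   ring.pollingEnter();       // only wakes the kernel poller if needed
     *   // completions are still drained with pollCompletion(), typically
     *   // after an eventfd or epoll notification
     */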

    /**
     * Returns the allocated size of the Submission Queue. If the requested
     * size was not a power of 2, then the allocated size is the next higher
     * power of 2.
     *
     * @return the allocated size of the Submission Queue
     */
    public int sqsize() {
        return sq.ringSize;
    }

    /**
     * Returns the number of free entries in the Submission Queue.
     */
    public int sqfree() {
        return sq.nAvail();
    }

    /**
     * Returns whether the Completion Queue is empty.
     *
     * @return {@code true} if the Completion Queue is empty
     */
    public boolean cqempty() {
        return cq.nEntries() == 0;
    }

    /**
     * Returns the allocated size of the Completion Queue.
     * Currently, double the size of the Submission Queue.
     *
     * @return the allocated size of the Completion Queue
     */
    public int cqsize() {
        return cq.ringSize;
    }

    public int epoll_fd() {
        return epollfd;
    }

    /**
     * Polls the Completion Queue for results.
     *
     * @return a Cqe if available, or {@code null}
     */
    public Cqe pollCompletion() {
        Cqe cqe = cq.pollHead();
        if (TRACE)
            System.out.printf("pollCompletion: -> %s\n", cqe);
        return cqe;
    }

    /**
     * Returns a String description of the given errno value.
     *
     * @param errno the errno value
     * @return the corresponding error message
     */
    public static String strerror(int errno) {
        return Util.strerror(errno);
    }

    private static int io_uring_setup(int entries, MemorySegment params)
            throws IOException {
        try {
            return (int) setup_fn.invokeExact(NR_io_uring_setup,
                    entries, params);
        } catch (Throwable t) {
            throw ioexception(t);
        }
    }

    private static int io_uring_enter(int fd, int to_submit, int min_complete,
                                      int flags) throws IOException {
        try {
            return (int) enter_fn.invokeExact(NR_io_uring_enter,
                    fd, to_submit, min_complete, flags, MemorySegment.NULL);
        } catch (Throwable t) {
            throw ioexception(t);
        }
    }

    static IOException ioexception(Throwable t) {
        if (t instanceof IOException ioe) {
            return ioe;
        } else {
            return new IOException(t);
        }
    }

    int checkAndGetIndexFor(ByteBuffer buffer) {
        return mappedBuffers.checkAndGetIndexForBuffer(buffer);
    }

    /**
     * Returns a mapped direct ByteBuffer, or {@code null} if none is
     * available. Mapped buffers must be used with certain IOUring operations
     * such as {@code IORING_OP_WRITE_FIXED} and {@code IORING_OP_READ_FIXED}.
     * Buffers must be returned after use with
     * {@link #returnRegisteredBuffer(ByteBuffer)}.
     *
     * @return a registered buffer, or {@code null} if none is available
     */
    public ByteBuffer getRegisteredBuffer() {
        return mappedBuffers.getRegisteredBuffer();
    }

    /**
     * Returns a previously obtained registered buffer to the pool.
     *
     * @param buffer the buffer being returned
     */
    public void returnRegisteredBuffer(ByteBuffer buffer) {
        mappedBuffers.returnRegisteredBuffer(buffer);
    }
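
    /*
     * A sketch of a fixed-buffer read using the registered buffer pool
     * (illustrative only; it assumes the generated bindings expose
     * IORING_OP_READ_FIXED(), that Sqe has buf_index(int) and user_data(long)
     * setters matching the getters used elsewhere in this file, and targetFd
     * stands for an already open file descriptor):
     *
     *   ByteBuffer buf = ring.getRegisteredBuffer();
     *   int index = ring.checkAndGetIndexFor(buf);
     *   Sqe read = new Sqe()
     *           .opcode(IORING_OP_READ_FIXED())
     *           .fd(targetFd)
     *           .addr(MemorySegment.ofBuffer(buf))
     *           .len(buf.remaining())
     *           .buf_index(index)
     *           .user_data(1L);
     *   ring.submit(read);
     *   ring.enter(1, 1, 0);
     *   Cqe cqe = ring.pollCompletion();   // cqe.res() is the byte count
     *   ring.returnRegisteredBuffer(buf);
     */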

    /**
     * Common capabilities of SubmissionQueue and CompletionQueue.
     */
    sealed abstract class QueueImplBase permits SubmissionQueue, CompletionQueue {
        protected final MemorySegment ringSeg;
        private final MemorySegment head, tail;
        protected final int ringMask;
        protected final MemoryLayout ringLayout;
        protected final int ringLayoutSize;
        protected final int ringLayoutAlignment;
        protected final int ringSize;

        // For accessing head and tail as volatile
        protected final VarHandle addrH;

        /**
         * @param ringSeg the mapped ring segment
         * @param head the head pointer
         * @param tail the tail pointer
         * @param ringMask the ring mask (ring size minus one)
         * @param ringLayout the layout of one ring entry
         */
        QueueImplBase(MemorySegment ringSeg, MemorySegment head,
                      MemorySegment tail, int ringMask,
                      MemoryLayout ringLayout) {
            this.ringSeg = ringSeg;
            this.head = head;
            this.tail = tail;
            this.ringMask = ringMask;
            this.ringSize = ringMask + 1;
            this.ringLayout = ringLayout;
            this.ringLayoutSize = (int)ringLayout.byteSize();
            this.ringLayoutAlignment = (int)ringLayout.byteAlignment();
            this.addrH = ValueLayout.JAVA_INT.varHandle();
        }

        abstract int nEntries();

        boolean ringFull() {
            return nEntries() == ringSize;
        }

        int nAvail() {
            return ringSize - nEntries();
        }

        protected int getHead(boolean withAcquire) {
            int val = (int)(withAcquire
                    ? addrH.getAcquire(head, 0L) : addrH.get(head, 0L));
            return val;
        }

        protected int getTail(boolean withAcquire) {
            int val = (int)(withAcquire
                    ? addrH.getAcquire(tail, 0L) : addrH.get(tail, 0L));
            return val;
        }

        // Used by CompletionQueue
        protected void setHead(int val) {
            addrH.setRelease(head, 0L, val);
        }

        // Used by SubmissionQueue
        protected void setTail(int val) {
            addrH.setRelease(tail, 0L, val);
        }
    }

    final class SubmissionQueue extends QueueImplBase {
        final MemorySegment sqes;
        final MemorySegment flags;
        final int n_sqes;
        final VarHandle flagsH;     // handle for accessing flags
        final boolean polling;

        static final int sqe_layout_size =
                (int)io_uring_sqe.$LAYOUT().byteSize();

        static final int sqe_alignment =
                (int)io_uring_sqe.$LAYOUT().byteAlignment();

        SubmissionQueue(MemorySegment ringSeg, MemorySegment head,
                        MemorySegment tail, int mask,
                        MemorySegment flags, boolean polling,
                        MemorySegment sqes) {
            super(ringSeg, head, tail, mask, ValueLayout.JAVA_INT);
            this.sqes = sqes;
            this.flags = flags;
            this.polling = polling;
            this.flagsH = ValueLayout.JAVA_INT.varHandle();
            this.n_sqes = (int) (sqes.byteSize() / sqe_layout_size);
        }

        /**
         * Submits an Sqe to the Submission Queue.
         *
         * @param sqe the entry to submit
         * @return true if the submission succeeded, false if the queue is full
         */
        public boolean submit(Sqe sqe) throws IOException {
            if (ringFull()) {
                return false;
            }

            int tailVal = getTail(false);
            int tailIndex = tailVal & ringMask;

            MemorySegment slot = sqes.asSlice(
                    (long) tailIndex * sqe_layout_size,
                    sqe_layout_size, sqe_alignment).fill((byte)0);
            if (slot == null)
                throw new IOException("Q full");    // shouldn't happen
            // Populate the slot as an io_uring_sqe.
            // Note: Sqe has already validated that overlapping fields are not set.
            io_uring_sqe.user_data(slot, sqe.user_data());
            io_uring_sqe.fd(slot, sqe.fd());
            io_uring_sqe.opcode(slot, (byte)sqe.opcode());
            // This statement handles the large flags union.
            // For simplicity all __u32 variants are handled as xxx_flags;
            // poll_events (__u16) is special-cased.
            sqe.xxx_flags().ifPresentOrElse(
                    u32 -> io_uring_sqe.open_flags(slot, u32),
                    // xxx_flags not present, poll_events may be
                    () -> sqe.poll_events().ifPresent(
                            u16 -> io_uring_sqe.poll_events(slot, (short)u16)));

            io_uring_sqe.flags(slot, (byte)sqe.flags());
            io_uring_sqe.addr(slot, sqe.addr()
                    .orElse(MemorySegment.NULL).address());
            io_uring_sqe.addr2(slot, sqe.addr2()
                    .orElse(MemorySegment.NULL).address());
            io_uring_sqe.buf_index(slot, (short)sqe.buf_index().orElse(0));
            io_uring_sqe.off(slot, sqe.off().orElse(0L));
            io_uring_sqe.len(slot, sqe.len().orElse(0));
            // Record the slot index in the SQ index array at the tail position
            ringSeg.setAtIndex(ValueLayout.JAVA_INT, tailIndex, tailIndex);
            setTail(++tailVal);
            return true;
        }

        /*
         * Returns the SQ flags for this ring. Currently this is only used
         * to read IORING_SQ_NEED_WAKEUP when the Submission Queue is being
         * used in SQPOLL mode. The kernel sets this flag if the kernel
         * polling thread needs to be woken up.
         */
        public int getSQFlags() {
            int res = (int)flagsH.getOpaque(flags, 0L);
            return res;
        }

        @Override
        int nEntries() {
            int n = Math.abs(getTail(false) - getHead(true));
            return n;
        }

        /**
         * Returns whether this Submission Queue is using polling.
         */
        public boolean polling() {
            return this.polling;
        }
    }
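
    /*
     * Note on the ring arithmetic used by both queues: head and tail are
     * free-running 32-bit counters shared with the kernel, and only
     * (counter & ringMask) selects a slot. For example, with ringSize = 8
     * (ringMask = 7) a tail value of 10 maps to slot 10 & 7 = 2, and the
     * difference tail - head used by nEntries() stays correct across
     * wrap-around because int subtraction is modular.
     */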

    final class CompletionQueue extends QueueImplBase {
        CompletionQueue(MemorySegment ringSeg, MemorySegment head,
                        MemorySegment tail, int mask) {
            super(ringSeg, head, tail, mask, io_uring_cqe.$LAYOUT());
        }

        public Cqe pollHead() {
            int headVal = getHead(false);
            if (headVal != getTail(true)) {
                int index = headVal & ringMask;
                int offset = index * ringLayoutSize;
                MemorySegment seg = ringSeg.asSlice(offset,
                        ringLayoutSize, ringLayoutAlignment);
                var res = new Cqe(
                        io_uring_cqe.user_data(seg),
                        io_uring_cqe.res(seg),
                        io_uring_cqe.flags(seg));
                headVal++;
                setHead(headVal);
                return res;
            } else {
                return null;
            }
        }

        @Override
        int nEntries() {
            int n = Math.abs(getTail(true) - getHead(false));
            return n;
        }
    }

    /**
     * Adds the given fd to this ring's epoll(7) instance,
     * creating the epoll instance if it hasn't already been created.
     *
     * If using EPOLLONESHOT mode (in the flags), the opaque field
     * can be used to return the "id" of the specific operation that was
     * kicked off.
     *
     * @param fd target fd to manage
     * @param poll_events bit mask of events to activate
     * @param opaque a 64-bit value to return with event notifications.
     *               A value of -1L is ignored.
     * @throws IOException if the request cannot be submitted
     * @throws InterruptedException if the thread is interrupted
     */
    public void epoll_add(int fd, int poll_events, long opaque)
            throws IOException, InterruptedException {
        epoll_op(fd, poll_events, opaque, EPOLL_CTL_ADD());
    }

    public void epoll_del(int fd, int poll_events)
            throws IOException, InterruptedException {
        epoll_op(fd, poll_events, -1L, EPOLL_CTL_DEL());
    }

    public void epoll_mod(int fd, int poll_events, long opaque)
            throws IOException, InterruptedException {
        epoll_op(fd, poll_events, opaque, EPOLL_CTL_MOD());
    }

    private void epoll_op(int fd, int poll_events, long opaque, int op)
            throws IOException, InterruptedException {
        if (this.epollfd == -1) {
            this.epollfd = initEpoll();
        }

        MemorySegment targetfd =
                arena.allocateFrom(ValueLayout.JAVA_INT, fd);

        Sqe request = new Sqe()
                .opcode(IORING_OP_EPOLL_CTL())
                .fd(epollfd)
                .addr(targetfd)
                .xxx_flags(poll_events)
                .len(op);

        if (opaque != -1L) {
            MemorySegment event = arena.allocate(epoll_event.$LAYOUT());
            epoll_event.events(event, poll_events);
            var dataSlice = epoll_event.data(event);
            epoll_data_t.u64(dataSlice, opaque);
            request = request.off(event.address());
        }
        submit(request);
    }

    static MemorySegment getSegmentFor(MemoryLayout layout) {
        return arena.allocate(layout.byteSize(), layout.byteAlignment())
                .fill((byte)0);
    }

    static String errorString(int errno) {
        errno = -errno;
        return "Error: " + strerror(errno);
    }
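
    /*
     * A sketch of registering a file descriptor with the ring's epoll(7)
     * instance (illustrative only; EPOLLIN() is assumed to be exposed by the
     * generated bindings alongside the EPOLL_CTL_* constants used above, and
     * sockFd stands for an already open descriptor):
     *
     *   ring.epoll_add(sockFd, EPOLLIN(), 7L);  // 7L identifies this registration
     *   ring.enter(1, 0, 0);                    // submit the EPOLL_CTL request
     *   // ... later ...
     *   ring.epoll_del(sockFd, EPOLLIN());
     */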

    // This is obsolete. There is a better way of doing a timed poll, by
    // passing a timeout to io_uring_enter.
    public Sqe getTimeoutSqe(Duration maxwait, int opcode, int completionCount) {
        MemorySegment seg =
                arena.allocate(__kernel_timespec.$LAYOUT()).fill((byte)0);

        __kernel_timespec.tv_sec(seg, maxwait.getSeconds());
        __kernel_timespec.tv_nsec(seg, maxwait.getNano());
        return new Sqe()
                .opcode(opcode)
                .addr(seg)
                .xxx_flags(0)               // timeout_flags
                .off(completionCount)
                .len(1);
    }
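
    /*
     * A sketch of using the timeout Sqe above to bound a wait (illustrative
     * only; it assumes the generated bindings expose IORING_OP_TIMEOUT() and
     * that a user_data(long) setter exists so the timeout completion, which
     * reports -ETIME when it expires, can be told apart from other
     * completions):
     *
     *   Sqe timeout = ring.getTimeoutSqe(Duration.ofSeconds(2),
     *                                    IORING_OP_TIMEOUT(), 1)
     *                     .user_data(-2L);
     *   ring.submit(timeout);
     *   ring.enter(1, 1, 0);     // returns after one completion or the timeout
     */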

    private static final ValueLayout POINTER =
            ValueLayout.ADDRESS.withTargetLayout(
                    MemoryLayout.sequenceLayout(Long.MAX_VALUE, JAVA_BYTE)
            );

    private static final MethodHandle mmap_fn = locateStdHandle(
            "mmap", FunctionDescriptor.of(
                    POINTER,                // returned address
                    ValueLayout.JAVA_LONG,  // input address, usually zero
                    ValueLayout.JAVA_LONG,  // size_t
                    ValueLayout.JAVA_INT,   // int prot (PROT_READ | PROT_WRITE)
                    ValueLayout.JAVA_INT,   // int flags (MAP_SHARED | MAP_POPULATE)
                    ValueLayout.JAVA_INT,   // int fd
                    ValueLayout.JAVA_LONG   // off_t (64 bit)
            )
    );

    private static final MethodHandle epoll_create_fn = locateStdHandle(
            "epoll_create", FunctionDescriptor.of(
                    ValueLayout.JAVA_INT,   // returned fd
                    ValueLayout.JAVA_INT    // int size (ignored)
            ), SystemCallContext.errnoLinkerOption()
    );

    private static final MethodHandle close_fn = locateStdHandle(
            "close",
            FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.JAVA_INT),
            SystemCallContext.errnoLinkerOption()
    );

    private static final MethodHandle eventfd_fn = locateStdHandle(
            "eventfd",
            FunctionDescriptor.of(
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT),
            SystemCallContext.errnoLinkerOption()
    );

    // Linux syscall numbers. These allow the system calls to be invoked
    // directly on systems where there are no wrappers for these functions
    // in libc or liburing. It also means liburing is no longer used.

    private static final int NR_io_uring_setup = 425;
    private static final int NR_io_uring_enter = 426;
    private static final int NR_io_uring_register = 427;

    private static final MethodHandle setup_fn = locateStdHandle(
            "syscall", FunctionDescriptor.of(
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.ADDRESS)
    );

    private static final MethodHandle enter_fn = locateStdHandle(
            "syscall", FunctionDescriptor.of(ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.ADDRESS)    // sigset_t, UNUSED for now
    );

    // io_uring_register, specifically for
    // IORING_REGISTER_EVENTFD and IORING_UNREGISTER_EVENTFD
    private static final MethodHandle evregister_fn = locateStdHandle(
            "syscall",
            FunctionDescriptor.of(ValueLayout.JAVA_INT, // result
                    ValueLayout.JAVA_INT,               // syscall number
                    ValueLayout.JAVA_INT,               // ring fd
                    ValueLayout.JAVA_INT,               // opcode
                    INT_POINTER,                        // pointer to fd
                    ValueLayout.JAVA_INT),              // integer value 1
            SystemCallContext.errnoLinkerOption()
    );

    // mmap constants used internally
    private static final int PROT_READ = 1;
    private static final int PROT_WRITE = 2;
    private static final int MAP_SHARED = 1;
    private static final int MAP_POPULATE = 0x8000;

    /**
     * offset (when mapping io_uring segments) must be one of:
     * jdk.internal.ffi.generated.iouring.iouring_h.IORING_OFF_SQ_RING()
     * jdk.internal.ffi.generated.iouring.iouring_h.IORING_OFF_CQ_RING()
     * jdk.internal.ffi.generated.iouring.iouring_h.IORING_OFF_SQES()
     *
     * @param size the size of the mapping in bytes
     * @param fd the ring fd
     * @param offset one of the IORING_OFF_* values above
     * @return the mapped segment, reinterpreted to {@code size} bytes
     */
    private static MemorySegment mmap(long size, int fd, long offset) {
        MemorySegment seg = null;
        try {
            seg = (MemorySegment)mmap_fn
                    .invokeExact(0L, size,
                            PROT_READ | PROT_WRITE,
                            MAP_SHARED | MAP_POPULATE,
                            fd,
                            offset
                    );
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        return seg.reinterpret(size);
    }

    int ringFd() {
        return fd;
    }
}