/*
 * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch.iouring;

import jdk.internal.ffi.generated.iouring.*;

import java.io.IOException;
import java.lang.foreign.*;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.time.Duration;

import static java.lang.foreign.ValueLayout.JAVA_BYTE;
import static sun.nio.ch.iouring.Util.strerror;
import static sun.nio.ch.iouring.Util.locateHandleFromLib;
import static sun.nio.ch.iouring.Util.locateStdHandle;
import static sun.nio.ch.iouring.Util.INT_POINTER;
import static jdk.internal.ffi.generated.iouring.iouring_h.*;
import static jdk.internal.ffi.generated.iouring.iouring_h_1.IORING_REGISTER_EVENTFD;
import static jdk.internal.ffi.generated.iouring.iouring_h_1.IORING_UNREGISTER_EVENTFD;

/**
 * Low-level interface to a Linux io_uring. It provides an asynchronous
 * interface: requests are submitted through the {@link #submit(Sqe)} method,
 * and completion events can be awaited by calling {@link #enter(int, int, int)}.
 * Completions, represented by {@link Cqe}, are then obtained by calling
 * {@link #pollCompletion()}. Completions are linked to submissions by the
 * {@link Cqe#user_data()} field of the {@code Cqe}, which contains the
 * same 64-bit (long) value that was supplied in the submitted {@link Sqe}.
 * <p>
 * Some IOUring operations work with kernel-registered direct ByteBuffers.
 * When creating an IOUring instance, a number of these buffers can be
 * created in a pool. Registered buffers are not used with regular
 * IOUring read/write operations.
 */
@SuppressWarnings("restricted")
public class IOUring {
    private static final Arena arena = Arena.ofAuto();

    private static final boolean TRACE = System
            .getProperty("jdk.io_uring.trace", "false")
            .equalsIgnoreCase("true");
    private final SubmissionQueue sq;
    private final CompletionQueue cq;
    private final int fd;               // The ring fd
    private int epollfd = -1;           // The epoll(7) fd, if created
    private static final int INT_SIZE = (int)ValueLayout.JAVA_INT.byteSize();

    private final Arena autoArena = Arena.ofAuto();

    private final KMappedBuffers mappedBuffers;

    /**
     * Creates an IOUring and initializes the ring structures.
     * {@code entries}
     * (or the next higher power of 2) is the size of the Submission Queue.
     * Currently, the Completion Queue returned will be double the size
     * of the Submission Queue.
     */
    public IOUring(int entries) throws IOException {
        this(entries, 0, 0);
    }

    /**
     * Creates an IOUring and initializes the ring structures.
     * @param sq_entries the number of submission queue entries to allocate
     * @param cq_entries the number of completion queue entries to allocate
     * @param flags io_uring_params flags
     * @throws IOException if an I/O error occurs
     */
    public IOUring(int sq_entries, int cq_entries, int flags) throws IOException {
        this(sq_entries, cq_entries, flags, 0, -1, 0);
    }

    /**
     * Creates an IOUring, initializes the ring structures, and allocates a
     * number of direct {@link ByteBuffer}s which are additionally mapped
     * into the kernel address space.
     *
     * @param sq_entries the number of submission queue entries to allocate
     * @param cq_entries the number of completion queue entries to allocate
     * @param flags io_uring_params flags
     * @param nmappedBuffers number of mapped direct ByteBuffers to create
     * @param mappedBufsize size of each buffer in bytes
     * @param poll_idle_time the number of milliseconds to allow the kernel
     *                       polling thread to remain idle. {@code 0} means
     *                       polling is disabled.
     * @throws IOException if an I/O error occurs
     */
    public IOUring(int sq_entries,
                   int cq_entries,
                   int flags,
                   int nmappedBuffers,
                   int mappedBufsize,
                   int poll_idle_time) throws IOException {
        if (TRACE) {
            System.out.printf("IOUring (sq:%d, cq:%d, flg:%d, nbufs:%d, bufsiz:%d, sqidle:%d)\n",
                    sq_entries, cq_entries, flags, nmappedBuffers, mappedBufsize, poll_idle_time);
        }

        MemorySegment params_seg = getSegmentFor(io_uring_params.$LAYOUT());

        if (cq_entries > 0) {
            io_uring_params.cq_entries(params_seg, cq_entries);
            flags |= IORING_SETUP_CQSIZE();
        }

        boolean polling = false;

        if (poll_idle_time > 0) {
            io_uring_params.sq_thread_idle(params_seg, poll_idle_time);
            flags |= IORING_SETUP_SQPOLL();
            polling = true;
        }

        if (flags != 0) {
            io_uring_params.flags(params_seg, flags);
        }

        try {
            // call setup
            fd = io_uring_setup(sq_entries, params_seg);
            if (fd < 0) {
                throw new IOException(errorString(fd));
            }
        } catch (Throwable t) {
            if (TRACE)
                t.printStackTrace();
            throw t;
        }

        if (poll_idle_time > 0) {
            int i = io_uring_params.sq_thread_idle(params_seg);
            if (TRACE) System.out.printf("poll_idle_time = %d\n", i);
        }
        mappedBuffers = new KMappedBuffers(nmappedBuffers, mappedBufsize);
        if (nmappedBuffers > 0) {
            mappedBuffers.register(fd);
        }
        // Offsets segments
        MemorySegment cq_off_seg = io_uring_params.cq_off(params_seg);
        MemorySegment sq_off_seg = io_uring_params.sq_off(params_seg);

        // Offsets to the cqe array and the sqe index array
        int cq_off_cqes = io_cqring_offsets.cqes(cq_off_seg);
        int sq_off_array = io_sqring_offsets.array(sq_off_seg);
        int sq_off_flags = io_sqring_offsets.flags(sq_off_seg);

        // Actual number of entries in each Q
        sq_entries = io_uring_params.sq_entries(params_seg);
        cq_entries = io_uring_params.cq_entries(params_seg);

        int sq_size = sq_off_array + sq_entries * INT_SIZE;
        int cq_size = cq_off_cqes + cq_entries * (int)io_uring_cqe.sizeof();

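        // With the IORING_FEAT_SINGLE_MMAP feature, the kernel lets the SQ
        // and CQ ring headers share a single mapping, so one region of
        // max(sq_size, cq_size) bytes is mapped below and used for both rings.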
        boolean singleMmap =
            (io_uring_params.features(params_seg)
             & IORING_FEAT_SINGLE_MMAP()) != 0;

        if (singleMmap) {
            if (cq_size > sq_size)
                sq_size = cq_size;
            cq_size = sq_size;
        }
        var sqe_seg = mmap(sq_size, fd, IORING_OFF_SQ_RING());

        MemorySegment cqes_seg;
        if (singleMmap) {
            cqes_seg = sqe_seg;
        } else {
            cqes_seg = mmap(cq_size, fd, IORING_OFF_CQ_RING());
        }

        // Masks
        int sq_mask = sqe_seg.get(ValueLayout.JAVA_INT,
                io_sqring_offsets.ring_mask(sq_off_seg));
        int cq_mask = cqes_seg.get(ValueLayout.JAVA_INT,
                io_cqring_offsets.ring_mask(cq_off_seg));

        var sqes = mmap(sq_entries * io_uring_sqe.sizeof(),
                fd, IORING_OFF_SQES());

        cq = new CompletionQueue(cqes_seg.asSlice(cq_off_cqes),
                cqes_seg.asSlice(io_cqring_offsets.head(cq_off_seg)),
                cqes_seg.asSlice(io_cqring_offsets.tail(cq_off_seg)),
                cq_mask);

        sq = new SubmissionQueue(sqe_seg.asSlice(sq_off_array),
                sqe_seg.asSlice(io_sqring_offsets.head(sq_off_seg)),
                sqe_seg.asSlice(io_sqring_offsets.tail(sq_off_seg)),
                sq_mask, sqe_seg.asSlice(sq_off_flags), polling,
                sqes);
        if (TRACE)
            System.out.printf("IOUring: ringfd: %d\n", fd);
    }

    /**
     * Closes the ring fd.
     */
    public void close() throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();
        try {
            ret = (int)close_fn.invokeExact(ctx.errnoCaptureSegment(),
                    ringFd());
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        ctx.throwIOExceptionOnError(ret);
    }

    /**
     * Creates a new eventfd(2) file descriptor.
     */
    public int eventfd() throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();
        try {
            ret = (int)eventfd_fn.invokeExact(ctx.errnoCaptureSegment(),
                    0, 0);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        return ctx.throwIOExceptionOnError(ret);
    }

    private int initEpoll() throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();
        try {
            ret = (int)epoll_create_fn.invokeExact(ctx.errnoCaptureSegment(),
                    1);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        return ctx.throwIOExceptionOnError(ret);
    }

    public void register_eventfd(int efd) throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();
        MemorySegment fdseg =
            arena.allocateFrom(ValueLayout.JAVA_INT, efd);

        try {
            ret = (int)evregister_fn
                    .invokeExact(
                        ctx.errnoCaptureSegment(),
                        NR_io_uring_register,
                        fd, IORING_REGISTER_EVENTFD(),
                        fdseg, 1
                    );
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        ctx.throwIOExceptionOnError(ret);
    }

    public void unregister_eventfd() throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();

        try {
            ret = (int)evregister_fn
                    .invokeExact(
                        ctx.errnoCaptureSegment(),
                        NR_io_uring_register,
                        fd, IORING_UNREGISTER_EVENTFD(),
                        MemorySegment.NULL, 0
                    );
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        ctx.throwIOExceptionOnError(ret);
    }

    /**
     * Asynchronously submits an Sqe to this IOUring. Can be called
     * multiple times before enter().
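     * <p>
     * A minimal usage sketch (illustrative only; it assumes an
     * {@code IORING_OP_NOP} opcode constant from the generated bindings and a
     * fluent {@code user_data} setter on {@link Sqe}):
     * <pre>{@code
     *     IOUring ring = new IOUring(32);
     *     Sqe sqe = new Sqe()
     *             .opcode(IORING_OP_NOP())    // no-op request, completes immediately
     *             .user_data(42L);            // echoed back in the matching Cqe
     *     ring.submit(sqe);
     *     ring.enter(1, 1, 0);                // notify the kernel, wait for one completion
     *     Cqe cqe = ring.pollCompletion();    // cqe.user_data() == 42L
     * }</pre>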
     *
     * @param sqe the submission queue entry to submit
     * @throws IOException if the submission queue is full and waiting for
     *         space failed
     */
    public void submit(Sqe sqe) throws IOException {
        if (!sq.submit(sqe)) {
            enter(0, 0, IORING_ENTER_SQ_WAIT());
            if (!sq.submit(sqe)) {
                throw new IOException("Submission Queue full: wait failed");
            }
        }
        if (TRACE)
            System.out.printf("submit: %s \n", sqe);
    }

    /**
     * Notifies the kernel of entries on the Submission Q and waits for a
     * number of responses (completion events). If this returns normally
     * with a value {@code n > 0}, then n requests have been accepted
     * by the kernel. A normal return also means that the requested number of
     * completion events have been received; {@link #pollCompletion()} can be
     * called {@code nreceive} times to obtain the results.
     *
     * @param nsubmit number of requests to submit
     * @param nreceive block until this number of completion events has been
     *                 received
     * @param flags flags to pass to io_uring_enter
     *
     * @return the number of Sqes successfully submitted; a return value less
     *         than 0 means an error occurred
     */
    public int enter(int nsubmit, int nreceive, int flags) throws IOException {
        if (TRACE) System.out.printf("enter([fd:%d] %d, %d, %d) called\n",
                this.fd, nsubmit, nreceive, flags);

        if (nreceive > 0) {
            flags |= IORING_ENTER_GETEVENTS();
        }
        int res = io_uring_enter(this.fd, nsubmit, nreceive, flags);
        if (TRACE) System.out.printf("enter [fd:%d] returns %s\n",
                this.fd, getError(res));
        return res;
    }

    /**
     * Returns a String that is either the integer value,
     * if it is greater than or equal to zero, or
     * a description of the error, if less than zero.
     */
    public static String getError(int retcode) {
        if (retcode >= 0) {
            return Integer.toString(retcode);
        } else {
            return strerror(-retcode);
        }
    }

    /**
     * In polling mode, use this instead of enter() on the submission side
     * to check if the kernel poller needs to be woken up. It checks whether
     * the kernel polling thread has gone idle, and if so wakes it up.
     */
    public void pollingEnter() throws IOException {
        if (TRACE) System.out.printf("pollingEnter([fd:%d]) called\n", this.fd);
        if (!sq.polling())
            throw new IllegalStateException("IOUring not in polling mode");

        if ((sq.getSQFlags() & IORING_SQ_NEED_WAKEUP()) > 0) {
            if (TRACE) System.out.println("pollingEnter: waking up kernel");
            enter(0, 0, IORING_ENTER_SQ_WAKEUP());
        }
        if (TRACE) System.out.printf("pollingEnter [fd:%d] return\n", this.fd);
    }

    /**
     * Returns the allocated size of the Submission Q. If the requested size
     * was not a power of 2, then the allocated size will be the next highest
     * power of 2.
     *
     * @return the allocated Submission Q size in entries
     */
    public int sqsize() {
        return sq.ringSize;
    }

    /**
     * Returns the number of free entries in the Submission Q.
     */
    public int sqfree() {
        return sq.nAvail();
    }

    /**
     * Returns whether the Completion Q is empty or not.
     *
     * @return {@code true} if the Completion Q is empty
     */
    public boolean cqempty() {
        return cq.nEntries() == 0;
    }

    /**
     * Returns the allocated size of the Completion Q.
     * Currently, double the size of the Submission Q.
     *
     * @return the allocated Completion Q size in entries
     */
    public int cqsize() {
        return cq.ringSize;
    }

    /**
     * Returns the fd of this ring's epoll(7) instance, or -1 if it has not
     * been created.
     */
    public int epoll_fd() {
        return epollfd;
    }

    /**
     * Polls the Completion Queue for results.
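     * <p>
     * Completions can be drained, for example, with a loop of the form
     * (sketch only; {@code ring} is an {@code IOUring} instance and the
     * {@code Cqe} accessors shown are assumed):
     * <pre>{@code
     *     Cqe cqe;
     *     while ((cqe = ring.pollCompletion()) != null) {
     *         long id = cqe.user_data();   // matches the submitted Sqe's user_data
     *         int result = cqe.res();      // operation result, or a negated errno
     *     }
     * }</pre>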
     *
     * @return a Cqe if available or {@code null}
     */
    public Cqe pollCompletion() {
        Cqe cqe = cq.pollHead();
        if (TRACE)
            System.out.printf("pollCompletion: -> %s\n", cqe);
        return cqe;
    }

    /**
     * Returns a String description of the given errno value.
     *
     * @param errno the errno value
     * @return a description of the error
     */
    public static String strerror(int errno) {
        return Util.strerror(errno);
    }

    private static int io_uring_setup(int entries, MemorySegment params)
            throws IOException {
        SystemCallContext ctx = SystemCallContext.get();
        int ret;
        try {
            ret = (int)setup_fn.invokeExact(ctx.errnoCaptureSegment(),
                    NR_io_uring_setup, entries, params);
        } catch (Throwable t) {
            throw ioexception(t);
        }
        return ctx.throwIOExceptionOnError(ret);
    }

    private static final int EINTR = 4;

    private static int io_uring_enter(int fd, int to_submit, int min_complete,
                                      int flags) throws IOException {
        SystemCallContext ctx = SystemCallContext.get();
        int ret;
        try {
            do {
                ret = (int) enter_fn.invokeExact(ctx.errnoCaptureSegment(),
                        NR_io_uring_enter, fd, to_submit, min_complete,
                        flags, MemorySegment.NULL);
            } while (ret == -1 && ctx.lastErrno() == EINTR);
        } catch (Throwable t) {
            throw ioexception(t);
        }
        return ctx.throwIOExceptionOnError(ret);
    }

    static IOException ioexception(Throwable t) {
        if (t instanceof IOException ioe) {
            return ioe;
        } else {
            return new IOException(t);
        }
    }

    int checkAndGetIndexFor(ByteBuffer buffer) {
        return mappedBuffers.checkAndGetIndexForBuffer(buffer);
    }

    /**
     * Returns a mapped direct ByteBuffer or {@code null} if none is available.
     * Mapped buffers must be used with some IOUring operations such as
     * {@code IORING_OP_WRITE_FIXED} and {@code IORING_OP_READ_FIXED}.
     * Buffers must be returned after use with
     * {@link #returnRegisteredBuffer(ByteBuffer)}.
     *
     * @return a registered buffer, or {@code null} if none is available
     */
    public ByteBuffer getRegisteredBuffer() {
        return mappedBuffers.getRegisteredBuffer();
    }

    /**
     * Returns a previously obtained registered buffer to the pool.
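     * <p>
     * A typical borrow/use/return sequence (sketch only; {@code ring} is an
     * {@code IOUring} instance):
     * <pre>{@code
     *     ByteBuffer buf = ring.getRegisteredBuffer();
     *     if (buf != null) {
     *         try {
     *             // use buf with IORING_OP_READ_FIXED / IORING_OP_WRITE_FIXED
     *         } finally {
     *             ring.returnRegisteredBuffer(buf);
     *         }
     *     }
     * }</pre>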
     *
     * @param buffer the buffer to return
     */
    public void returnRegisteredBuffer(ByteBuffer buffer) {
        mappedBuffers.returnRegisteredBuffer(buffer);
    }

    /**
     * Common capabilities of SubmissionQueue and CompletionQueue.
     */
    sealed abstract class QueueImplBase permits SubmissionQueue, CompletionQueue {
        protected final MemorySegment ringSeg;
        private final MemorySegment head, tail;
        protected final int ringMask;
        protected final MemoryLayout ringLayout;
        protected final int ringLayoutSize;
        protected final int ringLayoutAlignment;
        protected final int ringSize;

        // For accessing head and tail as volatile
        protected final VarHandle addrH;

        /**
         * @param ringSeg the mapped ring segment
         * @param head the head pointer
         * @param tail the tail pointer
         * @param ringMask the ring mask ({@code ringSize - 1})
         * @param ringLayout the layout of one ring element
         */
        QueueImplBase(MemorySegment ringSeg, MemorySegment head,
                      MemorySegment tail, int ringMask,
                      MemoryLayout ringLayout) {
            this.ringSeg = ringSeg;
            this.head = head;
            this.tail = tail;
            this.ringMask = ringMask;
            this.ringSize = ringMask + 1;
            this.ringLayout = ringLayout;
            this.ringLayoutSize = (int)ringLayout.byteSize();
            this.ringLayoutAlignment = (int)ringLayout.byteAlignment();
            this.addrH = ValueLayout.JAVA_INT.varHandle();
        }

        abstract int nEntries();

        boolean ringFull() {
            return nEntries() == ringSize;
        }

        int nAvail() {
            return ringSize - nEntries();
        }

        protected int getHead(boolean withAcquire) {
            int val = (int)(withAcquire
                    ? addrH.getAcquire(head, 0L) : addrH.get(head, 0L));
            return val;
        }

        protected int getTail(boolean withAcquire) {
            int val = (int)(withAcquire
                    ? addrH.getAcquire(tail, 0L) : addrH.get(tail, 0L));
            return val;
        }

        // Used by CompletionQueue
        protected void setHead(int val) {
            addrH.setRelease(head, 0L, val);
        }

        // Used by SubmissionQueue
        protected void setTail(int val) {
            addrH.setRelease(tail, 0L, val);
        }
    }

    final class SubmissionQueue extends QueueImplBase {
        final MemorySegment sqes;
        final MemorySegment flags;
        final int n_sqes;
        final VarHandle flagsH; // handle for accessing flags
        final boolean polling;

        static final int sqe_layout_size =
                (int)io_uring_sqe.$LAYOUT().byteSize();

        static final int sqe_alignment =
                (int)io_uring_sqe.$LAYOUT().byteAlignment();

        SubmissionQueue(MemorySegment ringSeg, MemorySegment head,
                        MemorySegment tail, int mask,
                        MemorySegment flags, boolean polling,
                        MemorySegment sqes) {
            super(ringSeg, head, tail, mask, ValueLayout.JAVA_INT);
            this.sqes = sqes;
            this.flags = flags;
            this.polling = polling;
            this.flagsH = ValueLayout.JAVA_INT.varHandle();
            this.n_sqes = (int) (sqes.byteSize() / sqe_layout_size);
        }

        /**
         * Submits an Sqe to the Submission Q.
         * @param sqe the Sqe to copy into the ring
         * @return true if the submission succeeded, false if the Q is full
         */
        public boolean submit(Sqe sqe) throws IOException {
            if (ringFull()) {
                return false;
            }

            int tailVal = getTail(false);
            int tailIndex = tailVal & ringMask;

            MemorySegment slot = sqes.asSlice(
                    (long) tailIndex * sqe_layout_size,
                    sqe_layout_size, sqe_alignment).fill((byte)0);
            if (slot == null)
                throw new IOException("Q full"); // shouldn't happen
            // Populate the slot as an io_uring_sqe.
            // Note: Sqe has already validated that overlapping fields are not set.
            io_uring_sqe.user_data(slot, sqe.user_data());
            io_uring_sqe.fd(slot, sqe.fd());
            io_uring_sqe.opcode(slot, (byte)sqe.opcode());
            // This statement handles the large flags union.
            // For simplicity all __u32 variants are handled
            // as xxx_flags; poll_events (__u16) are special.
            sqe.xxx_flags().ifPresentOrElse(
                u32 -> io_uring_sqe.open_flags(slot, u32),
                // xxx_flags not present, poll_events may be
                () -> sqe.poll_events().ifPresent(
                    u16 -> io_uring_sqe.poll_events(slot, (short)u16)));

            io_uring_sqe.flags(slot, (byte)sqe.flags());
            io_uring_sqe.addr(slot, sqe.addr()
                .orElse(MemorySegment.NULL).address());
            io_uring_sqe.addr2(slot, sqe.addr2()
                .orElse(MemorySegment.NULL).address());
            io_uring_sqe.buf_index(slot, (short)sqe.buf_index().orElse(0));
            io_uring_sqe.off(slot, sqe.off().orElse(0L));
            io_uring_sqe.len(slot, sqe.len().orElse(0));
            // Populate the tail slot
            ringSeg.setAtIndex(ValueLayout.JAVA_INT, tailIndex, tailIndex);
            setTail(++tailVal);
            return true;
        }

        /*
         * Returns the SQ flags for this ring. Currently this is only used
         * to read the IORING_SQ_NEED_WAKEUP flag when the submission Q is
         * being used in SQPOLL mode. The kernel sets this flag if the kernel
         * polling thread needs to be woken up.
         */
        public int getSQFlags() {
            int res = (int)flagsH.getOpaque(flags, 0L);
            return res;
        }

        @Override
        int nEntries() {
            int n = Math.abs(getTail(false) - getHead(true));
            return n;
        }

        /**
         * Returns whether this Submission Q is using polling.
         */
        public boolean polling() {
            return this.polling;
        }
    }

    final class CompletionQueue extends QueueImplBase {
        CompletionQueue(MemorySegment ringSeg, MemorySegment head,
                        MemorySegment tail, int mask) {
            super(ringSeg, head, tail, mask, io_uring_cqe.$LAYOUT());
        }

        public Cqe pollHead() {
            int headVal = getHead(false);
            if (headVal != getTail(true)) {
                int index = headVal & ringMask;
                int offset = index * ringLayoutSize;
                MemorySegment seg = ringSeg.asSlice(offset,
                        ringLayoutSize, ringLayoutAlignment);
                var res = new Cqe(
                        io_uring_cqe.user_data(seg),
                        io_uring_cqe.res(seg),
                        io_uring_cqe.flags(seg));
                headVal++;
                setHead(headVal);
                return res;
            } else {
                return null;
            }
        }

        @Override
        int nEntries() {
            int n = Math.abs(getTail(true) - getHead(false));
            return n;
        }
    }

    /**
     * Adds the given fd to this ring's epoll(7) instance,
     * creating the epoll instance if it hasn't already been created.
     * <p>
     * If using the EPOLLONESHOT mode (in flags) the opaque field
     * can be used to return the "id" of the specific operation that was
     * kicked off.
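     * <p>
     * For example, to watch a socket for read readiness (sketch only;
     * {@code sockfd} and {@code sockId} are placeholders, and an
     * {@code EPOLLIN} constant is assumed to be available from the generated
     * bindings):
     * <pre>{@code
     *     ring.epoll_add(sockfd, EPOLLIN(), sockId);
     *     ring.enter(1, 0, 0);   // hand the EPOLL_CTL request to the kernel
     * }</pre>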
     *
     * @param fd target fd to manage
     * @param poll_events bit mask of events to activate
     * @param opaque a 64-bit value to return with event notifications.
     *               A value of -1L is ignored.
     * @throws IOException if an I/O error occurs
     * @throws InterruptedException if the operation is interrupted
     */
    public void epoll_add(int fd, int poll_events, long opaque)
            throws IOException, InterruptedException {
        epoll_op(fd, poll_events, opaque, EPOLL_CTL_ADD());
    }

    public void epoll_del(int fd, int poll_events)
            throws IOException, InterruptedException {
        epoll_op(fd, poll_events, -1L, EPOLL_CTL_DEL());
    }

    public void epoll_mod(int fd, int poll_events, long opaque)
            throws IOException, InterruptedException {
        epoll_op(fd, poll_events, opaque, EPOLL_CTL_MOD());
    }

    private void epoll_op(int fd, int poll_events, long opaque, int op)
            throws IOException, InterruptedException {
        if (this.epollfd == -1) {
            this.epollfd = initEpoll();
        }

        MemorySegment targetfd =
            arena.allocateFrom(ValueLayout.JAVA_INT, fd);

        Sqe request = new Sqe()
            .opcode(IORING_OP_EPOLL_CTL())
            .fd(epollfd)
            .addr(targetfd)
            .xxx_flags(poll_events)
            .len(op);

        if (opaque != -1L) {
            MemorySegment event = arena.allocate(epoll_event.$LAYOUT());
            epoll_event.events(event, poll_events);
            var dataSlice = epoll_event.data(event);
            epoll_data_t.u64(dataSlice, opaque);
            request = request.off(event.address());
        }
        submit(request);
    }

    static MemorySegment getSegmentFor(MemoryLayout layout) {
        return arena.allocate(layout.byteSize(), layout.byteAlignment())
                .fill((byte)0);
    }

    static String errorString(int errno) {
        errno = -errno;
        return "Error: " + strerror(errno);
    }

    // This is obsolete. There is a better way of doing a timed
    // poll by providing a timespec to io_uring_enter.
    public Sqe getTimeoutSqe(Duration maxwait, int opcode, int completionCount) {
        MemorySegment seg =
            arena.allocate(__kernel_timespec.$LAYOUT()).fill((byte)0);

        __kernel_timespec.tv_sec(seg, maxwait.getSeconds());
        __kernel_timespec.tv_nsec(seg, maxwait.getNano());
        return new Sqe()
            .opcode(opcode)
            .addr(seg)
            .xxx_flags(0)           // timeout_flags
            .off(completionCount)
            .len(1);
    }

    private static final ValueLayout POINTER =
        ValueLayout.ADDRESS.withTargetLayout(
            MemoryLayout.sequenceLayout(Long.MAX_VALUE, JAVA_BYTE)
        );

    private static final MethodHandle mmap_fn = locateStdHandle(
        "mmap", FunctionDescriptor.of(
            POINTER,                // returned address
            ValueLayout.JAVA_LONG,  // input address, usually zero
            ValueLayout.JAVA_LONG,  // size_t
            ValueLayout.JAVA_INT,   // int prot (PROT_READ | PROT_WRITE)
            ValueLayout.JAVA_INT,   // int flags (MAP_SHARED | MAP_POPULATE)
            ValueLayout.JAVA_INT,   // int fd
            ValueLayout.JAVA_LONG   // off_t (64 bit?)
        )
    );

    private static final MethodHandle epoll_create_fn = locateStdHandle(
        "epoll_create", FunctionDescriptor.of(
            ValueLayout.JAVA_INT,   // returned fd
            ValueLayout.JAVA_INT    // int size (ignored)
        ), SystemCallContext.errnoLinkerOption()
    );

    private static final MethodHandle close_fn = locateStdHandle(
        "close",
        FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.JAVA_INT),
        SystemCallContext.errnoLinkerOption()
    );

    private static final MethodHandle eventfd_fn = locateStdHandle(
        "eventfd",
        FunctionDescriptor.of(
            ValueLayout.JAVA_INT,
            ValueLayout.JAVA_INT,
            ValueLayout.JAVA_INT),
        SystemCallContext.errnoLinkerOption()
    );

    // Linux syscall numbers.
    // These allow the system calls to be invoked directly on systems where
    // there are no wrappers for these functions in libc or liburing.
    // It also means we no longer depend on liburing.

    private static final int NR_io_uring_setup = 425;
    private static final int NR_io_uring_enter = 426;
    private static final int NR_io_uring_register = 427;

    private static final MethodHandle setup_fn = locateStdHandle(
        "syscall", FunctionDescriptor.of(
            ValueLayout.JAVA_INT,
            ValueLayout.JAVA_INT,
            ValueLayout.JAVA_INT,
            ValueLayout.ADDRESS),
        SystemCallContext.errnoLinkerOption()
    );

    private static final MethodHandle enter_fn = locateStdHandle(
        "syscall", FunctionDescriptor.of(ValueLayout.JAVA_INT,
            ValueLayout.JAVA_INT,
            ValueLayout.JAVA_INT,
            ValueLayout.JAVA_INT,
            ValueLayout.JAVA_INT,
            ValueLayout.JAVA_INT,
            ValueLayout.ADDRESS),   // sigset_t, UNUSED for now
        SystemCallContext.errnoLinkerOption()
    );

    // io_uring_register, specifically for
    // IORING_REGISTER_EVENTFD and IORING_UNREGISTER_EVENTFD
    private static final MethodHandle evregister_fn = locateStdHandle(
        "syscall",
        FunctionDescriptor.of(ValueLayout.JAVA_INT, // result
            ValueLayout.JAVA_INT,   // syscall number
            ValueLayout.JAVA_INT,   // ring fd
            ValueLayout.JAVA_INT,   // opcode
            INT_POINTER,            // pointer to fd
            ValueLayout.JAVA_INT),  // integer value 1
        SystemCallContext.errnoLinkerOption()
    );

    // mmap constants used internally
    private static final int PROT_READ = 1;
    private static final int PROT_WRITE = 2;
    private static final int MAP_SHARED = 1;
    private static final int MAP_POPULATE = 0x8000;

    /**
     * Maps a region of the ring fd into this process's address space.
     * The offset (when mapping io_uring segments) must be one of:
     * jdk.internal.ffi.generated.iouring.iouring_h.IORING_OFF_SQ_RING()
     * jdk.internal.ffi.generated.iouring.iouring_h.IORING_OFF_CQ_RING()
     * jdk.internal.ffi.generated.iouring.iouring_h.IORING_OFF_SQES()
     *
     * @param size the number of bytes to map
     * @param fd the ring fd
     * @param offset one of the IORING_OFF_* values above
     * @return the mapped segment, reinterpreted to {@code size} bytes
     */
    private static MemorySegment mmap(long size, int fd, long offset) {
        MemorySegment seg;
        try {
            seg = (MemorySegment)mmap_fn
                    .invokeExact(0L, size,
                        PROT_READ | PROT_WRITE,
                        MAP_SHARED | MAP_POPULATE,
                        fd,
                        offset
                    );
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        return seg.reinterpret(size);
    }

    int ringFd() {
        return fd;
    }
}