/*
 * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch.iouring;

import jdk.internal.ffi.generated.iouring.*;

import java.io.IOException;
import java.lang.foreign.*;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.time.Duration;

import static java.lang.foreign.ValueLayout.JAVA_BYTE;
import static sun.nio.ch.iouring.Util.strerror;
import static sun.nio.ch.iouring.Util.locateHandleFromLib;
import static sun.nio.ch.iouring.Util.locateStdHandle;
import static sun.nio.ch.iouring.Util.INT_POINTER;
import static jdk.internal.ffi.generated.iouring.iouring_h.*;
import static jdk.internal.ffi.generated.iouring.iouring_h_1.IORING_REGISTER_EVENTFD;
import static jdk.internal.ffi.generated.iouring.iouring_h_1.IORING_UNREGISTER_EVENTFD;

/**
 * Low-level asynchronous interface to a Linux io_uring. Requests are
 * submitted through the {@link #submit(Sqe)} method and completion events
 * can be awaited by calling {@link #enter(int, int, int)}. Completions,
 * represented by {@link Cqe}, are then obtained by calling
 * {@link #pollCompletion()}. Completions are linked to submissions by the
 * {@link Cqe#user_data()} field of the {@code Cqe}, which contains the
 * same 64-bit (long) value that was supplied in the submitted {@link Sqe}.
 * <p>
 * Some IOUring operations work with kernel-registered direct ByteBuffers.
 * When creating an IOUring instance, a pool of these buffers can be
 * created. Registered buffers are not used with regular IOUring
 * read/write operations.
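 * <p>
 * A minimal usage sketch follows. The {@code Sqe} setters shown, such as
 * {@code user_data(long)}, are assumed from how the corresponding fields are
 * read back by this class, and {@code IORING_OP_READ()} is assumed to be one
 * of the generated opcode constants:
 * <pre>{@code
 *     IOUring ring = new IOUring(64);
 *     MemorySegment dst = Arena.ofAuto().allocate(4096);
 *     Sqe sqe = new Sqe()
 *             .opcode(IORING_OP_READ())
 *             .fd(fd)                     // an already-open file descriptor
 *             .addr(dst)                  // destination segment
 *             .len((int) dst.byteSize())
 *             .off(0L)
 *             .user_data(42L);            // echoed back in the Cqe
 *     ring.submit(sqe);
 *     ring.enter(1, 1, 0);                // submit one, wait for one completion
 *     Cqe cqe = ring.pollCompletion();
 *     assert cqe.user_data() == 42L;      // cqe.res() holds the byte count or -errno
 * }</pre>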
 */
@SuppressWarnings("restricted")
public class IOUring {
    private static final Arena arena = Arena.ofAuto();

    private static final boolean TRACE = System
            .getProperty("jdk.io_uring.trace", "false")
            .equalsIgnoreCase("true");
    private final SubmissionQueue sq;
    private final CompletionQueue cq;
    private final int fd; // The ringfd
    private int epollfd = -1; // The epoll(7) fd, if created
    private static final int INT_SIZE = (int)ValueLayout.JAVA_INT.byteSize();

    private final Arena autoArena = Arena.ofAuto();

    private final KMappedBuffers mappedBuffers;

    /**
     * Creates an IOUring and initializes the ring structures. {@code entries}
     * (or the next higher power of 2) is the size of the Submission Queue.
     * Currently, the Completion Queue will be double the size
     * of the Submission Queue.
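     * <p>
     * For example (the reported sizes follow the rounding described above;
     * the exact values are chosen by the kernel):
     * <pre>{@code
     *     IOUring ring = new IOUring(100);
     *     int sq = ring.sqsize();   // typically 128, the next power of 2
     *     int cq = ring.cqsize();   // typically 256, double the SQ size
     * }</pre>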
     */
    public IOUring(int entries) throws IOException {
        this(entries, 0, 0);
    }

    /**
     * Creates an IOUring and initializes the ring structures.
     * @param sq_entries the number of submission queue entries to allocate
     * @param cq_entries the number of completion queue entries to allocate
     * @param flags io_uring_params flags
     * @throws IOException if an I/O error occurs
     */
    public IOUring(int sq_entries, int cq_entries, int flags) throws IOException {
        this(sq_entries, cq_entries, flags, 0, -1, 0);
    }

    /**
     * Creates an IOUring, initializes the ring structures, and allocates a
     * number of direct {@link ByteBuffer}s which are additionally mapped
     * into the kernel address space.
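     * <p>
     * For example, a ring with a pool of four 16 KiB registered buffers and
     * no kernel polling thread (the values are illustrative only):
     * <pre>{@code
     *     IOUring ring = new IOUring(64, 128, 0, 4, 16 * 1024, 0);
     * }</pre>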
     *
     * @param sq_entries the number of submission queue entries to allocate
     * @param cq_entries the number of completion queue entries to allocate
     * @param flags io_uring_params flags
     * @param nmappedBuffers number of mapped direct ByteBuffers to create
     * @param mappedBufsize size of each buffer in bytes
     * @param poll_idle_time the number of milliseconds the kernel polling
     *                       thread is allowed to remain idle. {@code 0}
     *                       disables polling.
     * @throws IOException if an I/O error occurs
     */
    public IOUring(int sq_entries,
                   int cq_entries,
                   int flags,
                   int nmappedBuffers,
                   int mappedBufsize,
                   int poll_idle_time) throws IOException {
        if (TRACE) {
            System.out.printf("IOUring (sq:%d, cq:%d, flg:%d, nbufs:%d, bufsiz:%d, sqidle:%d)\n",
                    sq_entries, cq_entries, flags, nmappedBuffers, mappedBufsize, poll_idle_time);
        }

        MemorySegment params_seg = getSegmentFor(io_uring_params.$LAYOUT());

        if (cq_entries > 0) {
            io_uring_params.cq_entries(params_seg, cq_entries);
            flags |= IORING_SETUP_CQSIZE();
        }

        boolean polling = false;

        if (poll_idle_time > 0) {
            io_uring_params.sq_thread_idle(params_seg, poll_idle_time);
            flags |= IORING_SETUP_SQPOLL();
            polling = true;
        }

        if (flags != 0) {
            io_uring_params.flags(params_seg, flags);
        }

        try {
            // call setup
            fd = io_uring_setup(sq_entries, params_seg);
            if (fd < 0) {
                throw new IOException(errorString(fd));
            }
        } catch (Throwable t) {
            if (TRACE)
                t.printStackTrace();
            throw t;
        }

        if (poll_idle_time > 0) {
            int i = io_uring_params.sq_thread_idle(params_seg);
            if (TRACE) System.out.printf("poll_idle_time = %d\n", i);
        }
        mappedBuffers = new KMappedBuffers(nmappedBuffers, mappedBufsize);
        if (nmappedBuffers > 0) {
            mappedBuffers.register(fd);
        }
        // Offset segments
        MemorySegment cq_off_seg = io_uring_params.cq_off(params_seg);
        MemorySegment sq_off_seg = io_uring_params.sq_off(params_seg);

        // Offsets to cqe array and the sqe index array
        int cq_off_cqes = io_cqring_offsets.cqes(cq_off_seg);
        int sq_off_array = io_sqring_offsets.array(sq_off_seg);
        int sq_off_flags = io_sqring_offsets.flags(sq_off_seg);

        // Actual number of entries in each Q
        sq_entries = io_uring_params.sq_entries(params_seg);
        cq_entries = io_uring_params.cq_entries(params_seg);

        int sq_size = sq_off_array + sq_entries * INT_SIZE;
        int cq_size = cq_off_cqes + cq_entries * (int)io_uring_cqe.sizeof();

        boolean singleMmap = (io_uring_params.features(params_seg)
                & IORING_FEAT_SINGLE_MMAP()) != 0;

        if (singleMmap) {
            if (cq_size > sq_size)
                sq_size = cq_size;
            cq_size = sq_size;
        }
        var sqe_seg = mmap(sq_size, fd, IORING_OFF_SQ_RING());

        MemorySegment cqes_seg;
        if (singleMmap) {
            cqes_seg = sqe_seg;
        } else {
            cqes_seg = mmap(cq_size, fd, IORING_OFF_CQ_RING());
        }

        // Masks
        int sq_mask = sqe_seg.get(ValueLayout.JAVA_INT,
                io_sqring_offsets.ring_mask(sq_off_seg));
        int cq_mask = cqes_seg.get(ValueLayout.JAVA_INT,
                io_cqring_offsets.ring_mask(cq_off_seg));

        var sqes = mmap(sq_entries * io_uring_sqe.sizeof(),
                fd, IORING_OFF_SQES());

        cq = new CompletionQueue(cqes_seg.asSlice(cq_off_cqes),
                cqes_seg.asSlice(io_cqring_offsets.head(cq_off_seg)),
                cqes_seg.asSlice(io_cqring_offsets.tail(cq_off_seg)),
                cq_mask);

        sq = new SubmissionQueue(sqe_seg.asSlice(sq_off_array),
                sqe_seg.asSlice(io_sqring_offsets.head(sq_off_seg)),
                sqe_seg.asSlice(io_sqring_offsets.tail(sq_off_seg)),
                sq_mask, sqe_seg.asSlice(sq_off_flags), polling,
                sqes);
        if (TRACE)
            System.out.printf("IOUring: ringfd: %d\n", fd);
    }


    public void close() throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();
        try {
            ret = (int)close_fn.invokeExact(ctx.errnoCaptureSegment(),
                    ringFd());
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        ctx.throwIOExceptionOnError(ret);

    }

    public int eventfd() throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();
        try {
            ret = (int)eventfd_fn.invokeExact(ctx.errnoCaptureSegment(),
                    0, 0);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        return ctx.throwIOExceptionOnError(ret);
    }

    private int initEpoll() throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();
        try {
            // epoll_create takes a single (ignored) size hint, per the
            // FunctionDescriptor declared for epoll_create_fn below
            ret = (int)epoll_create_fn.invokeExact(ctx.errnoCaptureSegment(),
                    1);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        return ctx.throwIOExceptionOnError(ret);
    }

    public void register_eventfd(int efd) throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();
        MemorySegment fdseg =
                arena.allocateFrom(ValueLayout.JAVA_INT, efd);

        try {
            ret = (int)evregister_fn
                    .invokeExact(
                            ctx.errnoCaptureSegment(),
                            NR_io_uring_register,
                            fd, IORING_REGISTER_EVENTFD(),
                            fdseg, 1
                    );
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        ctx.throwIOExceptionOnError(ret);
    }

    public void unregister_eventfd() throws IOException {
        int ret;
        SystemCallContext ctx = SystemCallContext.get();

        try {
            ret = (int)evregister_fn
                    .invokeExact(
                            ctx.errnoCaptureSegment(),
                            NR_io_uring_register,
                            fd, IORING_UNREGISTER_EVENTFD(),
                            MemorySegment.NULL, 0
                    );
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        ctx.throwIOExceptionOnError(ret);

    }

    /**
     * Asynchronously submits an Sqe to this IOUring. Can be called
     * multiple times before {@link #enter(int, int, int)}.
     *
     * @param sqe the submission queue entry to add
     * @throws IOException if the submission queue remains full after waiting
     */
    public void submit(Sqe sqe) throws IOException {
        if (!sq.submit(sqe)) {
            enter(0, 0, IORING_ENTER_SQ_WAIT());
            if (!sq.submit(sqe)) {
                throw new IOException("Submission Queue full: wait failed");
            }
        }
        if (TRACE)
            System.out.printf("submit: %s \n", sqe);
    }

    /**
     * Notifies the kernel of entries on the Submission Q and waits for a
     * number of responses (completion events). If this returns normally
     * with value {@code n > 0}, then {@code n} requests have been accepted
     * by the kernel. A normal return also means that the requested number of
     * completion events have been received; {@link #pollCompletion()} can
     * then be called {@code nreceive} times to obtain the results.
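     * <p>
     * A typical call drains the completion queue afterwards. A sketch
     * ({@code handle} stands for whatever dispatch the caller performs):
     * <pre>{@code
     *     ring.enter(pending, 1, 0);     // submit what is queued, wait for one
     *     Cqe cqe;
     *     while ((cqe = ring.pollCompletion()) != null) {
     *         handle(cqe);               // dispatch on cqe.user_data()
     *     }
     * }</pre>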
     *
     * @param nsubmit number of requests to submit
     * @param nreceive block until this number of completion events has been received
     * @param flags flags to pass to io_uring_enter
     *
     * @return a negative value if an error occurred; otherwise, the number
     *         of Sqes successfully submitted
     */
    public int enter(int nsubmit, int nreceive, int flags) throws IOException {
        if (TRACE) System.out.printf("enter([fd:%d] %d, %d, %d) called\n",
                this.fd, nsubmit, nreceive, flags);

        if (nreceive > 0) {
            flags |= IORING_ENTER_GETEVENTS();
        }
        int res = io_uring_enter(this.fd, nsubmit, nreceive, flags);
        if (TRACE) System.out.printf("enter [fd:%d] returns %s\n",
                this.fd, getError(res));
        return res;
    }

    /**
     * Returns a String containing either the integer value, if it is
     * greater than or equal to zero, or a description of the error,
     * if it is less than zero.
     */
    public static String getError(int retcode) {
        if (retcode >= 0) {
            return Integer.toString(retcode);
        } else {
            return strerror(-retcode);
        }
    }

    /**
     * In polling mode, use this instead of enter() on the submission side
     * to check whether the kernel poller needs to be woken up. If the kernel
     * polling thread has gone idle, this wakes it up again.
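     * <p>
     * A sketch of the intended SQPOLL pattern (the constructor arguments
     * other than the 2000 ms idle time are illustrative):
     * <pre>{@code
     *     IOUring ring = new IOUring(64, 0, 0, 0, 0, 2000);
     *     ring.submit(sqe);      // sqe prepared as in the class-level example
     *     ring.pollingEnter();   // wakes the kernel poller only if needed
     * }</pre>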
     */
    public void pollingEnter() throws IOException {
        if (TRACE) System.out.printf("pollingEnter([fd:%d]) called\n", this.fd);
        if (!sq.polling())
            throw new IllegalStateException("IOUring not in polling mode");

        if ((sq.getSQFlags() & IORING_SQ_NEED_WAKEUP()) > 0) {
            if (TRACE) System.out.println("pollingEnter: waking up kernel");
            enter(0, 0, IORING_ENTER_SQ_WAKEUP());
        }
        if (TRACE) System.out.printf("pollingEnter [fd:%d] return\n", this.fd);
    }

    /**
     * Returns the allocated size of the Submission Q. If the requested size
     * was not a power of 2, then the allocated size will be the next highest
     * power of 2.
     *
     * @return the allocated submission queue size
     */
    public int sqsize() {
        return sq.ringSize;
    }

    /**
     * Returns the number of free entries in the Submission Q.
     */
    public int sqfree() {
        return sq.nAvail();
    }

    /**
     * Returns whether the completion Q is empty.
     *
     * @return {@code true} if the completion queue is empty
     */
    public boolean cqempty() {
        return cq.nEntries() == 0;
    }

    /**
     * Returns the allocated size of the Completion Q.
     * Currently, this is double the size of the Submission Q.
     *
     * @return the allocated completion queue size
     */
    public int cqsize() {
        return cq.ringSize;
    }

    public int epoll_fd() {
        return epollfd;
    }

    /**
     * Polls the Completion Queue for results.
     *
     * @return a Cqe if available or {@code null}
     */
    public Cqe pollCompletion() {
        Cqe cqe = cq.pollHead();
        if (TRACE)
            System.out.printf("pollCompletion: -> %s\n", cqe);
        return cqe;
    }

    /**
     * Returns a String description of the given errno value.
     *
     * @param errno the errno value
     * @return a description of the error
     */
    public static String strerror(int errno) {
        return Util.strerror(errno);
    }

    private static int io_uring_setup(int entries, MemorySegment params)
            throws IOException {
        SystemCallContext ctx = SystemCallContext.get();
        int ret;
        try {
            ret = (int)setup_fn.invokeExact(ctx.errnoCaptureSegment(),
                    NR_io_uring_setup, entries, params);
        } catch (Throwable t) {
            throw ioexception(t);
        }
        return ctx.throwIOExceptionOnError(ret);
    }

    private static final int EINTR = 4;

    private static int io_uring_enter(int fd, int to_submit, int min_complete,
                                      int flags) throws IOException {
        SystemCallContext ctx = SystemCallContext.get();
        int ret;
        try {
            do {
                ret = (int) enter_fn.invokeExact(ctx.errnoCaptureSegment(),
                        NR_io_uring_enter, fd, to_submit, min_complete,
                        flags, MemorySegment.NULL);
            } while (ret == -1 && ctx.lastErrno() == EINTR);
        } catch (Throwable t) {
            throw ioexception(t);
        }
        return ctx.throwIOExceptionOnError(ret);
    }

    static IOException ioexception(Throwable t) {
        if (t instanceof IOException ioe) {
            return ioe;
        } else {
            return new IOException(t);
        }
    }

    int checkAndGetIndexFor(ByteBuffer buffer) {
        return mappedBuffers.checkAndGetIndexForBuffer(buffer);
    }

    /**
     * Returns a mapped direct ByteBuffer or {@code null} if none is available.
     * Mapped buffers must be used with some IOUring operations such as
     * {@code IORING_OP_WRITE_FIXED} and {@code IORING_OP_READ_FIXED}.
     * Buffers must be returned after use with
     * {@link #returnRegisteredBuffer(ByteBuffer)}.
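     * <p>
     * A sketch of the intended pattern ({@code buf_index(int)} on {@code Sqe}
     * and the {@code IORING_OP_READ_FIXED()} constant are assumed; the buffer
     * index is looked up through the package-private
     * {@code checkAndGetIndexFor} helper):
     * <pre>{@code
     *     ByteBuffer buf = ring.getRegisteredBuffer();
     *     Sqe sqe = new Sqe()
     *             .opcode(IORING_OP_READ_FIXED())
     *             .fd(fd)
     *             .addr(MemorySegment.ofBuffer(buf))
     *             .len(buf.remaining())
     *             .buf_index(ring.checkAndGetIndexFor(buf));
     *     ring.submit(sqe);
     *     // ... once the completion has arrived and the data is consumed:
     *     ring.returnRegisteredBuffer(buf);
     * }</pre>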
     *
     * @return a registered direct ByteBuffer, or {@code null} if none is available
     */
    public ByteBuffer getRegisteredBuffer() {
        return mappedBuffers.getRegisteredBuffer();
    }

    /**
     * Returns (releases) a previously obtained registered buffer to the pool.
     *
     * @param buffer the buffer to return
     */
    public void returnRegisteredBuffer(ByteBuffer buffer) {
        mappedBuffers.returnRegisteredBuffer(buffer);
    }

    /**
     * Common capabilities of SubmissionQueue and CompletionQueue.
     */
    sealed abstract class QueueImplBase permits SubmissionQueue, CompletionQueue {
        protected final MemorySegment ringSeg;
        private final MemorySegment head, tail;
        protected final int ringMask;
        protected final MemoryLayout ringLayout;
        protected final int ringLayoutSize;
        protected final int ringLayoutAlignment;
        protected final int ringSize;

        // For accessing head and tail as volatile
        protected final VarHandle addrH;

        /**
         * Creates a queue view over the mapped ring memory.
         *
         * @param ringSeg The mapped segment
         * @param head The head pointer
         * @param tail The tail pointer
         * @param ringMask the ring mask (ring size - 1)
         * @param ringLayout the layout of a single ring element
         */
        QueueImplBase(MemorySegment ringSeg, MemorySegment head,
                      MemorySegment tail, int ringMask,
                      MemoryLayout ringLayout) {
            this.ringSeg = ringSeg;
            this.head = head;
            this.tail = tail;
            this.ringMask = ringMask;
            this.ringSize = ringMask + 1;
            this.ringLayout = ringLayout;
            this.ringLayoutSize = (int)ringLayout.byteSize();
            this.ringLayoutAlignment = (int)ringLayout.byteAlignment();
            this.addrH = ValueLayout.JAVA_INT.varHandle();
        }

        abstract int nEntries();

        boolean ringFull() {
            return nEntries() == ringSize;
        }

        int nAvail() {
            return ringSize - nEntries();
        }

        protected int getHead(boolean withAcquire) {
            int val = (int)(withAcquire
                    ? addrH.getAcquire(head, 0L) : addrH.get(head, 0L));
            return val;
        }

        protected int getTail(boolean withAcquire) {
            int val = (int)(withAcquire
                    ? addrH.getAcquire(tail, 0L) : addrH.get(tail, 0L));
            return val;
        }

        // Used by CompletionQueue
        protected void setHead(int val) {
            addrH.setRelease(head, 0L, val);
        }

        // Used by SubmissionQueue
        protected void setTail(int val) {
            addrH.setRelease(tail, 0L, val);
        }
    }

    final class SubmissionQueue extends QueueImplBase {
        final MemorySegment sqes;
        final MemorySegment flags;
        final int n_sqes;
        final VarHandle flagsH; // handle for accessing flags
        final boolean polling;

        static final int sqe_layout_size =
                (int)io_uring_sqe.$LAYOUT().byteSize();

        static final int sqe_alignment =
                (int)io_uring_sqe.$LAYOUT().byteAlignment();

        SubmissionQueue(MemorySegment ringSeg, MemorySegment head,
                        MemorySegment tail, int mask,
                        MemorySegment flags, boolean polling,
                        MemorySegment sqes) {
            super(ringSeg, head, tail, mask, ValueLayout.JAVA_INT);
            this.sqes = sqes;
            this.flags = flags;
            this.polling = polling;
            this.flagsH = ValueLayout.JAVA_INT.varHandle();
            this.n_sqes = (int) (sqes.byteSize() / sqe_layout_size);
        }

        /**
         * Submits an Sqe to the Submission Q.
         * @param sqe the request to copy into the next free slot
         * @return true if the submission succeeded, false if the Q is full
         */
        public boolean submit(Sqe sqe) throws IOException {
            if (ringFull()) {
                return false;
            }

            int tailVal = getTail(false);
            int tailIndex = tailVal & ringMask;

            MemorySegment slot = sqes.asSlice(
                    (long) tailIndex * sqe_layout_size,
                    sqe_layout_size, sqe_alignment).fill((byte)0);
            if (slot == null)
                throw new IOException("Q full"); // shouldn't happen
            // Populate the slot as an io_uring_sqe
            // Note: Sqe has already validated that overlapping fields are not set
            io_uring_sqe.user_data(slot, sqe.user_data());
            io_uring_sqe.fd(slot, sqe.fd());
            io_uring_sqe.opcode(slot, (byte)sqe.opcode());
            // This statement handles the large flags union.
            // For simplicity all __u32 variants are handled
            // as xxx_flags; poll_events (__u16) is handled specially.
            sqe.xxx_flags().ifPresentOrElse(
                    u32 -> io_uring_sqe.open_flags(slot, u32),
                    // xxx_flags not present, poll_events may be
                    () -> sqe.poll_events().ifPresent(
                            u16 -> io_uring_sqe.poll_events(slot, (short)u16)));

            io_uring_sqe.flags(slot, (byte)sqe.flags());
            io_uring_sqe.addr(slot, sqe.addr()
                    .orElse(MemorySegment.NULL).address());
            io_uring_sqe.addr2(slot, sqe.addr2()
                    .orElse(MemorySegment.NULL).address());
            io_uring_sqe.buf_index(slot, (short)sqe.buf_index().orElse(0));
            io_uring_sqe.off(slot, sqe.off().orElse(0L));
            io_uring_sqe.len(slot, sqe.len().orElse(0));
            // Publish the sqe index in the ring's index array at the tail position
            ringSeg.setAtIndex(ValueLayout.JAVA_INT, tailIndex, tailIndex);
            setTail(++tailVal);
            return true;
        }

        /*
         * Returns the SQ flags for this ring. Currently this is only used
         * to read IORING_SQ_NEED_WAKEUP when the submission Q is being used
         * in SQPOLL mode. The kernel sets this flag if the kernel polling
         * thread needs to be woken up.
         */
        public int getSQFlags() {
            int res = (int)flagsH.getOpaque(flags, 0);
            return res;
        }

        @Override
        int nEntries() {
            int n = Math.abs(getTail(false) - getHead(true));
            return n;
        }

        /**
         * Returns whether this Submission Q is using polling.
         */
        public boolean polling() {
            return this.polling;
        }
    }

    final class CompletionQueue extends QueueImplBase {
        CompletionQueue(MemorySegment ringSeg, MemorySegment head,
                        MemorySegment tail, int mask) {
            super(ringSeg, head, tail, mask, io_uring_cqe.$LAYOUT());
        }

        public Cqe pollHead() {
            int headVal = getHead(false);
            if (headVal != getTail(true)) {
                int index = headVal & ringMask;
                int offset = index * ringLayoutSize;
                MemorySegment seg = ringSeg.asSlice(offset,
                        ringLayoutSize, ringLayoutAlignment);
                var res = new Cqe(
                        io_uring_cqe.user_data(seg),
                        io_uring_cqe.res(seg),
                        io_uring_cqe.flags(seg));
                headVal++;
                setHead(headVal);
                return res;
            } else {
                return null;
            }
        }

        @Override
        int nEntries() {
            int n = Math.abs(getTail(true) - getHead(false));
            return n;
        }
    }

    /**
     * Adds the given fd to this ring's epoll(7) instance, creating the
     * epoll instance first if it hasn't already been created.
     *
     * If using the EPOLLONESHOT mode (set in {@code poll_events}), the opaque
     * field can be used to return the "id" of the specific operation that was
     * kicked off.
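     * <p>
     * For example, to watch a socket for readability in one-shot mode
     * (the {@code EPOLLIN()} and {@code EPOLLONESHOT()} constants are assumed
     * to be available from the generated epoll bindings):
     * <pre>{@code
     *     ring.epoll_add(socketFd, EPOLLIN() | EPOLLONESHOT(), requestId);
     * }</pre>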
     *
     * @param fd target fd to manage
     * @param poll_events bit mask of events to activate
     * @param opaque a 64 bit value to return with event notifications.
     *               A value of -1L is ignored.
     * @throws IOException if an I/O error occurs
     * @throws InterruptedException if the operation is interrupted
     */
    public void epoll_add(int fd, int poll_events, long opaque)
            throws IOException, InterruptedException {
        epoll_op(fd, poll_events, opaque, EPOLL_CTL_ADD());
    }

    public void epoll_del(int fd, int poll_events)
            throws IOException, InterruptedException {
        epoll_op(fd, poll_events, -1L, EPOLL_CTL_DEL());
    }

    public void epoll_mod(int fd, int poll_events, long opaque)
            throws IOException, InterruptedException {
        epoll_op(fd, poll_events, opaque, EPOLL_CTL_MOD());
    }

    private void epoll_op(int fd, int poll_events, long opaque, int op)
            throws IOException, InterruptedException {
        if (this.epollfd == -1) {
            this.epollfd = initEpoll();
        }

        MemorySegment targetfd =
                arena.allocateFrom(ValueLayout.JAVA_INT, fd);

        Sqe request = new Sqe()
                .opcode(IORING_OP_EPOLL_CTL())
                .fd(epollfd)
                .addr(targetfd)
                .xxx_flags(poll_events)
                .len(op);

        if (opaque != -1L) {
            MemorySegment event = arena.allocate(epoll_event.$LAYOUT());
            epoll_event.events(event, poll_events);
            var dataSlice = epoll_event.data(event);
            epoll_data_t.u64(dataSlice, opaque);
            request = request.off(event.address());
        }
        submit(request);
    }

    static MemorySegment getSegmentFor(MemoryLayout layout) {
        return arena.allocate(layout.byteSize(), layout.byteAlignment())
                .fill((byte)0);
    }

    static String errorString(int errno) {
        errno = -errno;
        return "Error: " + strerror(errno);
    }

    // This is obsolete. There is a better way of doing a timed
    // poll by providing a timespec to io_uring_enter
    public Sqe getTimeoutSqe(Duration maxwait, int opcode, int completionCount) {
        MemorySegment seg =
                arena.allocate(__kernel_timespec.$LAYOUT()).fill((byte)(0));

        __kernel_timespec.tv_sec(seg, maxwait.getSeconds());
        __kernel_timespec.tv_nsec(seg, maxwait.getNano());
        return new Sqe()
                .opcode(opcode)
                .addr(seg)
                .xxx_flags(0) // timeout_flags
                .off(completionCount)
                .len(1);
    }

    private static final ValueLayout POINTER =
            ValueLayout.ADDRESS.withTargetLayout(
                    MemoryLayout.sequenceLayout(Long.MAX_VALUE, JAVA_BYTE)
            );

    private static final MethodHandle mmap_fn = locateStdHandle(
            "mmap", FunctionDescriptor.of(
                    POINTER,                // returned address
                    ValueLayout.JAVA_LONG,  // input address, usually zero
                    ValueLayout.JAVA_LONG,  // size_t
                    ValueLayout.JAVA_INT,   // int prot (PROT_READ | PROT_WRITE)
                    ValueLayout.JAVA_INT,   // int flags (MAP_SHARED|MAP_POPULATE)
                    ValueLayout.JAVA_INT,   // int fd
                    ValueLayout.JAVA_LONG   // off_t (64 bit?)
            )
    );

    private static final MethodHandle epoll_create_fn = locateStdHandle(
            "epoll_create", FunctionDescriptor.of(
                    ValueLayout.JAVA_INT,   // returned fd
                    ValueLayout.JAVA_INT    // int size (ignored)
            ), SystemCallContext.errnoLinkerOption()
    );

    private static final MethodHandle close_fn = locateStdHandle(
            "close",
            FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.JAVA_INT),
            SystemCallContext.errnoLinkerOption()
    );

    private static final MethodHandle eventfd_fn = locateStdHandle(
            "eventfd",
            FunctionDescriptor.of(
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT),
            SystemCallContext.errnoLinkerOption()
    );

    // Linux syscall numbers. These allow the system calls to be invoked
    // directly on systems where libc or liburing provides no wrappers
    // for these functions. It also means liburing is no longer
    // a dependency.

    private static final int NR_io_uring_setup = 425;
    private static final int NR_io_uring_enter = 426;
    private static final int NR_io_uring_register = 427;

    private static final MethodHandle setup_fn = locateStdHandle(
            "syscall", FunctionDescriptor.of(
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.ADDRESS),
            SystemCallContext.errnoLinkerOption()
    );

    private static final MethodHandle enter_fn = locateStdHandle(
            "syscall", FunctionDescriptor.of(ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.JAVA_INT,
                    ValueLayout.ADDRESS),   // sigset_t UNUSED for now
            SystemCallContext.errnoLinkerOption()
    );

    // io_uring_register specifically for
    // IORING_REGISTER_EVENTFD and IORING_UNREGISTER_EVENTFD
    private static final MethodHandle evregister_fn = locateStdHandle(
            "syscall",
            FunctionDescriptor.of(ValueLayout.JAVA_INT,  // result
                    ValueLayout.JAVA_INT,   // syscall
                    ValueLayout.JAVA_INT,   // ring fd
                    ValueLayout.JAVA_INT,   // opcode
                    INT_POINTER,            // pointer to fd
                    ValueLayout.JAVA_INT),  // integer value 1
            SystemCallContext.errnoLinkerOption()
    );

    // mmap constants used internally
    private static final int PROT_READ = 1;
    private static final int PROT_WRITE = 2;
    private static final int MAP_SHARED = 1;
    private static final int MAP_POPULATE = 0x8000;

    /**
     * The offset (when mapping io_uring segments) must be one of:
     * jdk.internal.ffi.generated.iouring.iouring_h.IORING_OFF_SQ_RING()
     * jdk.internal.ffi.generated.iouring.iouring_h.IORING_OFF_CQ_RING()
     * jdk.internal.ffi.generated.iouring.iouring_h.IORING_OFF_SQES()
     *
     * @param size the length of the mapping in bytes
     * @param fd the ring fd
     * @param offset one of the IORING_OFF_* values above
     * @return the mapped segment, reinterpreted to {@code size} bytes
     */
    private static MemorySegment mmap(long size, int fd, long offset) {
        MemorySegment seg = null;
        try {
            seg = (MemorySegment)mmap_fn
                    .invokeExact(0L, size,
                            PROT_READ | PROT_WRITE,
                            MAP_SHARED | MAP_POPULATE,
                            fd,
                            offset
                    );
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        return seg.reinterpret(size);
    }

    int ringFd() {
        return fd;
    }
}