1 /*
  2  * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch.iouring;
 27 
 28 import jdk.internal.ffi.generated.iouring.*;
 29 
 30 import java.io.IOException;
 31 import java.lang.foreign.*;
 32 import java.lang.invoke.MethodHandle;
 33 import java.lang.invoke.VarHandle;
 34 import java.nio.ByteBuffer;
 35 import java.time.Duration;
 36 
 37 import static java.lang.foreign.ValueLayout.JAVA_BYTE;
 38 import static sun.nio.ch.iouring.Util.strerror;
 39 import static sun.nio.ch.iouring.Util.locateHandleFromLib;
 40 import static sun.nio.ch.iouring.Util.locateStdHandle;
 41 import static sun.nio.ch.iouring.Util.INT_POINTER;
 42 import static jdk.internal.ffi.generated.iouring.iouring_h.*;
 43 import static jdk.internal.ffi.generated.iouring.iouring_h_1.IORING_REGISTER_EVENTFD;
 44 import static jdk.internal.ffi.generated.iouring.iouring_h_1.IORING_UNREGISTER_EVENTFD;
 45 
 46 /**
 47  * Low level interface to a Linux io_uring. It provides an asynchronous
 48  * interface. Requests are submitted through the {@link #submit(Sqe)} method.
 49  * Completion events can be awaited by calling {@link #enter(int, int, int)}.
 50  * Completions represented by {@link Cqe} are then obtained by calling
 51  * {@link #pollCompletion()}. Completions are linked to submissions by the
 52  * {@link Cqe#user_data()} field of the {@code Cqe} which contains the
 53  * same 64-bit (long) value that was supplied in the submitted {@link Sqe}.
 54  * <p>
 55  * Some IOUring operations work with kernel registered direct ByteBuffers.
 56  * When creating an IOUring instance, a number of these buffers can be
 57  * created in a pool. Registered buffers are not used with regular
 58  * IOUring read/write operations.
 59  */
 60 @SuppressWarnings("restricted")
 61 public class IOUring {
    // Class-wide, GC-managed arena used by the native allocations in this
    // class (getSegmentFor and the other arena.allocate* call sites).
    private static final Arena arena = Arena.ofAuto();

    // Diagnostic tracing to System.out; enabled when the system property
    // "jdk.io_uring.trace" equals "true" (case-insensitive).
    private static final boolean TRACE = System
            .getProperty("jdk.io_uring.trace", "false")
            .equalsIgnoreCase("true");
    // Views over the mapped submission and completion rings; both are
    // created by the six-argument constructor.
    private final SubmissionQueue sq;
    private final CompletionQueue cq;
    private final int fd;               // The ringfd
    private int epollfd = -1;           // The epoll(7) fd, or -1 if not created yet
    // Size in bytes of one entry of the submission index array
    private static final int INT_SIZE = (int)ValueLayout.JAVA_INT.byteSize();

    // Per-instance GC-managed arena
    private final Arena autoArena = Arena.ofAuto();

    // Pool of direct buffers; registered with the ring only when the
    // requested buffer count is greater than zero (see constructor).
    private final KMappedBuffers mappedBuffers;
 76 
    /**
     * Creates an IOUring and initializes the ring structures. {@code entries}
     * (or the next higher power of 2) is the size of the Submission Queue.
     * Currently, the completion queue returned will be double the size
     * of the Submission queue.
     * <p>
     * Equivalent to {@code new IOUring(entries, 0, 0)}: no explicit
     * completion queue size and no setup flags.
     *
     * @param entries the number of submission queue entries to request
     * @throws IOException if an IOException occurs
     */
    public IOUring(int entries) throws IOException {
        this(entries, 0, 0);
    }
 86 
 87     /**
 88      * Creates an IOURing and initializes the ring structures.
 89      * @param sq_entries the number of submission queue entries to allocate
 90      * @param cq_entries the number of completion queue entries to allocate
 91      * @param flags io_uring_params flags
 92      * @throws IOException if an IOException occurs
 93      */
 94     public IOUring(int sq_entries, int cq_entries, int flags) throws IOException {
 95         this(sq_entries, cq_entries, 0, 0, -1, 0);
 96     }
 97 
    /**
     * Creates an IOUring, initializes the ring structures and allocates a
     * number of direct {@link ByteBuffer}s which are additionally mapped
     * into the kernel address space.
     * <p>
     * A {@code cq_entries} value greater than zero adds
     * {@code IORING_SETUP_CQSIZE} to the setup flags, and a
     * {@code poll_idle_time} greater than zero adds
     * {@code IORING_SETUP_SQPOLL}.
     *
     * @param sq_entries the number of submission queue entries to allocate
     * @param cq_entries the number of completion queue entries to allocate
     * @param flags io_uring_params flags
     * @param nmappedBuffers number of mapped direct ByteBuffers to create
     * @param mappedBufsize size of each buffer in bytes
     * @param poll_idle_time the number of milliseconds to allow kernel polling
     *        thread to remain idle. {@code 0} means polling disabled.
     * @throws IOException if an IOException occurs
     */
    public IOUring(int sq_entries,
                       int cq_entries,
                       int flags,
                       int nmappedBuffers,
                       int mappedBufsize,
                       int poll_idle_time) throws IOException {
        if (TRACE) {
            System.out.printf("IOUring (sq:%d, cq:%d, flg:%d, nbufs:%d, bufsiz:%d, sqidle:%d)\n",
                sq_entries, cq_entries, flags, nmappedBuffers, mappedBufsize, poll_idle_time);
        }

        // Zero-filled io_uring_params; the requested values are written into
        // it below and the granted values are read back after setup.
        MemorySegment params_seg = getSegmentFor(io_uring_params.$LAYOUT());

        // An explicit completion queue size must be announced with a flag
        if (cq_entries > 0) {
            io_uring_params.cq_entries(params_seg, cq_entries);
            flags |= IORING_SETUP_CQSIZE();
        }

        boolean polling = false;

        // A positive idle time turns on kernel-side submission polling
        if (poll_idle_time > 0) {
            io_uring_params.sq_thread_idle(params_seg, poll_idle_time);
            flags |= IORING_SETUP_SQPOLL();
            polling = true;
        }

        if (flags != 0) {
            io_uring_params.flags(params_seg, flags);
        }

        try {
            // call setup
            fd = io_uring_setup(sq_entries, params_seg);
            if (fd < 0) {
                throw new IOException(errorString(fd));
            }
        } catch (Throwable t) {
            // Only adds tracing; the failure is always rethrown unchanged
            if (TRACE)
                t.printStackTrace();
            throw t;
        }

        if (poll_idle_time > 0) {
            int i = io_uring_params.sq_thread_idle(params_seg);
            if (TRACE) System.out.printf("poll_idle_time = %d\n", i);
        }
        // The pool object is always created; it is registered against the
        // ring fd only when buffers were actually requested.
        mappedBuffers = new KMappedBuffers(nmappedBuffers, mappedBufsize);
        if (nmappedBuffers > 0) {
            mappedBuffers.register(fd);
        }
        // Offsets segments
        MemorySegment cq_off_seg = io_uring_params.cq_off(params_seg);
        MemorySegment sq_off_seg = io_uring_params.sq_off(params_seg);

        // Offsets to cqe array and the sqe index array
        int cq_off_cqes = io_cqring_offsets.cqes(cq_off_seg);
        int sq_off_array = io_sqring_offsets.array(sq_off_seg);
        int sq_off_flags = io_sqring_offsets.flags(sq_off_seg);

        // Actual number of entries in each Q
        sq_entries = io_uring_params.sq_entries(params_seg);
        cq_entries = io_uring_params.cq_entries(params_seg);

        // Bytes to map for each ring: offset of its array plus the array
        int sq_size = sq_off_array + sq_entries * INT_SIZE;
        int cq_size = cq_off_cqes + cq_entries * (int)io_uring_cqe.sizeof();

        boolean singleMmap = (io_uring_params.features(params_seg)
                & IORING_FEAT_SINGLE_MMAP()) != 0;

        // With a single mapping both rings share one region, so size it to
        // the larger of the two
        if (singleMmap) {
            if (cq_size > sq_size)
                sq_size = cq_size;
            cq_size = sq_size;
        }
        var sqe_seg = mmap(sq_size, fd, IORING_OFF_SQ_RING());

        MemorySegment cqes_seg;
        if (singleMmap) {
            cqes_seg = sqe_seg;
        } else {
            cqes_seg = mmap(cq_size, fd, IORING_OFF_CQ_RING());
        }

        // Masks
        int sq_mask = sqe_seg.get(ValueLayout.JAVA_INT,
                                  io_sqring_offsets.ring_mask(sq_off_seg));
        int cq_mask = cqes_seg.get(ValueLayout.JAVA_INT,
                                   io_cqring_offsets.ring_mask(cq_off_seg));

        // The array of io_uring_sqe structures is a separate mapping
        var sqes = mmap(sq_entries * io_uring_sqe.sizeof(),
                        fd, IORING_OFF_SQES());

        cq = new CompletionQueue(cqes_seg.asSlice(cq_off_cqes),
                cqes_seg.asSlice(io_cqring_offsets.head(cq_off_seg)),
                cqes_seg.asSlice(io_cqring_offsets.tail(cq_off_seg)),
                cq_mask);

        sq = new SubmissionQueue(sqe_seg.asSlice(sq_off_array),
                sqe_seg.asSlice(io_sqring_offsets.head(sq_off_seg)),
                sqe_seg.asSlice(io_sqring_offsets.tail(sq_off_seg)),
                sq_mask, sqe_seg.asSlice(sq_off_flags), polling,
                sqes);
        if (TRACE)
            System.out.printf("IOUring: ringfd: %d\n", fd);
    }
217 
218 
219     public void close() throws IOException {
220         int ret;
221         SystemCallContext ctx = SystemCallContext.get();
222         try {
223             ret = (int)close_fn.invokeExact(ctx.errnoCaptureSegment(),
224                                             ringFd());
225         } catch (Throwable e) {
226             throw new RuntimeException(e);
227         }
228         ctx.throwIOExceptionOnError(ret);
229 
230     }
231 
232     public int eventfd() throws IOException {
233         int ret;
234         SystemCallContext ctx = SystemCallContext.get();
235         try {
236             ret = (int)eventfd_fn.invokeExact(ctx.errnoCaptureSegment(),
237                                             0, 0);
238         } catch (Throwable e) {
239             throw new RuntimeException(e);
240         }
241         return ctx.throwIOExceptionOnError(ret);
242     }
243 
244     private int initEpoll() throws IOException {
245         int ret;
246         SystemCallContext ctx = SystemCallContext.get();
247         try {
248             ret = (int)epoll_create_fn.invokeExact(ctx.errnoCaptureSegment(),
249                                                    ringFd(), 1);
250         } catch (Throwable e) {
251             throw new RuntimeException(e);
252         }
253         return ctx.throwIOExceptionOnError(ret);
254     }
255 
256     public void register_eventfd(int efd) throws IOException {
257         int ret;
258         SystemCallContext ctx = SystemCallContext.get();
259         MemorySegment fdseg =
260             arena.allocateFrom(ValueLayout.JAVA_INT, efd);
261 
262         try {
263             ret = (int)evregister_fn
264                     .invokeExact(
265                             ctx.errnoCaptureSegment(),
266                             NR_io_uring_register,
267                             fd, IORING_REGISTER_EVENTFD(),
268                             fdseg, 1
269                     );
270         } catch (Throwable e) {
271             throw new RuntimeException(e);
272         }
273         ctx.throwIOExceptionOnError(ret);
274     }
275 
276     public void unregister_eventfd() throws IOException {
277         int ret;
278         SystemCallContext ctx = SystemCallContext.get();
279 
280         try {
281             ret = (int)evregister_fn
282                     .invokeExact(
283                             ctx.errnoCaptureSegment(),
284                             NR_io_uring_register,
285                             fd, IORING_UNREGISTER_EVENTFD(),
286                             MemorySegment.NULL, 0
287                     );
288         } catch (Throwable e) {
289             throw new RuntimeException(e);
290         }
291         ctx.throwIOExceptionOnError(ret);
292 
293     }
294 
295     /**
296      * Asynchronously submits an Sqe to this IOUring. Can be called
297      * multiple times before enter().
298      *
299      * @param sqe
300      * @throws IOException if submission q full
301      */
302     public void submit(Sqe sqe) throws IOException {
303         if (!sq.submit(sqe)) {
304             enter(0, 0, IORING_ENTER_SQ_WAIT());
305             if (!sq.submit(sqe)) {
306                 throw new IOException("Submission Queue full: wait failed");
307             }
308         }
309         if (TRACE)
310             System.out.printf("submit: %s \n", sqe);
311     }
312 
313     /**
314      * Notifies the kernel of entries on the Submission Q and waits for a
315      * number of responses (completion events). If this returns normally
316      * with value {@code n > 0}, this means that n requests have been accepted
317      * by the kernel. A normal return also means that the requested number of
318      * completion events have been received {@link #pollCompletion()} can be
319      * called {@code nreceive} times to obtain the results.
320      *
321      * @param nsubmit number of requests to submit
322      * @param nreceive block until this number of events received
323      * @param flags flags to pass to io_uring_enter
324      *
325      * @return if return value less than 0 means an error occurred. Otherwise,
326      *         the number of Sqes successfully submitted.
327      */
328     public int enter(int nsubmit, int nreceive, int flags) throws IOException {
329         if (TRACE) System.out.printf("enter([fd:%d] %d, %d, %d) called\n",
330             this.fd, nsubmit, nreceive, flags);
331 
332         if (nreceive > 0) {
333             flags |= IORING_ENTER_GETEVENTS();
334         }
335         int res = io_uring_enter(this.fd, nsubmit, nreceive, flags);
336         if (TRACE) System.out.printf("enter [fd:%d] returns %s\n",
337                                      this.fd, getError(res));
338         return res;
339     }
340 
341     /**
342      * Return a String that is either the integer value
343      * if it is greater than or equal to zero, or
344      * a description of the error if less than zero
345      */
346     public static String getError(int retcode) {
347         if (retcode >= 0) {
348             return Integer.toString(retcode);
349         } else {
350             return strerror(-retcode);
351         }
352     }
353 
354     /**
355      * In polling mode, use this instead of enter() on the submission side
356      * to check if kernel poller needs to be woken up. It checks if the kernel
357      * polling thread has exited, and if so it restarts it.
358      */
359     public void pollingEnter() throws IOException {
360         if (TRACE) System.out.printf("pollingEnter([fd:%d]) called\n", this.fd);
361         if (!sq.polling())
362             throw new IllegalStateException("IOUring not in polling mode");
363 
364         if ((sq.getSQFlags() & IORING_SQ_NEED_WAKEUP()) > 0) {
365             if (TRACE) System.out.println("pollingEnter: waking up kernel");
366             enter(0, 0, IORING_ENTER_SQ_WAKEUP());
367         }
368         if (TRACE) System.out.printf("pollingEnter [fd:%d] return\n", this.fd);
369     }
370 
371     /**
372      * Returns the allocated size of the Submission Q. If the requested size
373      * was not a power of 2, then the allocated size will be the next highest
374      * power of 2.
375      *
376      * @return
377      */
378     public int sqsize() {
379         return sq.ringSize;
380     }
381 
382     /**
383      * Returns the number of free entries in the Submission Q
384      */
385     public int sqfree() {
386         return sq.nAvail();
387     }
388 
389     /**
390      * Returns whether the completion Q is empty or not.
391      *
392      * @return
393      */
394     public boolean cqempty() {
395         return cq.nEntries() == 0;
396     }
397 
398     /**
399      * Returns the allocated size of the Completion Q.
400      * Currently, double the size of the Submission Q
401      *
402      * @return
403      */
404     public int cqsize() {
405         return cq.ringSize;
406     }
407 
408     public int epoll_fd() {
409         return epollfd;
410     }
411 
412     /**
413      * Polls the Completion Queue for results.
414      *
415      * @return a Cqe if available or {@code null}
416      */
417     public Cqe pollCompletion() {
418         Cqe cqe = cq.pollHead();
419         if (TRACE)
420             System.out.printf("pollCompletion: -> %s\n", cqe);
421         return cqe;
422     }
423 
424     /**
425      * Returns a String description of the given errno value
426      *
427      * @param errno
428      * @return
429      */
430     public static String strerror(int errno) {
431         return Util.strerror(errno);
432     }
433 
434     private static int io_uring_setup(int entries, MemorySegment params)
435             throws IOException {
436         SystemCallContext ctx = SystemCallContext.get();
437         int ret;
438         try {
439             ret = (int)setup_fn.invokeExact(ctx.errnoCaptureSegment(),
440                                             NR_io_uring_setup, entries, params);
441         } catch (Throwable t) {
442             throw ioexception(t);
443         }
444         return ctx.throwIOExceptionOnError(ret);
445     }
446 
447     private static final int EINTR = 4;
448 
449     private static int io_uring_enter(int fd, int to_submit, int min_complete,
450                                       int flags) throws IOException {
451         SystemCallContext ctx = SystemCallContext.get();
452         int ret;
453         try {
454             do {
455                 ret = (int) enter_fn.invokeExact(ctx.errnoCaptureSegment(),
456                             NR_io_uring_enter, fd, to_submit, min_complete,
457                             flags, MemorySegment.NULL);
458             } while (ret == -1 && ctx.lastErrno() == EINTR);
459         } catch (Throwable t) {
460             throw ioexception(t);
461         }
462         return ctx.throwIOExceptionOnError(ret);
463     }
464 
465     static IOException ioexception(Throwable t) {
466         if (t instanceof IOException ioe) {
467             return ioe;
468         } else {
469             return new IOException(t);
470         }
471     }
472 
473     int checkAndGetIndexFor(ByteBuffer buffer) {
474         return mappedBuffers.checkAndGetIndexForBuffer(buffer);
475     }
476 
477     /**
478      * Returns a mapped direct ByteBuffer or {@code null} if none available.
479      * Mapped buffers must be used with some IOUring operations such as
480      * {@code IORING_OP_WRITE_FIXED} and {@code IORING_OP_READ_FIXED}.
481      * Buffers must be returned after use with
482      * {@link #returnRegisteredBuffer(ByteBuffer)}.
483      *
484      * @return
485      */
486     public ByteBuffer getRegisteredBuffer() {
487         return mappedBuffers.getRegisteredBuffer();
488     }
489 
490     /**
491      * Returns a previously allocated registered buffer.
492      *
493      * @param buffer
494      */
495     public void returnRegisteredBuffer(ByteBuffer buffer) {
496         mappedBuffers.returnRegisteredBuffer(buffer);
497     }
498 
499     /**
500      * Common capabilities of SubmissionQueue and CompletionQueue
501      */
502     sealed abstract class QueueImplBase permits SubmissionQueue, CompletionQueue {
503         protected final MemorySegment ringSeg;
504         private final MemorySegment head, tail;
505         protected final int ringMask;
506         protected final MemoryLayout ringLayout;
507         protected final int ringLayoutSize;
508         protected final int ringLayoutAlignment;
509         protected final int ringSize;
510 
511         // For accessing head and tail as volatile
512         protected final VarHandle addrH;
513 
514         /**
515          *
516          * @param ringSeg The mapped segment
517          * @param head The head pointer
518          * @param tail The tail pointer
519          * @param ringMask
520          * @param ringLayout
521          */
522         QueueImplBase(MemorySegment ringSeg, MemorySegment head,
523                       MemorySegment tail, int ringMask,
524                       MemoryLayout ringLayout) {
525             this.ringSeg = ringSeg;
526             this.head = head;
527             this.tail = tail;
528             this.ringMask = ringMask;
529             this.ringSize = ringMask + 1;
530             this.ringLayout = ringLayout;
531             this.ringLayoutSize = (int)ringLayout.byteSize();
532             this.ringLayoutAlignment = (int)ringLayout.byteAlignment();
533             this.addrH = ValueLayout.JAVA_INT.varHandle();
534         }
535 
536         abstract int nEntries();
537 
538         boolean ringFull() {
539             return nEntries() == ringSize;
540         }
541 
542         int nAvail() {
543             return ringSize - nEntries();
544         }
545 
546         protected int getHead(boolean withAcquire) {
547             int val = (int)(withAcquire
548                 ? addrH.getAcquire(head, 0L) : addrH.get(head, 0L));
549             return val;
550         }
551 
552         protected int getTail(boolean withAcquire) {
553             int val = (int)(withAcquire
554                 ? addrH.getAcquire(tail, 0L) : addrH.get(tail, 0L));
555             return val;
556         }
557 
558         // Used by CompletionQueue
559         protected void setHead(int val) {
560             addrH.setRelease(head, 0L, val);
561         }
562 
563         // Used by SubmissionQueue
564         protected void setTail(int val) {
565             addrH.setRelease(tail, 0L, val);
566         }
567     }
568 
569     final class SubmissionQueue extends QueueImplBase {
570         final MemorySegment sqes;
571         final MemorySegment flags;
572         final int n_sqes;
573         final VarHandle flagsH;  // handle for accessing flags
574         final boolean polling;
575 
576         static final int sqe_layout_size =
577             (int)io_uring_sqe.$LAYOUT().byteSize();
578 
579         static final int sqe_alignment =
580             (int)io_uring_sqe.$LAYOUT().byteAlignment();
581 
582         SubmissionQueue(MemorySegment ringSeg, MemorySegment head,
583                         MemorySegment tail, int mask,
584                         MemorySegment flags, boolean polling,
585                         MemorySegment sqes) {
586             super(ringSeg, head, tail, mask, ValueLayout.JAVA_INT);
587             this.sqes = sqes;
588             this.flags = flags;
589             this.polling = polling;
590             this.flagsH = ValueLayout.JAVA_INT.varHandle();
591             this.n_sqes = (int) (sqes.byteSize() / sqe_layout_size);
592         }
593 
594         /**
595          * Submits an Sqe to Submission Q.
596          * @param sqe
597          * @return true if the submission succeeded, false if the Q is full
598          */
599         public boolean submit(Sqe sqe) throws IOException {
600             if (ringFull()) {
601                 return false;
602             }
603 
604             int tailVal = getTail(false);
605             int tailIndex = tailVal & ringMask;
606 
607             MemorySegment slot = sqes.asSlice(
608                     (long) tailIndex * sqe_layout_size,
609                     sqe_layout_size, sqe_alignment).fill((byte)0);
610             if (slot == null)
611                 throw new IOException("Q full"); // shouldn't happen
612             // Populate the slot as an io_uring_sqe
613             // Note. Sqe has already validated that overlapping fields not set
614             io_uring_sqe.user_data(slot, sqe.user_data());
615             io_uring_sqe.fd(slot, sqe.fd());
616             io_uring_sqe.opcode(slot, (byte)sqe.opcode());
617             // This statement handles the large flags union
618             // For simplicity all __u32 variants are handled
619             // as xxx_flags. poll_events (__u16) are special
620             sqe.xxx_flags().ifPresentOrElse(
621                 u32 -> io_uring_sqe.open_flags(slot, u32),
622                 // xxx_flags not present, poll_events may be
623                 () -> sqe.poll_events().ifPresent(
624                     u16 -> io_uring_sqe.poll_events(slot, (short)u16)));
625 
626             io_uring_sqe.flags(slot, (byte)sqe.flags());
627             io_uring_sqe.addr(slot, sqe.addr()
628                         .orElse(MemorySegment.NULL).address());
629             io_uring_sqe.addr2(slot, sqe.addr2()
630                         .orElse(MemorySegment.NULL).address());
631             io_uring_sqe.buf_index(slot, (short)sqe.buf_index().orElse(0));
632             io_uring_sqe.off(slot, sqe.off().orElse(0L));
633             io_uring_sqe.len(slot, sqe.len().orElse(0));
634             // Populate the tail slot
635             ringSeg.setAtIndex(ValueLayout.JAVA_INT, tailIndex, tailIndex);
636             setTail(++tailVal);
637             return true;
638         }
639 
640         /*
641          * Returns the SQ flags for this ring. Currently this is only used
642          * to read the IORING_SQ_NEED_WAKEUP if submission Q being used in
643          * SQPOLL mode. The kernel sets this flag if the kernel polling
644          * thread needs to be woken up.
645          */
646         public int getSQFlags() {
647             int res = (int)flagsH.getOpaque(flags, 0);
648             return res;
649         }
650 
651         @Override
652         int nEntries() {
653             int n = Math.abs(getTail(false) - getHead(true));
654             return n;
655         }
656 
657         /**
658          * Returns whether this Submission Q is using polling
659          */
660         public boolean polling() {
661             return this.polling;
662         }
663     }
664 
665     final class CompletionQueue extends QueueImplBase {
666         CompletionQueue(MemorySegment ringSeg, MemorySegment head,
667                         MemorySegment tail, int mask) {
668             super(ringSeg, head, tail, mask, io_uring_cqe.$LAYOUT());
669         }
670 
671         public Cqe pollHead() {
672             int headVal = getHead(false);
673             if (headVal != getTail(true)) {
674                 int index = headVal & ringMask;
675                 int offset = index * ringLayoutSize;
676                 MemorySegment seg = ringSeg.asSlice(offset,
677                         ringLayoutSize, ringLayoutAlignment);
678                 var res = new Cqe(
679                         io_uring_cqe.user_data(seg),
680                         io_uring_cqe.res(seg),
681                         io_uring_cqe.flags(seg));
682                 headVal++;
683                 setHead(headVal);
684                 return res;
685             } else {
686                 return null;
687             }
688         }
689 
690         @Override
691         int nEntries() {
692             int n = Math.abs(getTail(true) - getHead(false));
693             return n;
694         }
695     };
696 
697     /**
698      * Adds the given fd to this ring's epoll(7) instance
699      * and creates the epoll instance if it hasn't already been created
700      *
701      * If using the EPOLLONESHOT mode (in flags) the opaque field
702      * can be used to return the "id" of the specific operation that was
703      * kicked off.
704      *
705      * @param fd target fd to manage
706      * @param poll_events bit mask of events to activate
707      * @param opaque a 64 bit value to return with event notifications.
708      *               A value of -1L is ignored.
709      * @throws IOException
710      * @throws InterruptedException
711      */
712     public void epoll_add(int fd, int poll_events, long opaque)
713             throws IOException, InterruptedException {
714         epoll_op(fd, poll_events, opaque, EPOLL_CTL_ADD());
715     }
716 
717     public void epoll_del(int fd, int poll_events)
718             throws IOException, InterruptedException {
719         epoll_op(fd, poll_events, -1L, EPOLL_CTL_DEL());
720     }
721 
722     public void epoll_mod(int fd, int poll_events, long opaque)
723             throws IOException, InterruptedException {
724         epoll_op(fd, poll_events, opaque, EPOLL_CTL_DEL());
725     }
726 
727     private void epoll_op(int fd, int poll_events, long opaque, int op)
728             throws IOException, InterruptedException {
729         if (this.epollfd == -1) {
730             this.epollfd = initEpoll();
731         }
732 
733         MemorySegment targetfd =
734             arena.allocateFrom(ValueLayout.OfInt.JAVA_INT, fd);
735 
736         Sqe request = new Sqe()
737                 .opcode(IORING_OP_EPOLL_CTL())
738                 .fd(epollfd)
739                 .addr(targetfd)
740                 .xxx_flags(poll_events)
741                 .len(op);
742 
743         if (opaque != -1L) {
744             MemorySegment event = arena.allocate(epoll_event.$LAYOUT());
745             epoll_event.events(event, poll_events);
746             var dataSlice = epoll_event.data(event);
747             epoll_data_t.u64(dataSlice, opaque);
748             request = request.off(event.address());
749         }
750         submit(request);
751     }
752 
753     static MemorySegment getSegmentFor(MemoryLayout layout) {
754         return arena.allocate(layout.byteSize(), layout.byteAlignment())
755                     .fill((byte)0);
756     }
757 
758     static String errorString(int errno) {
759         errno = -errno;
760         return "Error: " + strerror(errno);
761     }
762 
763     // This is obsolete. There is a better way of doing a timed
764     // poll by providing a timeval to io_uring_enter
765     public Sqe getTimeoutSqe(Duration maxwait, int opcode, int completionCount) {
766         MemorySegment seg =
767             arena.allocate(__kernel_timespec.$LAYOUT()).fill((byte)(0));
768 
769         __kernel_timespec.tv_sec(seg, maxwait.getSeconds());
770         __kernel_timespec.tv_nsec(seg, maxwait.getNano());
771         return new Sqe()
772                 .opcode(opcode)
773                 .addr(seg)
774                 .xxx_flags(0)  // timeout_flags
775                 .off(completionCount)
776                 .len(1);
777     }
778 
    // Return layout for mmap: an address whose target layout is a byte
    // sequence of Long.MAX_VALUE elements. The mmap wrapper in this class
    // reinterprets the returned segment to the mapped size.
    private final static ValueLayout POINTER =
        ValueLayout.ADDRESS.withTargetLayout(
            MemoryLayout.sequenceLayout(Long.MAX_VALUE, JAVA_BYTE)
    );

    // Handle for mmap. Note that it is linked without the errno capture
    // option used by the other handles in this class.
    private static final MethodHandle mmap_fn = locateStdHandle(
        "mmap", FunctionDescriptor.of(
                POINTER,
                //ValueLayout.JAVA_LONG, // returned address
                ValueLayout.JAVA_LONG, // input address, usually zero
                ValueLayout.JAVA_LONG, // size_t
                ValueLayout.JAVA_INT, // int prot (PROT_READ | PROT_WRITE)
                ValueLayout.JAVA_INT, // int flags (MAP_SHARED|MAP_POPULATE)
                ValueLayout.JAVA_INT, // int fd
                ValueLayout.JAVA_LONG // off_t (64bit?)
        )
    );
796 
    // Handle for epoll_create(int size); linked with errno capture, so the
    // invocation takes the capture segment as its leading argument.
    private static final MethodHandle epoll_create_fn = locateStdHandle(
        "epoll_create", FunctionDescriptor.of(
                ValueLayout.JAVA_INT, // returned fd
                ValueLayout.JAVA_INT // int size (ignored)
        ), SystemCallContext.errnoLinkerOption()
    );

    // Handle for close(int fd), with errno capture
    private static final MethodHandle close_fn = locateStdHandle(
        "close",
        FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.JAVA_INT),
        SystemCallContext.errnoLinkerOption()
    );

    // Handle for eventfd: returns an int and takes two int arguments
    // (invoked with 0, 0 in this class); linked with errno capture
    private static final MethodHandle eventfd_fn = locateStdHandle(
        "eventfd",
        FunctionDescriptor.of(
            ValueLayout.JAVA_INT,
            ValueLayout.JAVA_INT,
            ValueLayout.JAVA_INT),
        SystemCallContext.errnoLinkerOption()
    );
818 
    // Linux syscall numbers. Allows to invoke the system call
    // directly in systems where there are no wrappers
    // for these functions in libc or liburing.
    // Also means we no longer use liburing

    private static final int NR_io_uring_setup = 425;
    private static final int NR_io_uring_enter = 426;
    private static final int NR_io_uring_register = 427;

    // syscall(NR_io_uring_setup, entries, params)
    private static final MethodHandle setup_fn = locateStdHandle(
        "syscall", FunctionDescriptor.of(
                ValueLayout.JAVA_INT,
                ValueLayout.JAVA_INT,
                ValueLayout.JAVA_INT,
                ValueLayout.ADDRESS),
        SystemCallContext.errnoLinkerOption()
    );

    // syscall(NR_io_uring_enter, fd, to_submit, min_complete, flags, sig)
    private static final MethodHandle enter_fn = locateStdHandle(
        "syscall", FunctionDescriptor.of(ValueLayout.JAVA_INT,
                ValueLayout.JAVA_INT,
                ValueLayout.JAVA_INT,
                ValueLayout.JAVA_INT,
                ValueLayout.JAVA_INT,
                ValueLayout.JAVA_INT,
                ValueLayout.ADDRESS),// sigset_t UNUSED for now
        SystemCallContext.errnoLinkerOption()
    );

    // io_uring_register specifically for
    // IORING_REGISTER_EVENTFD and IORING_UNREGISTER_EVENTFD
    private static final MethodHandle evregister_fn = locateStdHandle(
            "syscall",
            FunctionDescriptor.of(ValueLayout.JAVA_INT,  // result
                    ValueLayout.JAVA_INT, // syscall
                    ValueLayout.JAVA_INT, // ring fd
                    ValueLayout.JAVA_INT, // opcode
                    INT_POINTER,          // pointer to fd (NULL when unregistering)
                    ValueLayout.JAVA_INT),// argument count: 1 to register, 0 to unregister
            SystemCallContext.errnoLinkerOption()
    );

    // mmap constants used internally
    private static final int PROT_READ = 1;
    private static final int PROT_WRITE = 2;
    private static final int MAP_SHARED = 1;
    private static final int MAP_POPULATE = 0x8000;
866 
867     /**
868      * offset (when mapping IOURING segments) must be one of:
869      *      jdk.internal.ffi.generated.iouring.iouring_h.IORING_OFF_SQ_RING()
870      *      jdk.internal.ffi.generated.iouring.iouring_h.IORING_OFF_CQ_RING()
871      *      jdk.internal.ffi.generated.iouring.iouring_h.IORING_OFF_SQES()
872      *
873      * @param size
874      * @param fd
875      * @param offset
876      * @return
877      */
878     private static MemorySegment mmap(long size, int fd, long offset) {
879         MemorySegment seg = null;
880         try {
881             seg = (MemorySegment)mmap_fn
882                     .invokeExact(0L, size,
883                             PROT_READ | PROT_WRITE,
884                             MAP_SHARED | MAP_POPULATE,
885                             fd,
886                             offset
887                     );
888         } catch (Throwable e) {
889             throw new RuntimeException(e);
890         }
891         long addr = seg.address();
892         return seg.reinterpret(size);
893     }
894 
895     int ringFd() {
896         return fd;
897     }
898 }