1 /*
  2  * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.lang.foreign.Arena;
 28 import java.lang.foreign.MemorySegment;
 29 import java.lang.foreign.ValueLayout;
 30 import java.lang.ref.Cleaner.Cleanable;
 31 import java.io.IOException;
 32 import java.util.Map;
 33 import java.util.concurrent.*;
 34 import java.util.concurrent.locks.LockSupport;
 35 import java.util.function.BooleanSupplier;
 36 import jdk.internal.ref.CleanerFactory;
 37 import sun.nio.ch.iouring.IOUring;
 38 import sun.nio.ch.iouring.Cqe;
 39 import sun.nio.ch.iouring.Sqe;
 40 import jdk.internal.ffi.generated.iouring.*;
 41 import static jdk.internal.ffi.generated.iouring.iouring_h.*;
 42 
 43 /**
 44  * Poller implementation based io_uring.
 45  *
 46  * @apiNote This implementation is experimental. There are many design choices, esp.
 47  * around buffer management, that have to be explored.
 48  */
 49 
 50 public class IoUringPoller extends Poller {
 51     private static final Arena ARENA = Arena.ofAuto();
 52     private static final long ADDRESS_SIZE = ValueLayout.ADDRESS.byteSize();
 53     private static final int MAX_BUF_SIZE = 16384;
 54 
 55     // submission queue polling enabled if kernel thread idle time > 0 millis
 56     private static final int SQPOLL_IDLE_TIME = Integer.getInteger("jdk.io_uring.sqpoll_idle", 0);
 57 
 58     // submition and completion queue sizes
 59     private static final int DEFAULT_SQ_SIZE = (SQPOLL_IDLE_TIME > 0) ? 64 : 4;
 60     private static final int SQ_SIZE = Integer.getInteger("jdk.io_uring.sqsize", DEFAULT_SQ_SIZE);
 61     private static final int CQ_SIZE = Math.max(SQ_SIZE + 1, 1024);
 62 
 63     // max completion events to consume in a blocking poll and non-blocking subpoll
 64     private static final int MAX_EVENTS_PER_POLL    = 64;
 65     private static final int MAX_EVENTS_PER_SUBPOLL = 8;
 66 
 67     private final int event;
 68     private final IOUring ring;
 69     private final EventFD readyEvent;   // completion events posted to CQ ring
 70     private final EventFD wakeupEvent;  // wakeup event, used for shutdown
 71 
 72     // close action, and cleaner if this is subpoller
 73     private final Runnable closer;
 74     private final Cleanable cleaner;
 75 
 76     // used to coordinate access to submission queue
 77     private final Object submitLock = new Object();
 78 
 79     // maps file descriptor to Thread when cancelling poll
 80     private final Map<Integer, Thread> cancels = new ConcurrentHashMap<>();
 81 
 82     // the map value for in-progress read/write ops
 83     private static class Op {
 84         final Thread thread;
 85         volatile int result;
 86         Op(Thread thread) {
 87             this.thread = thread;
 88         }
 89         Thread thread() {
 90             return thread;
 91         }
 92         int result() {
 93             return result;
 94         }
 95         void setResult(int result) {
 96             this.result = result;
 97         }
 98     }
 99 
100     // maps user data to in-progress read/write ops
101     private final Map<Long, Op> ops;
102 
103     // per poller cache of memory segments used for read/write ops
104     private final BlockingQueue<MemorySegment> memorySegmentCache;
105 
106     IoUringPoller(Poller.Mode mode,
107                   boolean subPoller,
108                   boolean read,
109                   boolean supportIoOps) throws IOException {
110         IOUring ring = new IOUring(SQ_SIZE, CQ_SIZE, 0, 0, 0, SQPOLL_IDLE_TIME);
111         EventFD wakeupEvent = null;
112         EventFD readyEvent = null;
113 
114         if (subPoller) {
115             try {
116                 // event to allow registering with master poller
117                 readyEvent = new EventFD();
118                 ring.register_eventfd(readyEvent.efd());
119 
120                 // wakeup event to allow for shutdown
121                 if (mode == Poller.Mode.POLLER_PER_CARRIER) {
122                     wakeupEvent = new EventFD();
123                     int efd = wakeupEvent.efd();
124                     IOUtil.configureBlocking(efd, false);
125                     submitPollAdd(ring, efd, Net.POLLIN, efd);
126                     enter(ring, 1);
127                 }
128             } catch (Throwable e) {
129                 ring.close();
130                 if (readyEvent != null) readyEvent.close();
131                 if (wakeupEvent != null) wakeupEvent.close();
132                 throw e;
133             }
134         }
135 
136         this.event = (read) ? Net.POLLIN : Net.POLLOUT;
137         this.ring = ring;
138         this.readyEvent = readyEvent;
139         this.wakeupEvent = wakeupEvent;
140 
141         // setup if supporting read/write ops
142         if (supportIoOps) {
143             this.ops = new ConcurrentHashMap<>();
144             this.memorySegmentCache = new LinkedTransferQueue<>();
145         } else {
146             this.ops = null;
147             this.memorySegmentCache = null;
148         }
149 
150         // create action to close io_uring instance, register cleaner if this is a subpoller
151         this.closer = closer(ring, readyEvent, wakeupEvent);
152         if (subPoller) {
153             this.cleaner = CleanerFactory.cleaner().register(this, closer);
154         } else {
155             this.cleaner = null;
156         }
157     }
158 
159     /**
160      * Returns an action to close the io_uring and release other resources.
161      */
162     private static Runnable closer(IOUring ring, EventFD readyEvent, EventFD wakeupEvent) {
163         return () -> {
164             try {
165                 ring.close();
166                 if (readyEvent != null) readyEvent.close();
167                 if (wakeupEvent != null) wakeupEvent.close();
168             } catch (IOException _) { }
169         };
170     }
171 
172     @Override
173     void close() throws IOException {
174         if (cleaner != null) {
175             cleaner.clean();
176         } else {
177             closer.run();
178         }
179     }
180 
181     @Override
182     int fdVal() {
183         if (readyEvent == null) {
184             throw new UnsupportedOperationException();
185         }
186         return readyEvent.efd();
187     }
188 
189     @Override
190     void pollerPolled() throws IOException {
191         readyEvent.reset();
192     }
193 
194     /**
195      * Submits a IORING_OP_POLL_ADD op to poll a file descriptor for read or write.
196      */
197     @Override
198     void implStartPoll(int fd) throws IOException {
199         assert fd > 0;  // fd == 0 used for wakeup
200 
201         synchronized (submitLock) {
202             // fd is the user data for IORING_OP_POLL_ADD request
203             submitPollAdd(ring, fd, event, fd);
204             enter(1);
205         }
206     }
207 
208     /**
209      * Submits a IORING_OP_POLL_REMOVE op, and waits for it complete, to stop polling
210      * a file descriptor. A no-op if already polled.
211      */
212     @Override
213     void implStopPoll(int fd, boolean polled) throws IOException {
214         if (!polled && !isShutdown()) {
215             cancels.put(fd, Thread.currentThread());
216 
217             synchronized (submitLock) {
218                 // TBD if SQPOLL enabled, need IORING_OP_POLL_ADD to be processed
219 
220                 // fd was the user data for IORING_OP_POLL_ADD request
221                 // -fd is the user data for IORING_OP_POLL_REMOVE request
222                 submitPollRemove(fd, -fd);
223                 enter(1);
224             }
225 
226             while (cancels.containsKey(fd) && !isShutdown()) {
227                 LockSupport.park();
228             }
229         }
230     }
231 
232     /**
233      * Uses IORING_OP_READ op to read bytes into a byte array.
234      */
235     @Override
236     int implRead(int fd, byte[] b, int off, int len, long nanos, BooleanSupplier isOpen) throws IOException {
237         // off-heap buffer for read op
238         MemorySegment buf = takeMemorySegment();
239         len = Math.min(len, (int) buf.byteSize());
240 
241         // use the buf address as the user_data
242         long udata = buf.address();
243         var op = new Op(Thread.currentThread());
244         ops.put(udata, op);
245 
246         int res = 0;
247         try {
248             synchronized (submitLock) {
249                 submitRead(fd, buf, len, -1L, udata);
250             }
251             if (isOpen.getAsBoolean() && !isShutdown()) {
252                 if (nanos > 0) {
253                     LockSupport.parkNanos(nanos);
254                 } else {
255                     LockSupport.park();
256                 }
257             }
258         } finally {
259             Op previous = ops.remove(udata);
260             assert previous == op;
261 
262             res = op.result();
263             try {
264                 if (res > 0) {
265                     // copy bytes into the byte array
266                     assert res <= len;
267                     MemorySegment dst = MemorySegment.ofArray(b);
268                     MemorySegment.copy(buf, 0, dst, off, res);
269                 } else if (res < 0) {
270                     // EOF or read failed
271                     if (res != -1) {
272                         throw new IOException("IORING_OP_READ failed errno=" + (-res));
273                     }
274                 } else {
275                     // TBD if SQPOLL enabled, need the request to be processed before cancel
276 
277                     // read did not complete, need to cancel. If the cancel fails then
278                     // we can't return the buffer to the cache.
279                     cancelOp(fd, udata);
280                     res = IOStatus.UNAVAILABLE;
281                 }
282             } finally {
283                 if (res != 0) {
284                     offerMemorySegment(buf);
285                 }
286             }
287         }
288         return res;
289     }
290 
291     /**
292      * Uses IORING_OP_WRITE op to write bytes from a byte array.
293      */
294     @Override
295     int implWrite(int fd, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
296         // off-heap buffer for write op
297         MemorySegment buf = takeMemorySegment();
298         len = Math.min(len, (int) buf.byteSize());
299 
300         // copy the bytes from the byte array into the buffer
301         MemorySegment src = MemorySegment.ofArray(b);
302         MemorySegment.copy(src, off, buf, 0, len);
303 
304         // use the buffer address as the user_data
305         long udata = buf.address();
306         var op = new Op(Thread.currentThread());
307         ops.put(udata, op);
308 
309         int res = 0;
310         try {
311             synchronized (submitLock) {
312                 submitWrite(fd, buf, len, -1L, udata);
313             }
314             if (isOpen.getAsBoolean() && !isShutdown()) {
315                 LockSupport.park();
316             }
317         } finally {
318             Op previous = ops.remove(udata);
319             assert previous == op;
320 
321             res = op.result();
322             try {
323                 if (res > 0) {
324                     assert res <= len;
325                 } else if (res < 0) {
326                     throw new IOException("IORING_OP_WRITE failed errno=" + (-res));
327                 } else {
328                     // TBD if SQPOLL enabled, need the request to be processed before cancel
329 
330                     // write did not complete, need to cancel. If the cancel fails then
331                     // we can't return the buffer to the cache.
332                     cancelOp(fd, udata);
333                     res = IOStatus.UNAVAILABLE;
334                 }
335             } finally {
336                 if (res != 0) {
337                     offerMemorySegment(buf);
338                 }
339             }
340         }
341         return res;
342     }
343 
344     @Override
345     void wakeupPoller() throws IOException {
346         if (wakeupEvent == null) {
347             throw new UnsupportedOperationException();
348         }
349 
350         // causes subpoller to wakeup
351         wakeupEvent.set();
352     }
353 
354     @Override
355     int poll(int timeout) throws IOException {
356         if (timeout > 0) {
357             // timed polls not supported by this Poller
358             throw new UnsupportedOperationException();
359         }
360         boolean block = (timeout == -1);
361         int max = block ? MAX_EVENTS_PER_POLL : MAX_EVENTS_PER_SUBPOLL;
362         int polled = tryPoll(max);
363         if (polled > 0 || !block) {
364             return polled;
365         } else {
366             int ret = ring.enter(0, 1, 0);  // wait for at least one completion
367             if (ret < 0) {
368                 throw new IOException("io_uring_enter failed, " +
369                     IOUring.getError(ret));
370             }
371             return tryPoll(max);
372         }
373     }
374 
375     /**
376      * Poll or handle completions up to the given max without blocking. This method also
377      * handles the completion of any cancelled operations.
378      * @retutn the number of sockets polled and I/O operations completed
379      */
380     private int tryPoll(int max) {
381         int polled = 0;
382         Cqe cqe;
383         while (polled < max && ((cqe = ring.pollCompletion()) != null)) {
384             long udata = cqe.user_data();
385 
386             // handle read/write ops
387             if (ops != null) {
388                 Op op = ops.get(udata);
389                 if (op != null) {
390                     int res = cqe.res();
391                     op.setResult((res != 0) ? res : -1);   // map 0 to -1 at EOF
392                     LockSupport.unpark(op.thread());
393                     polled++;
394                     continue;
395                 }
396             }
397 
398             // handle poll and cancls ops, user data is fd or -fd
399             int fd = (int) udata;
400             if (fd > 0 && (wakeupEvent == null || fd != wakeupEvent.efd())) {
401                 // poll done
402                 polled(fd);
403                 polled++;
404             } else if (fd < 0) {
405                 // cancel done
406                 Thread t = cancels.remove(-fd);
407                 if (t != null) {
408                     LockSupport.unpark(t);
409                 }
410             }
411         }
412         return polled;
413     }
414 
415     /**
416      * Invoke io_uring_enter to submit the SQE entries
417      */
418     private static void enter(IOUring ring, int n) throws IOException {
419         if (SQPOLL_IDLE_TIME > 0) {
420             ring.pollingEnter();
421         } else {
422             int ret = ring.enter(n, 0, 0);
423             if (ret < 0) {
424                 throw new IOException("io_uring_enter failed, " +
425                     IOUring.getError(ret));
426             }
427             assert ret == n;
428         }
429     }
430 
431     private void enter(int n) throws IOException {
432         enter(ring, n);
433     }
434 
435     /**
436      * Submit IORING_OP_POLL_ADD operation.
437      */
438     private static void submitPollAdd(IOUring ring,
439                                       int fd,
440                                       int events,
441                                       long udata) throws IOException {
442         Sqe sqe = new Sqe()
443                 .opcode(IORING_OP_POLL_ADD())
444                 .fd(fd)
445                 .user_data(udata)
446                 .poll_events(events);
447         ring.submit(sqe);
448     }
449 
450     private void submitPollAdd(int fd, int events, long udata) throws IOException {
451         submitPollAdd(ring, fd, events, udata);
452     }
453 
454     /**
455      * Submit IORING_OP_POLL_REMOVE operation.
456      * @param req_udata the user data to identify the original POLL_ADD
457      * @param udata the user data for the POLL_REMOVE op
458      */
459     private void submitPollRemove(long req_udata, long udata) throws IOException {
460         @SuppressWarnings("restricted")
461         MemorySegment address = MemorySegment.ofAddress(req_udata).reinterpret(ADDRESS_SIZE);
462         Sqe sqe = new Sqe()
463                 .opcode(IORING_OP_POLL_REMOVE())
464                 .addr(address)
465                 .user_data(udata);
466         ring.submit(sqe);
467     }
468 
469     /**
470      * Submit IORING_OP_READ operation.
471      */
472     private void submitRead(int fd, MemorySegment buf, int len, long pos, long udata) throws IOException {
473         Sqe sqe = new Sqe()
474                 .opcode(IORING_OP_READ())
475                 .fd(fd)
476                 .addr(buf)
477                 .len(len)
478                 .off(pos)  // file position or -1L
479                 .user_data(udata);
480         ring.submit(sqe);
481         enter(1);
482     }
483 
484     /**
485      * Submit IORING_OP_WRITE operation.
486      */
487     private void submitWrite(int fd, MemorySegment buf, int len, long pos, long udata) throws IOException {
488         Sqe sqe = new Sqe()
489                 .opcode(IORING_OP_WRITE())
490                 .fd(fd)
491                 .addr(buf)
492                 .len(len)
493                 .off(pos)  // file position or -1L
494                 .user_data(udata);
495         ring.submit(sqe);
496         enter(1);
497     }
498 
499     /**
500      * Cancels an operation submitted with the given user_data.
501      */
502     private void cancelOp(int fd, long req_udata) throws IOException {
503         @SuppressWarnings("restricted")
504         MemorySegment address = MemorySegment.ofAddress(req_udata).reinterpret(ADDRESS_SIZE);
505         cancels.put(fd, Thread.currentThread());
506         synchronized (submitLock) {
507             Sqe sqe = new Sqe()
508                     .opcode(IORING_OP_ASYNC_CANCEL())
509                     .addr(address)
510                     .user_data(-fd);   // user data for IORING_OP_ASYNC_CANCEL
511             ring.submit(sqe);
512             enter(1);
513         }
514         while (cancels.containsKey(fd) && !isShutdown()) {
515             LockSupport.park();
516         }
517     }
518 
519     private MemorySegment takeMemorySegment() {
520         MemorySegment buf = memorySegmentCache.poll();
521         if (buf == null) {
522             buf = ARENA.allocate(MAX_BUF_SIZE);
523         }
524         return buf;
525     }
526 
527     private void offerMemorySegment(MemorySegment buf) {
528         memorySegmentCache.offer(buf);
529     }
530 }