1 /*
  2  * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
import java.io.Closeable;
import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.lang.ref.Cleaner.Cleanable;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;
import jdk.internal.ref.CleanerFactory;
import sun.nio.ch.iouring.IOUringImpl;
import sun.nio.ch.iouring.Cqe;
import sun.nio.ch.iouring.Sqe;
import static jdk.internal.ffi.generated.iouring.iouring_h.IORING_OP_POLL_ADD;
import static jdk.internal.ffi.generated.iouring.iouring_h.IORING_OP_POLL_REMOVE;
 40 
/**
 * Poller implementation based on io_uring.
 */
 44 
 45 public class IoUringPoller extends Poller {
 46     private static final long ADDRESS_SIZE = ValueLayout.ADDRESS.byteSize();
 47 
 48     // submition and completion queue sizes
 49     private static final int SQ_SIZE = 16;
 50     private static final int CQ_SIZE = Math.max(SQ_SIZE + 1, 1024);
 51 
 52     // max completion events to consume in a blocking poll and non-blocking subpoll
 53     private static final int MAX_EVENTS_PER_POLL    = 64;
 54     private static final int MAX_EVENTS_PER_SUBPOLL = 8;
 55 
 56     private final int event;
 57     private final IOUringImpl ring;
 58     private final EventFD readyEvent;   // completion events posted to CQ ring
 59     private final EventFD wakeupEvent;  // wakeup event, used for shutdown
 60 
 61     // close action, and cleaner if this is subpoller
 62     private final Runnable closer;
 63     private final Cleanable cleaner;
 64 
 65     // used to coordinate access to submission queue
 66     private final Object submitLock = new Object();
 67 
 68     // maps file descriptor to Thread when cancelling poll
 69     private final Map<Integer, Thread> cancels = new ConcurrentHashMap<>();
 70 
 71     IoUringPoller(Poller.Mode mode, boolean subPoller, boolean read) throws IOException {
 72         IOUringImpl ring = new IOUringImpl(SQ_SIZE, CQ_SIZE, 0);
 73         EventFD wakeupEvent = null;
 74         EventFD readyEvent = null;
 75 
 76         if (subPoller) {
 77             try {
 78                 // event to allow registering with master poller
 79                 readyEvent = new EventFD();
 80                 ring.register_eventfd(readyEvent.efd());
 81 
 82                 // wakeup event to allow for shutdown
 83                 if (mode == Poller.Mode.POLLER_PER_CARRIER) {
 84                     wakeupEvent = new EventFD();
 85                     int efd = wakeupEvent.efd();
 86                     IOUtil.configureBlocking(efd, false);
 87                     submitPollAdd(ring, efd, Net.POLLIN, efd);
 88                     enter(ring, 1);
 89                 }
 90             } catch (Throwable e) {
 91                 ring.close();
 92                 if (readyEvent != null) readyEvent.close();
 93                 if (wakeupEvent != null) wakeupEvent.close();
 94                 throw e;
 95             }
 96         }
 97 
 98         this.event = (read) ? Net.POLLIN : Net.POLLOUT;
 99         this.ring = ring;
100         this.readyEvent = readyEvent;
101         this.wakeupEvent = wakeupEvent;
102 
103         // create action to close io_uring instance, register cleaner if this is a subpoller
104         this.closer = closer(ring, readyEvent, wakeupEvent);
105         if (subPoller) {
106             this.cleaner = CleanerFactory.cleaner().register(this, closer);
107         } else {
108             this.cleaner = null;
109         }
110     }
111 
112     /**
113      * Returns an action to close the io_uring and release other resources.
114      */
115     private static Runnable closer(IOUringImpl ring, EventFD readyEvent, EventFD wakeupEvent) {
116         return () -> {
117             try {
118                 ring.close();
119                 if (readyEvent != null) readyEvent.close();
120                 if (wakeupEvent != null) wakeupEvent.close();
121             } catch (IOException _) { }
122         };
123     }
124 
125     @Override
126     void close() throws IOException {
127         if (cleaner != null) {
128             cleaner.clean();
129         } else {
130             closer.run();
131         }
132     }
133 
134     @Override
135     int fdVal() {
136         if (readyEvent == null) {
137             throw new UnsupportedOperationException();
138         }
139         return readyEvent.efd();
140     }
141 
142     @Override
143     void pollerPolled() throws IOException {
144         readyEvent.reset();
145     }
146 
147     @Override
148     void implRegister(int fd) throws IOException {
149         assert fd > 0;  // fd == 0 used for wakeup
150 
151         synchronized (submitLock) {
152             // fd is the user data for IORING_OP_POLL_ADD request
153             submitPollAdd(ring, fd, event, fd);
154             enter(ring, 1);
155         }
156     }
157 
158     @Override
159     void implDeregister(int fd, boolean polled) throws IOException {
160         if (!polled && !isShutdown()) {
161             cancels.put(fd, Thread.currentThread());
162 
163             synchronized (submitLock) {
164                 // fd was the user data for IORING_OP_POLL_ADD request
165                 // -fd is the user data for IORING_OP_POLL_REMOVE request
166                 submitPollRemove(ring, fd, -fd);
167                 enter(ring, 1);
168             }
169 
170             while (cancels.containsKey(fd) && !isShutdown()) {
171                 LockSupport.park();
172             }
173         }
174     }
175 
176     @Override
177     void wakeupPoller() throws IOException {
178         if (wakeupEvent == null) {
179             throw new UnsupportedOperationException();
180         }
181 
182         // causes subpoller to wakeup
183         wakeupEvent.set();
184     }
185 
186     @Override
187     int poll(int timeout) throws IOException {
188         if (timeout > 0) {
189             // timed polls not supported by this Poller
190             throw new UnsupportedOperationException();
191         }
192         boolean block = (timeout == -1);
193         int max = block ? MAX_EVENTS_PER_POLL : MAX_EVENTS_PER_SUBPOLL;
194         int polled = tryPoll(max);
195         if (polled > 0 || !block) {
196             return polled;
197         } else {
198             int ret = ring.enter(0, 1, 0);  // wait for at least one completion
199             if (ret < 0) {
200                 throw new IOException("io_uring_enter failed, ret=" + ret);
201             }
202             return tryPoll(max);
203         }
204     }
205 
206     /**
207      * Poll up to max sockets without blocking. Also handles the completion of any
208      * POLL_REMOVE operations.
209      * @retutn the number of sockets polled
210      */
211     private int tryPoll(int max) {
212         int polled = 0;
213         Cqe cqe;
214         while (polled < max && ((cqe = ring.pollCompletion()) != null)) {
215             // user data is fd or -fd
216             int fd = (int) cqe.user_data();
217             if (fd > 0 && (wakeupEvent == null || fd != wakeupEvent.efd())) {
218                 // poll done
219                 polled(fd);
220                 polled++;
221             } else if (fd < 0) {
222                 // cancel done
223                 Thread t = cancels.remove(-fd);
224                 if (t != null) {
225                     LockSupport.unpark(t);
226                 }
227             }
228         }
229         return polled;
230     }
231 
232     /**
233      * Invoke io_uring_enter to submit the SQE entries
234      */
235     private static void enter(IOUringImpl ring, int n) throws IOException {
236         int ret = ring.enter(n, 0, 0);
237         if (ret < 0) {
238             throw new IOException("io_uring_enter failed, ret=" + ret);
239         }
240         assert ret == n;
241     }
242 
243     /**
244      * Submit IORING_OP_POLL_ADD operation.
245      */
246     private static void submitPollAdd(IOUringImpl ring,
247                                       int fd,
248                                       int events,
249                                       long udata) throws IOException {
250         Sqe sqe = new Sqe()
251                 .opcode(IORING_OP_POLL_ADD())
252                 .fd(fd)
253                 .user_data(udata)
254                 .poll_events(events);
255         ring.submit(sqe);
256     }
257 
258     /**
259      * Submit IORING_OP_POLL_REMOVE operation.
260      * @param req_udata the user data to identify the original POLL_ADD
261      * @param udata the user data for the POLL_REMOVE op
262      */
263     private static void submitPollRemove(IOUringImpl ring,
264                                          long req_udata,
265                                          long udata) throws IOException {
266         @SuppressWarnings("restricted")
267         MemorySegment address = MemorySegment.ofAddress(req_udata).reinterpret(ADDRESS_SIZE);
268         Sqe sqe = new Sqe()
269                 .opcode(IORING_OP_POLL_REMOVE())
270                 .addr(address)
271                 .user_data(udata);
272         ring.submit(sqe);
273     }
274 }