1 /*
  2  * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.lang.foreign.MemorySegment;
 28 import java.lang.foreign.ValueLayout;
 29 import java.lang.ref.Cleaner.Cleanable;
 30 import java.io.IOException;
 31 import java.util.Map;
 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.locks.LockSupport;
 34 import jdk.internal.ref.CleanerFactory;
 35 import sun.nio.ch.iouring.IOUringImpl;
 36 import sun.nio.ch.iouring.Cqe;
 37 import sun.nio.ch.iouring.Sqe;
 38 import static jdk.internal.ffi.generated.iouring.iouring_h.IORING_OP_POLL_ADD;
 39 import static jdk.internal.ffi.generated.iouring.iouring_h.IORING_OP_POLL_REMOVE;
 40 
 41 /**
 42  * Poller implementation based io_uring.
 43  */
 44 
 45 public class IoUringPoller extends Poller {
 46     private static final long ADDRESS_SIZE = ValueLayout.ADDRESS.byteSize();
 47 
 48     // submition and completion queue sizes
 49     private static final int SQ_SIZE = 8;
 50     private static final int CQ_SIZE = Math.max(SQ_SIZE + 1, 1024);
 51 
 52     // max completion events to consume in a blocking poll and non-blocking subpoll
 53     private static final int MAX_EVENTS_PER_POLL    = 64;
 54     private static final int MAX_EVENTS_PER_SUBPOLL = 8;
 55 
 56     private final int event;
 57     private final IOUringImpl ring;
 58     private final EventFD readyEvent;   // completion events posted to CQ ring
 59     private final EventFD wakeupEvent;  // wakeup event, used for shutdown
 60 
 61     // close action, and cleaner if this is subpoller
 62     private final Runnable closer;
 63     private final Cleanable cleaner;
 64 
 65     // used to coordinate access to submission queue
 66     private final Object submitLock = new Object();
 67 
 68     // maps file descriptor to Thread when cancelling poll
 69     private final Map<Integer, Thread> cancels = new ConcurrentHashMap<>();
 70 
 71     IoUringPoller(boolean subPoller, boolean read) throws IOException {
 72         IOUringImpl ring = null;
 73         EventFD wakeupEvent = null;
 74         EventFD readyEvent = null;
 75         try {
 76             ring = new IOUringImpl(SQ_SIZE, CQ_SIZE);
 77 
 78             // need event to register with master poller
 79             if (subPoller) {
 80                 readyEvent = new EventFD();
 81                 ring.register_eventfd(readyEvent.efd());
 82             }
 83 
 84             // register event with io_uring to allow for wakeup
 85             wakeupEvent = new EventFD();
 86             int efd = wakeupEvent.efd();
 87             IOUtil.configureBlocking(efd, false);
 88             submitPollAdd(ring, efd, Net.POLLIN, efd);
 89         } catch (Throwable e) {
 90             if (ring != null) ring.close();
 91             if (readyEvent != null) readyEvent.close();
 92             if (wakeupEvent != null) wakeupEvent.close();
 93             throw e;
 94         }
 95 
 96         this.event = (read) ? Net.POLLIN : Net.POLLOUT;
 97         this.ring = ring;
 98         this.readyEvent = readyEvent;
 99         this.wakeupEvent = wakeupEvent;
100 
101         // create action to io_uring instance, register cleaner if this is a subpoller
102         this.closer = closer(ring, readyEvent, wakeupEvent);
103         if (subPoller) {
104             this.cleaner = CleanerFactory.cleaner().register(this, closer);
105         } else {
106             this.cleaner = null;
107         }
108     }
109 
110     /**
111      * Returns an action to close the io_uring and release other resources.
112      */
113     private static Runnable closer(IOUringImpl ring, EventFD readyEvent, EventFD wakeupEvent) {
114         return () -> {
115             try {
116                 ring.close();
117                 if (readyEvent != null) readyEvent.close();
118                 wakeupEvent.close();
119             } catch (IOException _) { }
120         };
121     }
122 
123     @Override
124     void close() throws IOException {
125         if (cleaner != null) {
126             cleaner.clean();
127         } else {
128             closer.run();
129         }
130     }
131 
132     @Override
133     int fdVal() {
134         if (readyEvent == null) {
135             throw new UnsupportedOperationException();
136         }
137         return readyEvent.efd();
138     }
139 
140     @Override
141     void pollerPolled() throws IOException {
142         readyEvent.reset();
143     }
144 
145     @Override
146     void implRegister(int fd) throws IOException {
147         assert fd != 0;
148         synchronized (submitLock) {
149             // fd is the user data for IORING_OP_POLL_ADD request
150             submitPollAdd(ring, fd, event, fd);
151         }
152     }
153 
154     @Override
155     void implDeregister(int fd, boolean polled) throws IOException {
156         if (!polled && !isShutdown()) {
157             cancels.put(fd, Thread.currentThread());
158             synchronized (submitLock) {
159                 // fd was the user data for IORING_OP_POLL_ADD request
160                 // -fd is the user data for IORING_OP_POLL_REMOVE request
161                 submitPollRemove(ring, fd, -fd);
162             }
163             while (cancels.containsKey(fd) && !isShutdown()) {
164                 LockSupport.park();
165             }
166         }
167     }
168 
169     @Override
170     void wakeupPoller() throws IOException {
171         wakeupEvent.set();
172     }
173 
174     @Override
175     int poll(int timeout) throws IOException {
176         if (timeout > 0) {
177             throw new UnsupportedOperationException();
178         }
179         boolean block = (timeout == -1);
180         int max = block ? MAX_EVENTS_PER_POLL : MAX_EVENTS_PER_SUBPOLL;
181         int polled = tryPoll(max);
182         if (polled > 0 || !block) {
183             return polled;
184         } else {
185             ring.enter(0, 1, 0);  // wait for at least one completion
186             return tryPoll(max);
187         }
188     }
189 
190     /**
191      * Poll up to max sockets without blocking. Also handles the completion of any
192      * POLL_REMOVE operations.
193      * @retutn the number of sockets polled
194      */
195     private int tryPoll(int max) {
196         int polled = 0;
197         Cqe cqe;
198         while (polled < max && ((cqe = ring.pollCompletion()) != null)) {
199             // user data is fd or -fd
200             int fd = (int) cqe.user_data();
201             if (fd > 0 && fd != wakeupEvent.efd()) {
202                 // poll done
203                 polled(fd);
204                 polled++;
205             } else if (fd < 0) {
206                 // cancel done
207                 Thread t = cancels.remove(-fd);
208                 if (t != null) {
209                     LockSupport.unpark(t);
210                 }
211             }
212         }
213         return polled;
214     }
215 
216     /**
217      * Submit IORING_OP_POLL_ADD operation.
218      */
219     private static void submitPollAdd(IOUringImpl ring,
220                                       int fd,
221                                       int events,
222                                       long udata) throws IOException {
223         Sqe sqe = new Sqe()
224                 .opcode(IORING_OP_POLL_ADD())
225                 .fd(fd)
226                 .user_data(udata)
227                 .poll_events(events);
228         ring.submit(sqe);
229         int ret = ring.enter(1, 0, 0);  // submit 1
230         if (ret < 1) {
231             throw new IOException("io_uring_enter failed, ret=" + ret);
232         }
233     }
234 
235     /**
236      * Submit IORING_OP_POLL_REMOVE operation.
237      * @param req_udata the user data to identify the original POLL_ADD
238      * @param udata the user data for the POLL_REMOVE op
239      */
240     private static void submitPollRemove(IOUringImpl ring,
241                                          long req_udata,
242                                          long udata) throws IOException {
243         @SuppressWarnings("restricted")
244         MemorySegment address = MemorySegment.ofAddress(req_udata).reinterpret(ADDRESS_SIZE);
245         Sqe sqe = new Sqe()
246                 .opcode(IORING_OP_POLL_REMOVE())
247                 .addr(address)
248                 .user_data(udata);
249         ring.submit(sqe);
250         int ret = ring.enter(1, 0, 0);  // submit 1
251         if (ret < 1) {
252             throw new IOException("io_uring_enter failed, ret=" + ret);
253         }
254     }
255 }