1 /* 2 * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.lang.foreign.MemorySegment; 28 import java.lang.foreign.ValueLayout; 29 import java.lang.ref.Cleaner.Cleanable; 30 import java.io.IOException; 31 import java.util.Map; 32 import java.util.concurrent.*; 33 import java.util.concurrent.locks.LockSupport; 34 import jdk.internal.ref.CleanerFactory; 35 import sun.nio.ch.iouring.IOUringImpl; 36 import sun.nio.ch.iouring.Cqe; 37 import sun.nio.ch.iouring.Sqe; 38 import static jdk.internal.ffi.generated.iouring.iouring_h.IORING_OP_POLL_ADD; 39 import static jdk.internal.ffi.generated.iouring.iouring_h.IORING_OP_POLL_REMOVE; 40 41 /** 42 * Poller implementation based io_uring. 43 */ 44 45 public class IoUringPoller extends Poller { 46 private static final long ADDRESS_SIZE = ValueLayout.ADDRESS.byteSize(); 47 48 // submition and completion queue sizes 49 private static final int SQ_SIZE = 16; 50 private static final int CQ_SIZE = Math.max(SQ_SIZE + 1, 1024); 51 52 // max completion events to consume in a blocking poll and non-blocking subpoll 53 private static final int MAX_EVENTS_PER_POLL = 64; 54 private static final int MAX_EVENTS_PER_SUBPOLL = 8; 55 56 private final int event; 57 private final IOUringImpl ring; 58 private final EventFD readyEvent; // completion events posted to CQ ring 59 private final EventFD wakeupEvent; // wakeup event, used for shutdown 60 61 // close action, and cleaner if this is subpoller 62 private final Runnable closer; 63 private final Cleanable cleaner; 64 65 // used to coordinate access to submission queue 66 private final Object submitLock = new Object(); 67 68 // maps file descriptor to Thread when cancelling poll 69 private final Map<Integer, Thread> cancels = new ConcurrentHashMap<>(); 70 71 IoUringPoller(Poller.Mode mode, boolean subPoller, boolean read) throws IOException { 72 IOUringImpl ring = new IOUringImpl(SQ_SIZE, CQ_SIZE, 0); 73 EventFD wakeupEvent = null; 74 EventFD readyEvent = null; 75 76 if (subPoller) { 77 try { 78 // event to allow registering with master poller 79 readyEvent = new EventFD(); 80 ring.register_eventfd(readyEvent.efd()); 81 82 // wakeup event to allow for shutdown 83 if (mode == Poller.Mode.POLLER_PER_CARRIER) { 84 wakeupEvent = new EventFD(); 85 int efd = wakeupEvent.efd(); 86 IOUtil.configureBlocking(efd, false); 87 submitPollAdd(ring, efd, Net.POLLIN, efd); 88 enter(ring, 1); 89 } 90 } catch (Throwable e) { 91 ring.close(); 92 if (readyEvent != null) readyEvent.close(); 93 if (wakeupEvent != null) wakeupEvent.close(); 94 throw e; 95 } 96 } 97 98 this.event = (read) ? Net.POLLIN : Net.POLLOUT; 99 this.ring = ring; 100 this.readyEvent = readyEvent; 101 this.wakeupEvent = wakeupEvent; 102 103 // create action to close io_uring instance, register cleaner if this is a subpoller 104 this.closer = closer(ring, readyEvent, wakeupEvent); 105 if (subPoller) { 106 this.cleaner = CleanerFactory.cleaner().register(this, closer); 107 } else { 108 this.cleaner = null; 109 } 110 } 111 112 /** 113 * Returns an action to close the io_uring and release other resources. 114 */ 115 private static Runnable closer(IOUringImpl ring, EventFD readyEvent, EventFD wakeupEvent) { 116 return () -> { 117 try { 118 ring.close(); 119 if (readyEvent != null) readyEvent.close(); 120 if (wakeupEvent != null) wakeupEvent.close(); 121 } catch (IOException _) { } 122 }; 123 } 124 125 @Override 126 void close() throws IOException { 127 if (cleaner != null) { 128 cleaner.clean(); 129 } else { 130 closer.run(); 131 } 132 } 133 134 @Override 135 int fdVal() { 136 if (readyEvent == null) { 137 throw new UnsupportedOperationException(); 138 } 139 return readyEvent.efd(); 140 } 141 142 @Override 143 void pollerPolled() throws IOException { 144 readyEvent.reset(); 145 } 146 147 @Override 148 void implRegister(int fd) throws IOException { 149 assert fd > 0; // fd == 0 used for wakeup 150 151 synchronized (submitLock) { 152 // fd is the user data for IORING_OP_POLL_ADD request 153 submitPollAdd(ring, fd, event, fd); 154 enter(ring, 1); 155 } 156 } 157 158 @Override 159 void implDeregister(int fd, boolean polled) throws IOException { 160 if (!polled && !isShutdown()) { 161 cancels.put(fd, Thread.currentThread()); 162 163 synchronized (submitLock) { 164 // fd was the user data for IORING_OP_POLL_ADD request 165 // -fd is the user data for IORING_OP_POLL_REMOVE request 166 submitPollRemove(ring, fd, -fd); 167 enter(ring, 1); 168 } 169 170 while (cancels.containsKey(fd) && !isShutdown()) { 171 LockSupport.park(); 172 } 173 } 174 } 175 176 @Override 177 void wakeupPoller() throws IOException { 178 if (wakeupEvent == null) { 179 throw new UnsupportedOperationException(); 180 } 181 182 // causes subpoller to wakeup 183 wakeupEvent.set(); 184 } 185 186 @Override 187 int poll(int timeout) throws IOException { 188 if (timeout > 0) { 189 // timed polls not supported by this Poller 190 throw new UnsupportedOperationException(); 191 } 192 boolean block = (timeout == -1); 193 int max = block ? MAX_EVENTS_PER_POLL : MAX_EVENTS_PER_SUBPOLL; 194 int polled = tryPoll(max); 195 if (polled > 0 || !block) { 196 return polled; 197 } else { 198 int ret = ring.enter(0, 1, 0); // wait for at least one completion 199 if (ret < 0) { 200 throw new IOException("io_uring_enter failed, ret=" + ret); 201 } 202 return tryPoll(max); 203 } 204 } 205 206 /** 207 * Poll up to max sockets without blocking. Also handles the completion of any 208 * POLL_REMOVE operations. 209 * @retutn the number of sockets polled 210 */ 211 private int tryPoll(int max) { 212 int polled = 0; 213 Cqe cqe; 214 while (polled < max && ((cqe = ring.pollCompletion()) != null)) { 215 // user data is fd or -fd 216 int fd = (int) cqe.user_data(); 217 if (fd > 0 && (wakeupEvent == null || fd != wakeupEvent.efd())) { 218 // poll done 219 polled(fd); 220 polled++; 221 } else if (fd < 0) { 222 // cancel done 223 Thread t = cancels.remove(-fd); 224 if (t != null) { 225 LockSupport.unpark(t); 226 } 227 } 228 } 229 return polled; 230 } 231 232 /** 233 * Invoke io_uring_enter to submit the SQE entries 234 */ 235 private static void enter(IOUringImpl ring, int n) throws IOException { 236 int ret = ring.enter(n, 0, 0); 237 if (ret < 0) { 238 throw new IOException("io_uring_enter failed, ret=" + ret); 239 } 240 assert ret == n; 241 } 242 243 /** 244 * Submit IORING_OP_POLL_ADD operation. 245 */ 246 private static void submitPollAdd(IOUringImpl ring, 247 int fd, 248 int events, 249 long udata) throws IOException { 250 Sqe sqe = new Sqe() 251 .opcode(IORING_OP_POLL_ADD()) 252 .fd(fd) 253 .user_data(udata) 254 .poll_events(events); 255 ring.submit(sqe); 256 } 257 258 /** 259 * Submit IORING_OP_POLL_REMOVE operation. 260 * @param req_udata the user data to identify the original POLL_ADD 261 * @param udata the user data for the POLL_REMOVE op 262 */ 263 private static void submitPollRemove(IOUringImpl ring, 264 long req_udata, 265 long udata) throws IOException { 266 @SuppressWarnings("restricted") 267 MemorySegment address = MemorySegment.ofAddress(req_udata).reinterpret(ADDRESS_SIZE); 268 Sqe sqe = new Sqe() 269 .opcode(IORING_OP_POLL_REMOVE()) 270 .addr(address) 271 .user_data(udata); 272 ring.submit(sqe); 273 } 274 }