1 /* 2 * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.lang.foreign.MemorySegment; 28 import java.lang.foreign.ValueLayout; 29 import java.lang.ref.Cleaner.Cleanable; 30 import java.io.IOException; 31 import java.util.Map; 32 import java.util.concurrent.ConcurrentHashMap; 33 import java.util.concurrent.locks.LockSupport; 34 import jdk.internal.ref.CleanerFactory; 35 import sun.nio.ch.iouring.IOUringImpl; 36 import sun.nio.ch.iouring.Cqe; 37 import sun.nio.ch.iouring.Sqe; 38 import static jdk.internal.ffi.generated.iouring.iouring_h.IORING_OP_POLL_ADD; 39 import static jdk.internal.ffi.generated.iouring.iouring_h.IORING_OP_POLL_REMOVE; 40 41 /** 42 * Poller implementation based io_uring. 43 */ 44 45 public class IoUringPoller extends Poller { 46 private static final long ADDRESS_SIZE = ValueLayout.ADDRESS.byteSize(); 47 48 // submition and completion queue sizes 49 private static final int SQ_SIZE = 8; 50 private static final int CQ_SIZE = Math.max(SQ_SIZE + 1, 1024); 51 52 // max completion events to consume in a blocking poll and non-blocking subpoll 53 private static final int MAX_EVENTS_PER_POLL = 64; 54 private static final int MAX_EVENTS_PER_SUBPOLL = 8; 55 56 private final int event; 57 private final IOUringImpl ring; 58 private final EventFD readyEvent; // completion events posted to CQ ring 59 private final EventFD wakeupEvent; // wakeup event, used for shutdown 60 61 // close action, and cleaner if this is subpoller 62 private final Runnable closer; 63 private final Cleanable cleaner; 64 65 // used to coordinate access to submission queue 66 private final Object submitLock = new Object(); 67 68 // maps file descriptor to Thread when cancelling poll 69 private final Map<Integer, Thread> cancels = new ConcurrentHashMap<>(); 70 71 IoUringPoller(boolean subPoller, boolean read) throws IOException { 72 IOUringImpl ring = null; 73 EventFD wakeupEvent = null; 74 EventFD readyEvent = null; 75 try { 76 ring = new IOUringImpl(SQ_SIZE, CQ_SIZE); 77 78 // need event to register with master poller 79 if (subPoller) { 80 readyEvent = new EventFD(); 81 ring.register_eventfd(readyEvent.efd()); 82 } 83 84 // register event with io_uring to allow for wakeup 85 wakeupEvent = new EventFD(); 86 int efd = wakeupEvent.efd(); 87 IOUtil.configureBlocking(efd, false); 88 submitPollAdd(ring, efd, Net.POLLIN, efd); 89 } catch (Throwable e) { 90 if (ring != null) ring.close(); 91 if (readyEvent != null) readyEvent.close(); 92 if (wakeupEvent != null) wakeupEvent.close(); 93 throw e; 94 } 95 96 this.event = (read) ? Net.POLLIN : Net.POLLOUT; 97 this.ring = ring; 98 this.readyEvent = readyEvent; 99 this.wakeupEvent = wakeupEvent; 100 101 // create action to io_uring instance, register cleaner if this is a subpoller 102 this.closer = closer(ring, readyEvent, wakeupEvent); 103 if (subPoller) { 104 this.cleaner = CleanerFactory.cleaner().register(this, closer); 105 } else { 106 this.cleaner = null; 107 } 108 } 109 110 /** 111 * Returns an action to close the io_uring and release other resources. 112 */ 113 private static Runnable closer(IOUringImpl ring, EventFD readyEvent, EventFD wakeupEvent) { 114 return () -> { 115 try { 116 ring.close(); 117 if (readyEvent != null) readyEvent.close(); 118 wakeupEvent.close(); 119 } catch (IOException _) { } 120 }; 121 } 122 123 @Override 124 void close() throws IOException { 125 if (cleaner != null) { 126 cleaner.clean(); 127 } else { 128 closer.run(); 129 } 130 } 131 132 @Override 133 int fdVal() { 134 if (readyEvent == null) { 135 throw new UnsupportedOperationException(); 136 } 137 return readyEvent.efd(); 138 } 139 140 @Override 141 void pollerPolled() throws IOException { 142 readyEvent.reset(); 143 } 144 145 @Override 146 void implRegister(int fd) throws IOException { 147 assert fd != 0; 148 synchronized (submitLock) { 149 // fd is the user data for IORING_OP_POLL_ADD request 150 submitPollAdd(ring, fd, event, fd); 151 } 152 } 153 154 @Override 155 void implDeregister(int fd, boolean polled) throws IOException { 156 if (!polled && !isShutdown()) { 157 cancels.put(fd, Thread.currentThread()); 158 synchronized (submitLock) { 159 // fd was the user data for IORING_OP_POLL_ADD request 160 // -fd is the user data for IORING_OP_POLL_REMOVE request 161 submitPollRemove(ring, fd, -fd); 162 } 163 while (cancels.containsKey(fd) && !isShutdown()) { 164 LockSupport.park(); 165 } 166 } 167 } 168 169 @Override 170 void wakeupPoller() throws IOException { 171 wakeupEvent.set(); 172 } 173 174 @Override 175 int poll(int timeout) throws IOException { 176 if (timeout > 0) { 177 throw new UnsupportedOperationException(); 178 } 179 boolean block = (timeout == -1); 180 int max = block ? MAX_EVENTS_PER_POLL : MAX_EVENTS_PER_SUBPOLL; 181 int polled = tryPoll(max); 182 if (polled > 0 || !block) { 183 return polled; 184 } else { 185 ring.enter(0, 1, 0); // wait for at least one completion 186 return tryPoll(max); 187 } 188 } 189 190 /** 191 * Poll up to max sockets without blocking. Also handles the completion of any 192 * POLL_REMOVE operations. 193 * @retutn the number of sockets polled 194 */ 195 private int tryPoll(int max) { 196 int polled = 0; 197 Cqe cqe; 198 while (polled < max && ((cqe = ring.pollCompletion()) != null)) { 199 // user data is fd or -fd 200 int fd = (int) cqe.user_data(); 201 if (fd > 0 && fd != wakeupEvent.efd()) { 202 // poll done 203 polled(fd); 204 polled++; 205 } else if (fd < 0) { 206 // cancel done 207 Thread t = cancels.remove(-fd); 208 if (t != null) { 209 LockSupport.unpark(t); 210 } 211 } 212 } 213 return polled; 214 } 215 216 /** 217 * Submit IORING_OP_POLL_ADD operation. 218 */ 219 private static void submitPollAdd(IOUringImpl ring, 220 int fd, 221 int events, 222 long udata) throws IOException { 223 Sqe sqe = new Sqe() 224 .opcode(IORING_OP_POLL_ADD()) 225 .fd(fd) 226 .user_data(udata) 227 .poll_events(events); 228 ring.submit(sqe); 229 int ret = ring.enter(1, 0, 0); // submit 1 230 if (ret < 1) { 231 throw new IOException("io_uring_enter failed, ret=" + ret); 232 } 233 } 234 235 /** 236 * Submit IORING_OP_POLL_REMOVE operation. 237 * @param req_udata the user data to identify the original POLL_ADD 238 * @param udata the user data for the POLL_REMOVE op 239 */ 240 private static void submitPollRemove(IOUringImpl ring, 241 long req_udata, 242 long udata) throws IOException { 243 @SuppressWarnings("restricted") 244 MemorySegment address = MemorySegment.ofAddress(req_udata).reinterpret(ADDRESS_SIZE); 245 Sqe sqe = new Sqe() 246 .opcode(IORING_OP_POLL_REMOVE()) 247 .addr(address) 248 .user_data(udata); 249 ring.submit(sqe); 250 int ret = ring.enter(1, 0, 0); // submit 1 251 if (ret < 1) { 252 throw new IOException("io_uring_enter failed, ret=" + ret); 253 } 254 } 255 }