1 /*
2 * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.lang.foreign.Arena;
28 import java.lang.foreign.MemorySegment;
29 import java.lang.foreign.ValueLayout;
30 import java.lang.ref.Cleaner.Cleanable;
31 import java.io.IOException;
32 import java.util.Map;
33 import java.util.concurrent.*;
34 import java.util.concurrent.locks.LockSupport;
35 import java.util.function.BooleanSupplier;
36 import jdk.internal.ref.CleanerFactory;
37 import sun.nio.ch.iouring.IOUring;
38 import sun.nio.ch.iouring.Cqe;
39 import sun.nio.ch.iouring.Sqe;
40 import jdk.internal.ffi.generated.iouring.*;
41 import static jdk.internal.ffi.generated.iouring.iouring_h.*;
42
43 /**
 * Poller implementation based on io_uring.
45 *
46 * @apiNote This implementation is experimental. There are many design choices, esp.
47 * around buffer management, that have to be explored.
48 */
49
public class IoUringPoller extends Poller {
    // automatic arena used to allocate the off-heap buffers for read/write ops;
    // segments are freed by the GC once the poller (and its cache) are unreachable
    private static final Arena ARENA = Arena.ofAuto();
    private static final long ADDRESS_SIZE = ValueLayout.ADDRESS.byteSize();

    // size, in bytes, of each off-heap buffer used for read/write ops
    private static final int MAX_BUF_SIZE = 16384;

    // submission queue polling enabled if kernel thread idle time > 0 millis
    private static final int SQPOLL_IDLE_TIME = Integer.getInteger("jdk.io_uring.sqpoll_idle", 0);

    // submission and completion queue sizes
    private static final int DEFAULT_SQ_SIZE = (SQPOLL_IDLE_TIME > 0) ? 64 : 4;
    private static final int SQ_SIZE = Integer.getInteger("jdk.io_uring.sqsize", DEFAULT_SQ_SIZE);
    private static final int CQ_SIZE = Math.max(SQ_SIZE + 1, 1024);

    // max completion events to consume in a blocking poll and non-blocking subpoll
    private static final int MAX_EVENTS_PER_POLL = 64;
    private static final int MAX_EVENTS_PER_SUBPOLL = 8;

    // Net.POLLIN when polling for read-ready, Net.POLLOUT for write-ready
    private final int event;
    private final IOUring ring;
    private final EventFD readyEvent; // completion events posted to CQ ring
    private final EventFD wakeupEvent; // wakeup event, used for shutdown

    // close action, and cleaner if this is subpoller
    private final Runnable closer;
    private final Cleanable cleaner;

    // used to coordinate access to submission queue
    private final Object submitLock = new Object();

    // maps file descriptor to Thread when cancelling poll
    private final Map<Integer, Thread> cancels = new ConcurrentHashMap<>();

    // the map value for in-progress read/write ops
    private static class Op {
        // the thread to unpark when the op completes
        final Thread thread;
        // completion result; 0 means "not completed yet" (tryPoll maps a
        // kernel result of 0 to -1 so that 0 always means not completed)
        volatile int result;
        Op(Thread thread) {
            this.thread = thread;
        }
        Thread thread() {
            return thread;
        }
        int result() {
            return result;
        }
        void setResult(int result) {
            this.result = result;
        }
    }

    // maps user data to in-progress read/write ops
    private final Map<Long, Op> ops;

    // per poller cache of memory segments used for read/write ops
    private final BlockingQueue<MemorySegment> memorySegmentCache;

    /**
     * Creates a Poller backed by an io_uring instance.
     *
     * @param mode the poller mode; POLLER_PER_CARRIER additionally creates a
     *        wakeup eventfd so the subpoller can be woken for shutdown
     * @param subPoller true if this is a subpoller that registers with a master
     *        poller (an eventfd is registered with the ring for that purpose)
     * @param read true to poll file descriptors for reading, false for writing
     * @param supportIoOps true to support IORING_OP_READ/IORING_OP_WRITE ops
     * @throws IOException if the ring or eventfd resources cannot be created
     */
    IoUringPoller(Poller.Mode mode,
                  boolean subPoller,
                  boolean read,
                  boolean supportIoOps) throws IOException {
        IOUring ring = new IOUring(SQ_SIZE, CQ_SIZE, 0, 0, 0, SQPOLL_IDLE_TIME);
        EventFD wakeupEvent = null;
        EventFD readyEvent = null;

        if (subPoller) {
            try {
                // event to allow registering with master poller
                readyEvent = new EventFD();
                ring.register_eventfd(readyEvent.efd());

                // wakeup event to allow for shutdown
                if (mode == Poller.Mode.POLLER_PER_CARRIER) {
                    wakeupEvent = new EventFD();
                    int efd = wakeupEvent.efd();
                    IOUtil.configureBlocking(efd, false);
                    // udata == efd; tryPoll recognizes and ignores this fd
                    submitPollAdd(ring, efd, Net.POLLIN, efd);
                    enter(ring, 1);
                }
            } catch (Throwable e) {
                // release native resources on any setup failure
                ring.close();
                if (readyEvent != null) readyEvent.close();
                if (wakeupEvent != null) wakeupEvent.close();
                throw e;
            }
        }

        this.event = (read) ? Net.POLLIN : Net.POLLOUT;
        this.ring = ring;
        this.readyEvent = readyEvent;
        this.wakeupEvent = wakeupEvent;

        // setup if supporting read/write ops
        if (supportIoOps) {
            this.ops = new ConcurrentHashMap<>();
            this.memorySegmentCache = new LinkedTransferQueue<>();
        } else {
            this.ops = null;
            this.memorySegmentCache = null;
        }

        // create action to close io_uring instance, register cleaner if this is a subpoller
        this.closer = closer(ring, readyEvent, wakeupEvent);
        if (subPoller) {
            this.cleaner = CleanerFactory.cleaner().register(this, closer);
        } else {
            this.cleaner = null;
        }
    }

    /**
     * Returns an action to close the io_uring and release other resources.
     */
    private static Runnable closer(IOUring ring, EventFD readyEvent, EventFD wakeupEvent) {
        return () -> {
            try {
                ring.close();
                if (readyEvent != null) readyEvent.close();
                if (wakeupEvent != null) wakeupEvent.close();
            } catch (IOException _) { }  // best effort close
        };
    }

    /**
     * Closes this poller, releasing the ring and any eventfd resources. For a
     * subpoller the registered cleaner runs the close action (at most once);
     * otherwise the close action is invoked directly.
     */
    @Override
    void close() throws IOException {
        if (cleaner != null) {
            cleaner.clean();
        } else {
            closer.run();
        }
    }

    /**
     * Returns the eventfd file descriptor that is signalled when completion
     * events are posted to the CQ ring. Only supported for subpollers.
     *
     * @throws UnsupportedOperationException if this poller is not a subpoller
     */
    @Override
    int fdVal() {
        if (readyEvent == null) {
            throw new UnsupportedOperationException();
        }
        return readyEvent.efd();
    }

    /**
     * Invoked after the master poller has polled this subpoller's eventfd;
     * resets the eventfd counter so it can be signalled again.
     */
    @Override
    void pollerPolled() throws IOException {
        readyEvent.reset();
    }

    /**
     * Submits an IORING_OP_POLL_ADD op to poll a file descriptor for read or write.
     */
    @Override
    void implStartPoll(int fd) throws IOException {
        assert fd > 0; // fd == 0 used for wakeup

        synchronized (submitLock) {
            // fd is the user data for IORING_OP_POLL_ADD request
            submitPollAdd(ring, fd, event, fd);
            enter(1);
        }
    }

    /**
     * Submits an IORING_OP_POLL_REMOVE op, and waits for it to complete, to stop
     * polling a file descriptor. A no-op if already polled.
     */
    @Override
    void implStopPoll(int fd, boolean polled) throws IOException {
        if (!polled && !isShutdown()) {
            // record so that tryPoll can unpark this thread when the remove completes
            cancels.put(fd, Thread.currentThread());

            synchronized (submitLock) {
                // TBD if SQPOLL enabled, need IORING_OP_POLL_ADD to be processed

                // fd was the user data for IORING_OP_POLL_ADD request
                // -fd is the user data for IORING_OP_POLL_REMOVE request
                submitPollRemove(fd, -fd);
                enter(1);
            }

            // wait (tolerating spurious unparks) until the remove completes or shutdown
            while (cancels.containsKey(fd) && !isShutdown()) {
                LockSupport.park();
            }
        }
    }

    /**
     * Uses IORING_OP_READ op to read bytes into a byte array.
     *
     * @param nanos a timeout in nanoseconds, or 0 to wait indefinitely
     * @return the number of bytes read, -1 at EOF, or IOStatus.UNAVAILABLE if
     *         the read did not complete (e.g. timeout) and was cancelled
     */
    @Override
    int implRead(int fd, byte[] b, int off, int len, long nanos, BooleanSupplier isOpen) throws IOException {
        // off-heap buffer for read op
        MemorySegment buf = takeMemorySegment();
        len = Math.min(len, (int) buf.byteSize());

        // use the buf address as the user_data
        long udata = buf.address();
        var op = new Op(Thread.currentThread());
        ops.put(udata, op);

        int res = 0;
        try {
            synchronized (submitLock) {
                submitRead(fd, buf, len, -1L, udata);
            }
            // park until completed (unparked by tryPoll), timed out, or interrupted
            if (isOpen.getAsBoolean() && !isShutdown()) {
                if (nanos > 0) {
                    LockSupport.parkNanos(nanos);
                } else {
                    LockSupport.park();
                }
            }
        } finally {
            Op previous = ops.remove(udata);
            assert previous == op;

            res = op.result();
            try {
                if (res > 0) {
                    // copy bytes into the byte array
                    assert res <= len;
                    MemorySegment dst = MemorySegment.ofArray(b);
                    MemorySegment.copy(buf, 0, dst, off, res);
                } else if (res < 0) {
                    // EOF or read failed (-1 is EOF, other negative values are -errno)
                    if (res != -1) {
                        throw new IOException("IORING_OP_READ failed errno=" + (-res));
                    }
                } else {
                    // TBD if SQPOLL enabled, need the request to be processed before cancel

                    // read did not complete, need to cancel. If the cancel fails then
                    // we can't return the buffer to the cache.
                    cancelOp(fd, udata);
                    res = IOStatus.UNAVAILABLE;
                }
            } finally {
                // res == 0 means the op may still be in flight with the kernel
                // owning the buffer, so it must not be returned to the cache
                if (res != 0) {
                    offerMemorySegment(buf);
                }
            }
        }
        return res;
    }

    /**
     * Uses IORING_OP_WRITE op to write bytes from a byte array.
     *
     * @return the number of bytes written, or IOStatus.UNAVAILABLE if the write
     *         did not complete and was cancelled
     */
    @Override
    int implWrite(int fd, byte[] b, int off, int len, BooleanSupplier isOpen) throws IOException {
        // off-heap buffer for write op
        MemorySegment buf = takeMemorySegment();
        len = Math.min(len, (int) buf.byteSize());

        // copy the bytes from the byte array into the buffer
        MemorySegment src = MemorySegment.ofArray(b);
        MemorySegment.copy(src, off, buf, 0, len);

        // use the buffer address as the user_data
        long udata = buf.address();
        var op = new Op(Thread.currentThread());
        ops.put(udata, op);

        int res = 0;
        try {
            synchronized (submitLock) {
                submitWrite(fd, buf, len, -1L, udata);
            }
            // park until completed (unparked by tryPoll) or interrupted
            if (isOpen.getAsBoolean() && !isShutdown()) {
                LockSupport.park();
            }
        } finally {
            Op previous = ops.remove(udata);
            assert previous == op;

            res = op.result();
            try {
                if (res > 0) {
                    assert res <= len;
                } else if (res < 0) {
                    // negative result is -errno
                    throw new IOException("IORING_OP_WRITE failed errno=" + (-res));
                } else {
                    // TBD if SQPOLL enabled, need the request to be processed before cancel

                    // write did not complete, need to cancel. If the cancel fails then
                    // we can't return the buffer to the cache.
                    cancelOp(fd, udata);
                    res = IOStatus.UNAVAILABLE;
                }
            } finally {
                // res == 0 means the kernel may still own the buffer, so it
                // must not be returned to the cache
                if (res != 0) {
                    offerMemorySegment(buf);
                }
            }
        }
        return res;
    }

    /**
     * Wakes up the subpoller by signalling its wakeup eventfd, which is being
     * polled via an IORING_OP_POLL_ADD submitted at construction.
     *
     * @throws UnsupportedOperationException if there is no wakeup event
     */
    @Override
    void wakeupPoller() throws IOException {
        if (wakeupEvent == null) {
            throw new UnsupportedOperationException();
        }

        // causes subpoller to wakeup
        wakeupEvent.set();
    }

    /**
     * Polls for completion events.
     *
     * @param timeout -1 to block until at least one completion is posted,
     *        0 for a non-blocking subpoll; timed polls are not supported
     * @return the number of sockets polled and I/O operations completed
     */
    @Override
    int poll(int timeout) throws IOException {
        if (timeout > 0) {
            // timed polls not supported by this Poller
            throw new UnsupportedOperationException();
        }
        boolean block = (timeout == -1);
        int max = block ? MAX_EVENTS_PER_POLL : MAX_EVENTS_PER_SUBPOLL;
        int polled = tryPoll(max);
        if (polled > 0 || !block) {
            return polled;
        } else {
            int ret = ring.enter(0, 1, 0); // wait for at least one completion
            if (ret < 0) {
                throw new IOException("io_uring_enter failed, " +
                        IOUring.getError(ret));
            }
            return tryPoll(max);
        }
    }

    /**
     * Poll or handle completions up to the given max without blocking. This method also
     * handles the completion of any cancelled operations.
     * @return the number of sockets polled and I/O operations completed
     */
    private int tryPoll(int max) {
        int polled = 0;
        Cqe cqe;
        while (polled < max && ((cqe = ring.pollCompletion()) != null)) {
            long udata = cqe.user_data();

            // handle read/write ops (user data is the buffer address)
            if (ops != null) {
                Op op = ops.get(udata);
                if (op != null) {
                    int res = cqe.res();
                    op.setResult((res != 0) ? res : -1); // map 0 to -1 at EOF
                    LockSupport.unpark(op.thread());
                    polled++;
                    continue;
                }
            }

            // handle poll and cancel ops, user data is fd or -fd
            int fd = (int) udata;
            if (fd > 0 && (wakeupEvent == null || fd != wakeupEvent.efd())) {
                // poll done (wakeup-event completions are deliberately ignored)
                polled(fd);
                polled++;
            } else if (fd < 0) {
                // cancel done, unpark the thread waiting in implStopPoll/cancelOp
                Thread t = cancels.remove(-fd);
                if (t != null) {
                    LockSupport.unpark(t);
                }
            }
        }
        return polled;
    }

    /**
     * Invoke io_uring_enter to submit the SQE entries.
     * With SQPOLL enabled the kernel thread consumes the SQ, so a polling
     * variant of enter is used instead of submitting n entries directly.
     */
    private static void enter(IOUring ring, int n) throws IOException {
        if (SQPOLL_IDLE_TIME > 0) {
            ring.pollingEnter();
        } else {
            int ret = ring.enter(n, 0, 0);
            if (ret < 0) {
                throw new IOException("io_uring_enter failed, " +
                        IOUring.getError(ret));
            }
            assert ret == n;
        }
    }

    // convenience overload using this poller's ring
    private void enter(int n) throws IOException {
        enter(ring, n);
    }

    /**
     * Submit IORING_OP_POLL_ADD operation.
     */
    private static void submitPollAdd(IOUring ring,
                                      int fd,
                                      int events,
                                      long udata) throws IOException {
        Sqe sqe = new Sqe()
                .opcode(IORING_OP_POLL_ADD())
                .fd(fd)
                .user_data(udata)
                .poll_events(events);
        ring.submit(sqe);
    }

    // convenience overload using this poller's ring
    private void submitPollAdd(int fd, int events, long udata) throws IOException {
        submitPollAdd(ring, fd, events, udata);
    }

    /**
     * Submit IORING_OP_POLL_REMOVE operation.
     * @param req_udata the user data to identify the original POLL_ADD
     * @param udata the user data for the POLL_REMOVE op
     */
    private void submitPollRemove(long req_udata, long udata) throws IOException {
        @SuppressWarnings("restricted")
        MemorySegment address = MemorySegment.ofAddress(req_udata).reinterpret(ADDRESS_SIZE);
        Sqe sqe = new Sqe()
                .opcode(IORING_OP_POLL_REMOVE())
                .addr(address)
                .user_data(udata);
        ring.submit(sqe);
    }

    /**
     * Submit IORING_OP_READ operation.
     */
    private void submitRead(int fd, MemorySegment buf, int len, long pos, long udata) throws IOException {
        Sqe sqe = new Sqe()
                .opcode(IORING_OP_READ())
                .fd(fd)
                .addr(buf)
                .len(len)
                .off(pos) // file position or -1L
                .user_data(udata);
        ring.submit(sqe);
        enter(1);
    }

    /**
     * Submit IORING_OP_WRITE operation.
     */
    private void submitWrite(int fd, MemorySegment buf, int len, long pos, long udata) throws IOException {
        Sqe sqe = new Sqe()
                .opcode(IORING_OP_WRITE())
                .fd(fd)
                .addr(buf)
                .len(len)
                .off(pos) // file position or -1L
                .user_data(udata);
        ring.submit(sqe);
        enter(1);
    }

    /**
     * Cancels an operation submitted with the given user_data. Submits an
     * IORING_OP_ASYNC_CANCEL op and waits for its completion (or shutdown).
     */
    private void cancelOp(int fd, long req_udata) throws IOException {
        @SuppressWarnings("restricted")
        MemorySegment address = MemorySegment.ofAddress(req_udata).reinterpret(ADDRESS_SIZE);
        // record so that tryPoll can unpark this thread when the cancel completes
        cancels.put(fd, Thread.currentThread());
        synchronized (submitLock) {
            Sqe sqe = new Sqe()
                    .opcode(IORING_OP_ASYNC_CANCEL())
                    .addr(address)
                    .user_data(-fd); // user data for IORING_OP_ASYNC_CANCEL
            ring.submit(sqe);
            enter(1);
        }
        // wait (tolerating spurious unparks) until the cancel completes or shutdown
        while (cancels.containsKey(fd) && !isShutdown()) {
            LockSupport.park();
        }
    }

    /**
     * Takes a memory segment from the cache, allocating a new MAX_BUF_SIZE
     * segment if the cache is empty.
     */
    private MemorySegment takeMemorySegment() {
        MemorySegment buf = memorySegmentCache.poll();
        if (buf == null) {
            buf = ARENA.allocate(MAX_BUF_SIZE);
        }
        return buf;
    }

    /**
     * Returns a memory segment to the cache for reuse.
     */
    private void offerMemorySegment(MemorySegment buf) {
        memorySegmentCache.offer(buf);
    }
}