1 /*
2 * Copyright (c) 2019, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.FileDescriptor;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.InterruptedIOException;
32 import java.io.OutputStream;
33 import java.io.UncheckedIOException;
34 import java.lang.ref.Cleaner.Cleanable;
35 import java.net.InetAddress;
36 import java.net.InetSocketAddress;
37 import java.net.ProtocolFamily;
38 import java.net.SocketAddress;
39 import java.net.SocketException;
40 import java.net.SocketImpl;
41 import java.net.SocketOption;
42 import java.net.SocketTimeoutException;
43 import java.net.StandardProtocolFamily;
44 import java.net.StandardSocketOptions;
45 import java.net.UnknownHostException;
46 import java.nio.ByteBuffer;
47 import java.util.Collections;
48 import java.util.HashSet;
49 import java.util.Objects;
50 import java.util.Set;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.locks.ReentrantLock;
53
54 import jdk.internal.access.JavaIOFileDescriptorAccess;
55 import jdk.internal.access.SharedSecrets;
56 import jdk.internal.ref.CleanerFactory;
57 import sun.net.ConnectionResetException;
58 import sun.net.PlatformSocketImpl;
59 import sun.net.ext.ExtendedSocketOptions;
60 import jdk.internal.util.Exceptions;
61
62 import static java.util.concurrent.TimeUnit.MILLISECONDS;
63 import static java.util.concurrent.TimeUnit.NANOSECONDS;
64 import static jdk.internal.util.Exceptions.filterNonSocketInfo;
65 import static jdk.internal.util.Exceptions.formatMsg;
66
67 /**
68 * NIO based SocketImpl.
69 *
70 * The underlying socket used by this SocketImpl is initially configured blocking.
71 * If a connect, accept or read is attempted with a timeout, or a virtual
72 * thread invokes a blocking operation, then the socket is changed to non-blocking
73 * When in non-blocking mode, operations that don't complete immediately will
74 * poll the socket (or park when invoked on a virtual thread) and preserve
75 * the semantics of blocking operations.
76 */
77
78 public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl {
79 private static final NativeDispatcher nd = new SocketDispatcher();
80
81 // The maximum number of bytes to read/write per syscall to avoid needing
82 // a huge buffer from the temporary buffer cache
83 private static final int MAX_BUFFER_SIZE = 128 * 1024;
84
85 // true if this is a SocketImpl for a ServerSocket
86 private final boolean server;
87
88 // Lock held when reading (also used when accepting or connecting)
89 private final ReentrantLock readLock = new ReentrantLock();
90
91 // Lock held when writing
92 private final ReentrantLock writeLock = new ReentrantLock();
93
94 // The stateLock for read/changing state
95 private final Object stateLock = new Object();
96 private static final int ST_NEW = 0;
97 private static final int ST_UNCONNECTED = 1;
98 private static final int ST_CONNECTING = 2;
99 private static final int ST_CONNECTED = 3;
100 private static final int ST_CLOSING = 4;
101 private static final int ST_CLOSED = 5;
102 private volatile int state; // need stateLock to change
103
104 private Cleanable cleaner;
105
106 // set to true when the socket is in non-blocking mode
107 private volatile boolean nonBlocking;
108
109 // used by connect/read/write/accept, protected by stateLock
110 private long readerThread;
111 private long writerThread;
112
113 // used when SO_REUSEADDR is emulated, protected by stateLock
114 private boolean isReuseAddress;
115
116 // read or accept timeout in millis
117 private volatile int timeout;
118
119 // flags to indicate if the connection is shutdown for input and output
120 private volatile boolean isInputClosed;
121 private volatile boolean isOutputClosed;
122
123 // used by read to emulate legacy behavior, protected by readLock
124 private boolean readEOF;
125 private boolean connectionReset;
126
127 /**
128 * Creates an instance of this SocketImpl.
129 * @param server true if this is a SocketImpl for a ServerSocket
130 */
131 public NioSocketImpl(boolean server) {
132 this.server = server;
133 }
134
135 /**
136 * Returns true if the socket is open.
137 */
138 private boolean isOpen() {
139 return state < ST_CLOSING;
140 }
141
142 /**
143 * Throws SocketException if the socket is not open.
144 */
145 private void ensureOpen() throws SocketException {
146 int state = this.state;
147 if (state == ST_NEW)
148 throw new SocketException("Socket not created");
149 if (state >= ST_CLOSING)
150 throw new SocketException("Socket closed");
151 }
152
153 /**
154 * Throws SocketException if the socket is not open and connected.
155 */
156 private void ensureOpenAndConnected() throws SocketException {
157 int state = this.state;
158 if (state < ST_CONNECTED)
159 throw new SocketException("Not connected");
160 if (state > ST_CONNECTED)
161 throw new SocketException("Socket closed");
162 }
163
164 /**
165 * Disables the current thread for scheduling purposes until the
166 * socket is ready for I/O or is asynchronously closed, for up to the
167 * specified waiting time.
168 * @throws IOException if an I/O error occurs
169 */
170 private void park(FileDescriptor fd, int event, long nanos) throws IOException {
171 Thread t = Thread.currentThread();
172 if (t.isVirtual()) {
173 Poller.poll(fdVal(fd), event, nanos, this::isOpen);
174 if (t.isInterrupted()) {
175 throw new InterruptedIOException();
176 }
177 } else {
178 long millis;
179 if (nanos == 0) {
180 millis = -1;
181 } else {
182 millis = NANOSECONDS.toMillis(nanos);
183 if (nanos > MILLISECONDS.toNanos(millis)) {
184 // Round up any excess nanos to the nearest millisecond to
185 // avoid parking for less than requested.
186 millis++;
187 }
188 }
189 Net.poll(fd, event, millis);
190 }
191 }
192
193 /**
194 * Disables the current thread for scheduling purposes until the socket is
195 * ready for I/O or is asynchronously closed.
196 * @throws IOException if an I/O error occurs
197 */
198 private void park(FileDescriptor fd, int event) throws IOException {
199 park(fd, event, 0);
200 }
201
202 /**
203 * Ensures that the socket is configured non-blocking invoked on a virtual
204 * thread or the operation has a timeout
205 * @throws IOException if there is an I/O error changing the blocking mode
206 */
207 private void configureNonBlockingIfNeeded(FileDescriptor fd, boolean timed)
208 throws IOException
209 {
210 if (!nonBlocking
211 && (timed || Thread.currentThread().isVirtual())) {
212 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
213 IOUtil.configureBlocking(fd, false);
214 nonBlocking = true;
215 }
216 }
217
218 /**
219 * Marks the beginning of a read operation that might block.
220 * @throws SocketException if the socket is closed or not connected
221 */
222 private FileDescriptor beginRead() throws SocketException {
223 synchronized (stateLock) {
224 ensureOpenAndConnected();
225 readerThread = NativeThread.current();
226 return fd;
227 }
228 }
229
230 /**
231 * Marks the end of a read operation that may have blocked.
232 * @throws SocketException is the socket is closed
233 */
234 private void endRead(boolean completed) throws SocketException {
235 synchronized (stateLock) {
236 readerThread = 0;
237 int state = this.state;
238 if (state == ST_CLOSING)
239 tryFinishClose();
240 if (!completed && state >= ST_CLOSING)
241 throw new SocketException("Socket closed");
242 }
243 }
244
245 /**
246 * Attempts to read bytes from the socket into the given byte array.
247 */
248 private int tryRead(FileDescriptor fd, byte[] b, int off, int len)
249 throws IOException
250 {
251 ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
252 assert dst.position() == 0;
253 try {
254 int n = nd.read(fd, ((DirectBuffer)dst).address(), len);
255 if (n > 0) {
256 dst.get(b, off, n);
257 }
258 return n;
259 } finally {
260 Util.offerFirstTemporaryDirectBuffer(dst);
261 }
262 }
263
264 /**
265 * Reads bytes from the socket into the given byte array with a timeout.
266 * @throws SocketTimeoutException if the read timeout elapses
267 */
268 private int timedRead(FileDescriptor fd, byte[] b, int off, int len, long nanos)
269 throws IOException
270 {
271 long startNanos = System.nanoTime();
272 int n = tryRead(fd, b, off, len);
273 while (n == IOStatus.UNAVAILABLE && isOpen()) {
274 long remainingNanos = nanos - (System.nanoTime() - startNanos);
275 if (remainingNanos <= 0) {
276 throw new SocketTimeoutException("Read timed out");
277 }
278 park(fd, Net.POLLIN, remainingNanos);
279 n = tryRead(fd, b, off, len);
280 }
281 return n;
282 }
283
284 /**
285 * Reads bytes from the socket into the given byte array.
286 * @return the number of bytes read or -1 at EOF
287 * @throws SocketException if the socket is closed or a socket I/O error occurs
288 * @throws SocketTimeoutException if the read timeout elapses
289 */
290 private int implRead(byte[] b, int off, int len, long remainingNanos) throws IOException {
291 int n = 0;
292 FileDescriptor fd = beginRead();
293 try {
294 if (connectionReset)
295 throw new SocketException("Connection reset");
296 if (isInputClosed)
297 return -1;
298 configureNonBlockingIfNeeded(fd, remainingNanos > 0);
299 if (remainingNanos > 0) {
300 // read with timeout
301 n = timedRead(fd, b, off, len, remainingNanos);
302 } else {
303 // read, no timeout
304 n = tryRead(fd, b, off, len);
305 while (IOStatus.okayToRetry(n) && isOpen()) {
306 park(fd, Net.POLLIN);
307 n = tryRead(fd, b, off, len);
308 }
309 }
310 return n;
311 } catch (InterruptedIOException e) {
312 throw e;
313 } catch (ConnectionResetException e) {
314 connectionReset = true;
315 throw new SocketException("Connection reset");
316 } catch (IOException ioe) {
317 // throw SocketException to maintain compatibility
318 throw asSocketException(ioe);
319 } finally {
320 endRead(n > 0);
321 }
322 }
323
324 /**
325 * Reads bytes from the socket into the given byte array.
326 * @return the number of bytes read or -1 at EOF
327 * @throws IndexOutOfBoundsException if the bound checks fail
328 * @throws SocketException if the socket is closed or a socket I/O error occurs
329 * @throws SocketTimeoutException if the read timeout elapses
330 */
331 private int read(byte[] b, int off, int len) throws IOException {
332 Objects.checkFromIndexSize(off, len, b.length);
333 if (len == 0) {
334 return 0;
335 } else {
336 long remainingNanos = 0;
337 int timeout = this.timeout;
338 if (timeout > 0) {
339 remainingNanos = tryLock(readLock, timeout, MILLISECONDS);
340 if (remainingNanos <= 0) {
341 assert !readLock.isHeldByCurrentThread();
342 throw new SocketTimeoutException("Read timed out");
343 }
344 } else {
345 readLock.lock();
346 }
347 try {
348 // emulate legacy behavior to return -1, even if socket is closed
349 if (readEOF)
350 return -1;
351 // read up to MAX_BUFFER_SIZE bytes
352 int size = Math.min(len, MAX_BUFFER_SIZE);
353 int n = implRead(b, off, size, remainingNanos);
354 if (n == -1)
355 readEOF = true;
356 return n;
357 } finally {
358 readLock.unlock();
359 }
360 }
361 }
362
363 /**
364 * Marks the beginning of a write operation that might block.
365 * @throws SocketException if the socket is closed or not connected
366 */
367 private FileDescriptor beginWrite() throws SocketException {
368 synchronized (stateLock) {
369 ensureOpenAndConnected();
370 writerThread = NativeThread.current();
371 return fd;
372 }
373 }
374
375 /**
376 * Marks the end of a write operation that may have blocked.
377 * @throws SocketException is the socket is closed
378 */
379 private void endWrite(boolean completed) throws SocketException {
380 synchronized (stateLock) {
381 writerThread = 0;
382 int state = this.state;
383 if (state == ST_CLOSING)
384 tryFinishClose();
385 if (!completed && state >= ST_CLOSING)
386 throw new SocketException("Socket closed");
387 }
388 }
389
390 /**
391 * Attempts to write a sequence of bytes to the socket from the given
392 * byte array.
393 */
394 private int tryWrite(FileDescriptor fd, byte[] b, int off, int len)
395 throws IOException
396 {
397 ByteBuffer src = Util.getTemporaryDirectBuffer(len);
398 assert src.position() == 0;
399 try {
400 src.put(b, off, len);
401 return nd.write(fd, ((DirectBuffer)src).address(), len);
402 } finally {
403 Util.offerFirstTemporaryDirectBuffer(src);
404 }
405 }
406
407 /**
408 * Writes a sequence of bytes to the socket from the given byte array.
409 * @return the number of bytes written
410 * @throws SocketException if the socket is closed or a socket I/O error occurs
411 */
412 private int implWrite(byte[] b, int off, int len) throws IOException {
413 int n = 0;
414 FileDescriptor fd = beginWrite();
415 try {
416 configureNonBlockingIfNeeded(fd, false);
417 n = tryWrite(fd, b, off, len);
418 while (IOStatus.okayToRetry(n) && isOpen()) {
419 park(fd, Net.POLLOUT);
420 n = tryWrite(fd, b, off, len);
421 }
422 return n;
423 } catch (InterruptedIOException e) {
424 throw e;
425 } catch (IOException ioe) {
426 // throw SocketException to maintain compatibility
427 throw asSocketException(ioe);
428 } finally {
429 endWrite(n > 0);
430 }
431 }
432
433 /**
434 * Writes a sequence of bytes to the socket from the given byte array.
435 * @throws SocketException if the socket is closed or a socket I/O error occurs
436 */
437 private void write(byte[] b, int off, int len) throws IOException {
438 Objects.checkFromIndexSize(off, len, b.length);
439 if (len > 0) {
440 writeLock.lock();
441 try {
442 int pos = off;
443 int end = off + len;
444 while (pos < end) {
445 // write up to MAX_BUFFER_SIZE bytes
446 int size = Math.min((end - pos), MAX_BUFFER_SIZE);
447 int n = implWrite(b, pos, size);
448 pos += n;
449 }
450 } finally {
451 writeLock.unlock();
452 }
453 }
454 }
455
456 /**
457 * Creates the socket.
458 * @param stream {@code true} for a streams socket
459 */
460 @Override
461 protected void create(boolean stream) throws IOException {
462 if (!stream) {
463 throw new IOException("Datagram socket creation not supported");
464 }
465 synchronized (stateLock) {
466 if (state != ST_NEW)
467 throw new IOException("Already created");
468 FileDescriptor fd;
469 if (server) {
470 fd = Net.serverSocket();
471 } else {
472 fd = Net.socket();
473 }
474 Runnable closer = closerFor(fd);
475 this.fd = fd;
476 this.cleaner = CleanerFactory.cleaner().register(this, closer);
477 this.state = ST_UNCONNECTED;
478 }
479 }
480
481 /**
482 * Marks the beginning of a connect operation that might block.
483 * @throws SocketException if the socket is closed or already connected
484 */
485 private FileDescriptor beginConnect(InetAddress address, int port)
486 throws IOException
487 {
488 synchronized (stateLock) {
489 int state = this.state;
490 if (state != ST_UNCONNECTED) {
491 if (state == ST_NEW)
492 throw new SocketException("Not created");
493 if (state == ST_CONNECTING)
494 throw new SocketException("Connection in progress");
495 if (state == ST_CONNECTED)
496 throw new SocketException("Already connected");
497 if (state >= ST_CLOSING)
498 throw new SocketException("Socket closed");
499 assert false;
500 }
501 this.state = ST_CONNECTING;
502
503 // save the remote address/port
504 this.address = address;
505 this.port = port;
506
507 readerThread = NativeThread.current();
508 return fd;
509 }
510 }
511
512 /**
513 * Marks the end of a connect operation that may have blocked.
514 * @throws SocketException is the socket is closed
515 */
516 private void endConnect(FileDescriptor fd, boolean completed) throws IOException {
517 synchronized (stateLock) {
518 readerThread = 0;
519 int state = this.state;
520 if (state == ST_CLOSING)
521 tryFinishClose();
522 if (completed && state == ST_CONNECTING) {
523 this.state = ST_CONNECTED;
524 localport = Net.localAddress(fd).getPort();
525 } else if (!completed && state >= ST_CLOSING) {
526 throw new SocketException("Socket closed");
527 }
528 }
529 }
530
531 /**
532 * Waits for a connection attempt to finish with a timeout
533 * @throws SocketTimeoutException if the connect timeout elapses
534 */
535 private boolean timedFinishConnect(FileDescriptor fd, long nanos) throws IOException {
536 long startNanos = System.nanoTime();
537 boolean polled = Net.pollConnectNow(fd);
538 while (!polled && isOpen()) {
539 long remainingNanos = nanos - (System.nanoTime() - startNanos);
540 if (remainingNanos <= 0) {
541 throw new SocketTimeoutException("Connect timed out");
542 }
543 park(fd, Net.POLLOUT, remainingNanos);
544 polled = Net.pollConnectNow(fd);
545 }
546 return polled && isOpen();
547 }
548
549 /**
550 * Attempts to establish a connection to the given socket address with a
551 * timeout. Closes the socket if connection cannot be established.
552 * @throws IOException if the address is not a resolved InetSocketAddress or
553 * the connection cannot be established
554 */
555 @Override
556 protected void connect(SocketAddress remote, int millis) throws IOException {
557 // SocketImpl connect only specifies IOException
558 if (!(remote instanceof InetSocketAddress))
559 throw new IOException("Unsupported address type");
560 InetSocketAddress isa = (InetSocketAddress) remote;
561 if (isa.isUnresolved()) {
562 throw new UnknownHostException(
563 formatMsg(filterNonSocketInfo(isa.getHostName())));
564 }
565
566 InetAddress address = isa.getAddress();
567 if (address.isAnyLocalAddress())
568 address = InetAddress.getLocalHost();
569 int port = isa.getPort();
570
571 ReentrantLock connectLock = readLock;
572 try {
573 connectLock.lock();
574 try {
575 boolean connected = false;
576 FileDescriptor fd = beginConnect(address, port);
577 try {
578 configureNonBlockingIfNeeded(fd, millis > 0);
579 int n = Net.connect(fd, address, port);
580 if (n > 0) {
581 // connection established
582 connected = true;
583 } else {
584 assert IOStatus.okayToRetry(n);
585 if (millis > 0) {
586 // finish connect with timeout
587 long nanos = MILLISECONDS.toNanos(millis);
588 connected = timedFinishConnect(fd, nanos);
589 } else {
590 // finish connect, no timeout
591 boolean polled = false;
592 while (!polled && isOpen()) {
593 park(fd, Net.POLLOUT);
594 polled = Net.pollConnectNow(fd);
595 }
596 connected = polled && isOpen();
597 }
598 }
599 } finally {
600 endConnect(fd, connected);
601 }
602 } finally {
603 connectLock.unlock();
604 }
605 } catch (IOException ioe) {
606 close();
607 if (ioe instanceof SocketTimeoutException) {
608 throw ioe;
609 } else if (ioe instanceof InterruptedIOException) {
610 assert Thread.currentThread().isVirtual();
611 throw new SocketException("Closed by interrupt");
612 } else {
613 throw Exceptions.ioException(ioe, isa);
614 }
615 }
616 }
617
618 @Override
619 protected void connect(String host, int port) throws IOException {
620 connect(new InetSocketAddress(host, port), 0);
621 }
622
623 @Override
624 protected void connect(InetAddress address, int port) throws IOException {
625 connect(new InetSocketAddress(address, port), 0);
626 }
627
628 @Override
629 protected void bind(InetAddress host, int port) throws IOException {
630 synchronized (stateLock) {
631 ensureOpen();
632 if (localport != 0)
633 throw new SocketException("Already bound");
634 Net.bind(fd, host, port);
635 // set the address field to the given host address to
636 // maintain long standing behavior. When binding to 0.0.0.0
637 // then the actual local address will be ::0 when IPv6 is enabled.
638 address = host;
639 localport = Net.localAddress(fd).getPort();
640 }
641 }
642
643 @Override
644 protected void listen(int backlog) throws IOException {
645 synchronized (stateLock) {
646 ensureOpen();
647 if (localport == 0)
648 throw new SocketException("Not bound");
649 Net.listen(fd, backlog < 1 ? 50 : backlog);
650 }
651 }
652
653 /**
654 * Marks the beginning of an accept operation that might block.
655 * @throws SocketException if the socket is closed
656 */
657 private FileDescriptor beginAccept() throws SocketException {
658 synchronized (stateLock) {
659 ensureOpen();
660 if (localport == 0)
661 throw new SocketException("Not bound");
662 readerThread = NativeThread.current();
663 return fd;
664 }
665 }
666
667 /**
668 * Marks the end of an accept operation that may have blocked.
669 * @throws SocketException is the socket is closed
670 */
671 private void endAccept(boolean completed) throws SocketException {
672 synchronized (stateLock) {
673 int state = this.state;
674 readerThread = 0;
675 if (state == ST_CLOSING)
676 tryFinishClose();
677 if (!completed && state >= ST_CLOSING)
678 throw new SocketException("Socket closed");
679 }
680 }
681
682 /**
683 * Accepts a new connection with a timeout.
684 * @throws SocketTimeoutException if the accept timeout elapses
685 */
686 private int timedAccept(FileDescriptor fd,
687 FileDescriptor newfd,
688 InetSocketAddress[] isaa,
689 long nanos)
690 throws IOException
691 {
692 long startNanos = System.nanoTime();
693 int n = Net.accept(fd, newfd, isaa);
694 while (n == IOStatus.UNAVAILABLE && isOpen()) {
695 long remainingNanos = nanos - (System.nanoTime() - startNanos);
696 if (remainingNanos <= 0) {
697 throw new SocketTimeoutException("Accept timed out");
698 }
699 park(fd, Net.POLLIN, remainingNanos);
700 n = Net.accept(fd, newfd, isaa);
701 }
702 return n;
703 }
704
705 /**
706 * Accepts a new connection so that the given SocketImpl is connected to
707 * the peer. The SocketImpl must be a newly created NioSocketImpl.
708 */
709 @Override
710 protected void accept(SocketImpl si) throws IOException {
711 NioSocketImpl nsi = (NioSocketImpl) si;
712 if (nsi.state != ST_NEW)
713 throw new SocketException("Not a newly created SocketImpl");
714
715 FileDescriptor newfd = new FileDescriptor();
716 InetSocketAddress[] isaa = new InetSocketAddress[1];
717
718 // acquire the lock, adjusting the timeout for cases where several
719 // threads are accepting connections and there is a timeout set
720 ReentrantLock acceptLock = readLock;
721 int timeout = this.timeout;
722 long remainingNanos = 0;
723 if (timeout > 0) {
724 remainingNanos = tryLock(acceptLock, timeout, MILLISECONDS);
725 if (remainingNanos <= 0) {
726 assert !acceptLock.isHeldByCurrentThread();
727 throw new SocketTimeoutException("Accept timed out");
728 }
729 } else {
730 acceptLock.lock();
731 }
732
733 // accept a connection
734 try {
735 int n = 0;
736 FileDescriptor fd = beginAccept();
737 try {
738 configureNonBlockingIfNeeded(fd, remainingNanos > 0);
739 if (remainingNanos > 0) {
740 // accept with timeout
741 n = timedAccept(fd, newfd, isaa, remainingNanos);
742 } else {
743 // accept, no timeout
744 n = Net.accept(fd, newfd, isaa);
745 while (IOStatus.okayToRetry(n) && isOpen()) {
746 park(fd, Net.POLLIN);
747 n = Net.accept(fd, newfd, isaa);
748 }
749 }
750 } finally {
751 endAccept(n > 0);
752 assert IOStatus.check(n);
753 }
754 } finally {
755 acceptLock.unlock();
756 }
757
758 // get local address and configure accepted socket to blocking mode
759 InetSocketAddress localAddress;
760 try {
761 localAddress = Net.localAddress(newfd);
762 IOUtil.configureBlocking(newfd, true);
763 } catch (IOException ioe) {
764 nd.close(newfd);
765 throw ioe;
766 }
767
768 // set the fields
769 Runnable closer = closerFor(newfd);
770 synchronized (nsi.stateLock) {
771 nsi.fd = newfd;
772 nsi.cleaner = CleanerFactory.cleaner().register(nsi, closer);
773 nsi.localport = localAddress.getPort();
774 nsi.address = isaa[0].getAddress();
775 nsi.port = isaa[0].getPort();
776 nsi.state = ST_CONNECTED;
777 }
778 }
779
780 @Override
781 protected InputStream getInputStream() {
782 return new InputStream() {
783 @Override
784 public int read() throws IOException {
785 byte[] a = new byte[1];
786 int n = read(a, 0, 1);
787 return (n > 0) ? (a[0] & 0xff) : -1;
788 }
789 @Override
790 public int read(byte[] b, int off, int len) throws IOException {
791 return NioSocketImpl.this.read(b, off, len);
792 }
793 @Override
794 public int available() throws IOException {
795 return NioSocketImpl.this.available();
796 }
797 @Override
798 public void close() throws IOException {
799 NioSocketImpl.this.close();
800 }
801 };
802 }
803
804 @Override
805 protected OutputStream getOutputStream() {
806 return new OutputStream() {
807 @Override
808 public void write(int b) throws IOException {
809 byte[] a = new byte[]{(byte) b};
810 write(a, 0, 1);
811 }
812 @Override
813 public void write(byte[] b, int off, int len) throws IOException {
814 NioSocketImpl.this.write(b, off, len);
815 }
816 @Override
817 public void close() throws IOException {
818 NioSocketImpl.this.close();
819 }
820 };
821 }
822
823 @Override
824 protected int available() throws IOException {
825 synchronized (stateLock) {
826 ensureOpenAndConnected();
827 if (isInputClosed) {
828 return 0;
829 } else {
830 return Net.available(fd);
831 }
832 }
833 }
834
835 /**
836 * Closes the socket if there are no I/O operations in progress.
837 */
838 private boolean tryClose() throws IOException {
839 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
840 if (readerThread == 0 && writerThread == 0) {
841 try {
842 cleaner.clean();
843 } catch (UncheckedIOException ioe) {
844 throw ioe.getCause();
845 } finally {
846 state = ST_CLOSED;
847 }
848 return true;
849 } else {
850 return false;
851 }
852 }
853
854 /**
855 * Invokes tryClose to attempt to close the socket.
856 *
857 * This method is used for deferred closing by I/O operations.
858 */
859 private void tryFinishClose() {
860 try {
861 tryClose();
862 } catch (IOException ignore) { }
863 }
864
865 /**
866 * Closes the socket. If there are I/O operations in progress then the
867 * socket is pre-closed and the threads are signalled. The socket will be
868 * closed when the last I/O operation aborts.
869 */
870 @Override
871 protected void close() throws IOException {
872 synchronized (stateLock) {
873 int state = this.state;
874 if (state >= ST_CLOSING)
875 return;
876 if (state == ST_NEW) {
877 // stillborn
878 this.state = ST_CLOSED;
879 return;
880 }
881 boolean connected = (state == ST_CONNECTED);
882 this.state = ST_CLOSING;
883
884 // shutdown output when linger interval not set to 0
885 if (connected) {
886 try {
887 var SO_LINGER = StandardSocketOptions.SO_LINGER;
888 if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
889 Net.shutdown(fd, Net.SHUT_WR);
890 }
891 } catch (IOException ignore) { }
892 }
893
894 // attempt to close the socket. If there are I/O operations in progress
895 // then the socket is pre-closed and the thread(s) signalled. The
896 // last thread will close the file descriptor.
897 if (!tryClose()) {
898 nd.preClose(fd, readerThread, writerThread);
899 }
900 }
901 }
902
903 // the socket options supported by client and server sockets
904 private static volatile Set<SocketOption<?>> clientSocketOptions;
905 private static volatile Set<SocketOption<?>> serverSocketOptions;
906
907 @Override
908 protected Set<SocketOption<?>> supportedOptions() {
909 Set<SocketOption<?>> options = (server) ? serverSocketOptions : clientSocketOptions;
910 if (options == null) {
911 options = new HashSet<>();
912 options.add(StandardSocketOptions.SO_RCVBUF);
913 options.add(StandardSocketOptions.SO_REUSEADDR);
914 if (server) {
915 // IP_TOS added for server socket to maintain compatibility
916 options.add(StandardSocketOptions.IP_TOS);
917 options.addAll(ExtendedSocketOptions.serverSocketOptions());
918 } else {
919 options.add(StandardSocketOptions.IP_TOS);
920 options.add(StandardSocketOptions.SO_KEEPALIVE);
921 options.add(StandardSocketOptions.SO_SNDBUF);
922 options.add(StandardSocketOptions.SO_LINGER);
923 options.add(StandardSocketOptions.TCP_NODELAY);
924 options.addAll(ExtendedSocketOptions.clientSocketOptions());
925 }
926 if (Net.isReusePortAvailable())
927 options.add(StandardSocketOptions.SO_REUSEPORT);
928 options = Collections.unmodifiableSet(options);
929 if (server) {
930 serverSocketOptions = options;
931 } else {
932 clientSocketOptions = options;
933 }
934 }
935 return options;
936 }
937
938 @Override
939 protected <T> void setOption(SocketOption<T> opt, T value) throws IOException {
940 if (!supportedOptions().contains(opt))
941 throw new UnsupportedOperationException("'" + opt + "' not supported");
942 if (!opt.type().isInstance(value))
943 throw new IllegalArgumentException("Invalid value '" + value + "'");
944 synchronized (stateLock) {
945 ensureOpen();
946 if (opt == StandardSocketOptions.IP_TOS) {
947 // maps to IPV6_TCLASS and/or IP_TOS
948 Net.setIpSocketOption(fd, family(), opt, value);
949 } else if (opt == StandardSocketOptions.SO_REUSEADDR) {
950 boolean b = (boolean) value;
951 if (Net.useExclusiveBind()) {
952 isReuseAddress = b;
953 } else {
954 Net.setSocketOption(fd, opt, b);
955 }
956 } else {
957 // option does not need special handling
958 Net.setSocketOption(fd, opt, value);
959 }
960 }
961 }
962
963 @SuppressWarnings("unchecked")
964 protected <T> T getOption(SocketOption<T> opt) throws IOException {
965 if (!supportedOptions().contains(opt))
966 throw new UnsupportedOperationException("'" + opt + "' not supported");
967 synchronized (stateLock) {
968 ensureOpen();
969 if (opt == StandardSocketOptions.IP_TOS) {
970 return (T) Net.getSocketOption(fd, family(), opt);
971 } else if (opt == StandardSocketOptions.SO_REUSEADDR) {
972 if (Net.useExclusiveBind()) {
973 return (T) Boolean.valueOf(isReuseAddress);
974 } else {
975 return (T) Net.getSocketOption(fd, opt);
976 }
977 } else {
978 // option does not need special handling
979 return (T) Net.getSocketOption(fd, opt);
980 }
981 }
982 }
983
984 private boolean booleanValue(Object value, String desc) throws SocketException {
985 if (!(value instanceof Boolean))
986 throw new SocketException("Bad value for " + desc);
987 return (boolean) value;
988 }
989
990 private int intValue(Object value, String desc) throws SocketException {
991 if (!(value instanceof Integer))
992 throw new SocketException("Bad value for " + desc);
993 return (int) value;
994 }
995
996 @Override
997 public void setOption(int opt, Object value) throws SocketException {
998 synchronized (stateLock) {
999 ensureOpen();
1000 try {
1001 switch (opt) {
1002 case SO_LINGER: {
1003 // the value is "false" to disable, or linger interval to enable
1004 int i;
1005 if (value instanceof Boolean && ((boolean) value) == false) {
1006 i = -1;
1007 } else {
1008 i = intValue(value, "SO_LINGER");
1009 }
1010 Net.setSocketOption(fd, StandardSocketOptions.SO_LINGER, i);
1011 break;
1012 }
1013 case SO_TIMEOUT: {
1014 int i = intValue(value, "SO_TIMEOUT");
1015 if (i < 0)
1016 throw new IllegalArgumentException("timeout < 0");
1017 timeout = i;
1018 break;
1019 }
1020 case IP_TOS: {
1021 int i = intValue(value, "IP_TOS");
1022 Net.setIpSocketOption(fd, family(), StandardSocketOptions.IP_TOS, i);
1023 break;
1024 }
1025 case TCP_NODELAY: {
1026 boolean b = booleanValue(value, "TCP_NODELAY");
1027 Net.setSocketOption(fd, StandardSocketOptions.TCP_NODELAY, b);
1028 break;
1029 }
1030 case SO_SNDBUF: {
1031 int i = intValue(value, "SO_SNDBUF");
1032 if (i <= 0)
1033 throw new SocketException("SO_SNDBUF <= 0");
1034 Net.setSocketOption(fd, StandardSocketOptions.SO_SNDBUF, i);
1035 break;
1036 }
1037 case SO_RCVBUF: {
1038 int i = intValue(value, "SO_RCVBUF");
1039 if (i <= 0)
1040 throw new SocketException("SO_RCVBUF <= 0");
1041 Net.setSocketOption(fd, StandardSocketOptions.SO_RCVBUF, i);
1042 break;
1043 }
1044 case SO_KEEPALIVE: {
1045 boolean b = booleanValue(value, "SO_KEEPALIVE");
1046 Net.setSocketOption(fd, StandardSocketOptions.SO_KEEPALIVE, b);
1047 break;
1048 }
1049 case SO_OOBINLINE: {
1050 boolean b = booleanValue(value, "SO_OOBINLINE");
1051 Net.setSocketOption(fd, ExtendedSocketOption.SO_OOBINLINE, b);
1052 break;
1053 }
1054 case SO_REUSEADDR: {
1055 boolean b = booleanValue(value, "SO_REUSEADDR");
1056 if (Net.useExclusiveBind()) {
1057 isReuseAddress = b;
1058 } else {
1059 Net.setSocketOption(fd, StandardSocketOptions.SO_REUSEADDR, b);
1060 }
1061 break;
1062 }
1063 case SO_REUSEPORT: {
1064 if (!Net.isReusePortAvailable())
1065 throw new SocketException("SO_REUSEPORT not supported");
1066 boolean b = booleanValue(value, "SO_REUSEPORT");
1067 Net.setSocketOption(fd, StandardSocketOptions.SO_REUSEPORT, b);
1068 break;
1069 }
1070 default:
1071 throw new SocketException("Unknown option " + opt);
1072 }
1073 } catch (SocketException e) {
1074 throw e;
1075 } catch (IllegalArgumentException | IOException e) {
1076 throw asSocketException(e);
1077 }
1078 }
1079 }
1080
1081 @Override
1082 public Object getOption(int opt) throws SocketException {
1083 synchronized (stateLock) {
1084 ensureOpen();
1085 try {
1086 switch (opt) {
1087 case SO_TIMEOUT:
1088 return timeout;
1089 case TCP_NODELAY:
1090 return Net.getSocketOption(fd, StandardSocketOptions.TCP_NODELAY);
1091 case SO_OOBINLINE:
1092 return Net.getSocketOption(fd, ExtendedSocketOption.SO_OOBINLINE);
1093 case SO_LINGER: {
1094 // return "false" when disabled, linger interval when enabled
1095 int i = (int) Net.getSocketOption(fd, StandardSocketOptions.SO_LINGER);
1096 if (i == -1) {
1097 return Boolean.FALSE;
1098 } else {
1099 return i;
1100 }
1101 }
1102 case SO_REUSEADDR:
1103 if (Net.useExclusiveBind()) {
1104 return isReuseAddress;
1105 } else {
1106 return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEADDR);
1107 }
1108 case SO_BINDADDR:
1109 return Net.localAddress(fd).getAddress();
1110 case SO_SNDBUF:
1111 return Net.getSocketOption(fd, StandardSocketOptions.SO_SNDBUF);
1112 case SO_RCVBUF:
1113 return Net.getSocketOption(fd, StandardSocketOptions.SO_RCVBUF);
1114 case IP_TOS:
1115 return Net.getSocketOption(fd, family(), StandardSocketOptions.IP_TOS);
1116 case SO_KEEPALIVE:
1117 return Net.getSocketOption(fd, StandardSocketOptions.SO_KEEPALIVE);
1118 case SO_REUSEPORT:
1119 if (!Net.isReusePortAvailable())
1120 throw new SocketException("SO_REUSEPORT not supported");
1121 return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEPORT);
1122 default:
1123 throw new SocketException("Unknown option " + opt);
1124 }
1125 } catch (SocketException e) {
1126 throw e;
1127 } catch (IllegalArgumentException | IOException e) {
1128 throw asSocketException(e);
1129 }
1130 }
1131 }
1132
1133 @Override
1134 protected void shutdownInput() throws IOException {
1135 synchronized (stateLock) {
1136 ensureOpenAndConnected();
1137 if (!isInputClosed) {
1138 Net.shutdown(fd, Net.SHUT_RD);
1139 if (NativeThread.isVirtualThread(readerThread)) {
1140 Poller.stopPoll(fdVal(fd), Net.POLLIN);
1141 }
1142 isInputClosed = true;
1143 }
1144 }
1145 }
1146
1147 @Override
1148 protected void shutdownOutput() throws IOException {
1149 synchronized (stateLock) {
1150 ensureOpenAndConnected();
1151 if (!isOutputClosed) {
1152 Net.shutdown(fd, Net.SHUT_WR);
1153 if (NativeThread.isVirtualThread(writerThread)) {
1154 Poller.stopPoll(fdVal(fd), Net.POLLOUT);
1155 }
1156 isOutputClosed = true;
1157 }
1158 }
1159 }
1160
1161 @Override
1162 protected boolean supportsUrgentData() {
1163 return true;
1164 }
1165
1166 @Override
1167 protected void sendUrgentData(int data) throws IOException {
1168 writeLock.lock();
1169 try {
1170 int n = 0;
1171 FileDescriptor fd = beginWrite();
1172 try {
1173 configureNonBlockingIfNeeded(fd, false);
1174 do {
1175 n = Net.sendOOB(fd, (byte) data);
1176 } while (n == IOStatus.INTERRUPTED && isOpen());
1177 if (n == IOStatus.UNAVAILABLE) {
1178 throw new SocketException("No buffer space available");
1179 }
1180 } finally {
1181 endWrite(n > 0);
1182 }
1183 } finally {
1184 writeLock.unlock();
1185 }
1186 }
1187
1188 /**
1189 * Returns an action to close the given file descriptor.
1190 */
1191 private static Runnable closerFor(FileDescriptor fd) {
1192 return () -> {
1193 try {
1194 nd.close(fd);
1195 } catch (IOException ioe) {
1196 throw new UncheckedIOException(ioe);
1197 }
1198 };
1199 }
1200
1201 /**
1202 * Attempts to acquire the given lock within the given waiting time.
1203 * @return the remaining time in nanoseconds when the lock is acquired, zero
1204 * or less if the lock was not acquired before the timeout expired
1205 */
1206 private static long tryLock(ReentrantLock lock, long timeout, TimeUnit unit) {
1207 assert timeout > 0;
1208 boolean interrupted = false;
1209 long nanos = unit.toNanos(timeout);
1210 long remainingNanos = nanos;
1211 long startNanos = System.nanoTime();
1212 boolean acquired = false;
1213 while (!acquired && (remainingNanos > 0)) {
1214 try {
1215 acquired = lock.tryLock(remainingNanos, NANOSECONDS);
1216 } catch (InterruptedException e) {
1217 interrupted = true;
1218 }
1219 remainingNanos = nanos - (System.nanoTime() - startNanos);
1220 }
1221 if (acquired && remainingNanos <= 0L)
1222 lock.unlock(); // release lock if timeout has expired
1223 if (interrupted)
1224 Thread.currentThread().interrupt();
1225 return remainingNanos;
1226 }
1227
1228 /**
1229 * Creates a SocketException from the given exception.
1230 */
1231 private static SocketException asSocketException(Exception e) {
1232 if (e instanceof SocketException se) {
1233 return se;
1234 } else {
1235 var se = new SocketException(e.getMessage());
1236 se.setStackTrace(e.getStackTrace());
1237 return se;
1238 }
1239 }
1240
1241 /**
1242 * Returns the socket protocol family.
1243 */
1244 private static ProtocolFamily family() {
1245 if (Net.isIPv6Available()) {
1246 return StandardProtocolFamily.INET6;
1247 } else {
1248 return StandardProtocolFamily.INET;
1249 }
1250 }
1251
1252 /**
1253 * Return the file descriptor value.
1254 */
1255 private static int fdVal(FileDescriptor fd) {
1256 return JIOFDA.get(fd);
1257 }
1258
1259 private static final JavaIOFileDescriptorAccess JIOFDA = SharedSecrets.getJavaIOFileDescriptorAccess();
1260 }