1 /*
2 * Copyright (c) 2019, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.FileDescriptor;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.InterruptedIOException;
32 import java.io.OutputStream;
33 import java.io.UncheckedIOException;
34 import java.lang.ref.Cleaner.Cleanable;
35 import java.net.InetAddress;
36 import java.net.InetSocketAddress;
37 import java.net.ProtocolFamily;
38 import java.net.SocketAddress;
39 import java.net.SocketException;
40 import java.net.SocketImpl;
41 import java.net.SocketOption;
42 import java.net.SocketTimeoutException;
43 import java.net.StandardProtocolFamily;
44 import java.net.StandardSocketOptions;
45 import java.net.UnknownHostException;
46 import java.nio.ByteBuffer;
47 import java.util.Collections;
48 import java.util.HashSet;
49 import java.util.Objects;
50 import java.util.Set;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.locks.ReentrantLock;
53
54 import jdk.internal.access.JavaIOFileDescriptorAccess;
55 import jdk.internal.access.SharedSecrets;
56 import jdk.internal.ref.CleanerFactory;
57 import sun.net.ConnectionResetException;
58 import sun.net.NetHooks;
59 import sun.net.PlatformSocketImpl;
60 import sun.net.ext.ExtendedSocketOptions;
61 import jdk.internal.util.Exceptions;
62
63 import static java.util.concurrent.TimeUnit.MILLISECONDS;
64 import static java.util.concurrent.TimeUnit.NANOSECONDS;
65 import static jdk.internal.util.Exceptions.filterNonSocketInfo;
66 import static jdk.internal.util.Exceptions.formatMsg;
67
68 /**
69 * NIO based SocketImpl.
70 *
71 * The underlying socket used by this SocketImpl is initially configured blocking.
72 * If a connect, accept or read is attempted with a timeout, or a virtual
 * thread invokes a blocking operation, then the socket is changed to non-blocking.
74 * When in non-blocking mode, operations that don't complete immediately will
75 * poll the socket (or park when invoked on a virtual thread) and preserve
76 * the semantics of blocking operations.
77 */
78
79 public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl {
80 private static final NativeDispatcher nd = new SocketDispatcher();
81
82 // The maximum number of bytes to read/write per syscall to avoid needing
83 // a huge buffer from the temporary buffer cache
84 private static final int MAX_BUFFER_SIZE = 128 * 1024;
85
86 // true if this is a SocketImpl for a ServerSocket
87 private final boolean server;
88
89 // Lock held when reading (also used when accepting or connecting)
90 private final ReentrantLock readLock = new ReentrantLock();
91
92 // Lock held when writing
93 private final ReentrantLock writeLock = new ReentrantLock();
94
95 // The stateLock for read/changing state
96 private final Object stateLock = new Object();
97 private static final int ST_NEW = 0;
98 private static final int ST_UNCONNECTED = 1;
99 private static final int ST_CONNECTING = 2;
100 private static final int ST_CONNECTED = 3;
101 private static final int ST_CLOSING = 4;
102 private static final int ST_CLOSED = 5;
103 private volatile int state; // need stateLock to change
104
105 private Cleanable cleaner;
106
107 // set to true when the socket is in non-blocking mode
108 private volatile boolean nonBlocking;
109
110 // used by connect/read/write/accept, protected by stateLock
111 private Thread readerThread;
112 private Thread writerThread;
113
114 // used when SO_REUSEADDR is emulated, protected by stateLock
115 private boolean isReuseAddress;
116
117 // read or accept timeout in millis
118 private volatile int timeout;
119
120 // flags to indicate if the connection is shutdown for input and output
121 private volatile boolean isInputClosed;
122 private volatile boolean isOutputClosed;
123
124 // used by read to emulate legacy behavior, protected by readLock
125 private boolean readEOF;
126 private boolean connectionReset;
127
/**
 * Constructs a new NioSocketImpl. The instance starts in the "new" state
 * and has no underlying socket until {@code create} is invoked.
 * @param server {@code true} when this SocketImpl is for a ServerSocket
 */
public NioSocketImpl(boolean server) {
    this.server = server;
}
135
136 /**
137 * Returns true if the socket is open.
138 */
139 private boolean isOpen() {
140 return state < ST_CLOSING;
141 }
142
143 /**
144 * Throws SocketException if the socket is not open.
145 */
146 private void ensureOpen() throws SocketException {
147 int state = this.state;
148 if (state == ST_NEW)
149 throw new SocketException("Socket not created");
150 if (state >= ST_CLOSING)
151 throw new SocketException("Socket closed");
152 }
153
154 /**
155 * Throws SocketException if the socket is not open and connected.
156 */
157 private void ensureOpenAndConnected() throws SocketException {
158 int state = this.state;
159 if (state < ST_CONNECTED)
160 throw new SocketException("Not connected");
161 if (state > ST_CONNECTED)
162 throw new SocketException("Socket closed");
163 }
164
165 /**
166 * Disables the current thread for scheduling purposes until the
167 * socket is ready for I/O or is asynchronously closed, for up to the
168 * specified waiting time.
169 * @throws IOException if an I/O error occurs
170 */
171 private void park(FileDescriptor fd, int event, long nanos) throws IOException {
172 Thread t = Thread.currentThread();
173 if (t.isVirtual()) {
174 Poller.poll(fdVal(fd), event, nanos, this::isOpen);
175 if (t.isInterrupted()) {
176 throw new InterruptedIOException();
177 }
178 } else {
179 long millis;
180 if (nanos == 0) {
181 millis = -1;
182 } else {
183 millis = NANOSECONDS.toMillis(nanos);
184 if (nanos > MILLISECONDS.toNanos(millis)) {
185 // Round up any excess nanos to the nearest millisecond to
186 // avoid parking for less than requested.
187 millis++;
188 }
189 }
190 Net.poll(fd, event, millis);
191 }
192 }
193
194 /**
195 * Disables the current thread for scheduling purposes until the socket is
196 * ready for I/O or is asynchronously closed.
197 * @throws IOException if an I/O error occurs
198 */
199 private void park(FileDescriptor fd, int event) throws IOException {
200 park(fd, event, 0);
201 }
202
203 /**
204 * Ensures that the socket is configured non-blocking invoked on a virtual
205 * thread or the operation has a timeout
206 * @throws IOException if there is an I/O error changing the blocking mode
207 */
208 private void configureNonBlockingIfNeeded(FileDescriptor fd, boolean timed)
209 throws IOException
210 {
211 if (!nonBlocking
212 && (timed || Thread.currentThread().isVirtual())) {
213 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
214 IOUtil.configureBlocking(fd, false);
215 nonBlocking = true;
216 }
217 }
218
219 /**
220 * Marks the beginning of a read operation that might block.
221 * @throws SocketException if the socket is closed or not connected
222 */
223 private FileDescriptor beginRead() throws SocketException {
224 synchronized (stateLock) {
225 ensureOpenAndConnected();
226 readerThread = NativeThread.threadToSignal();
227 return fd;
228 }
229 }
230
231 /**
232 * Marks the end of a read operation that may have blocked.
233 * @throws SocketException is the socket is closed
234 */
235 private void endRead(boolean completed) throws SocketException {
236 synchronized (stateLock) {
237 readerThread = null;
238 int state = this.state;
239 if (state == ST_CLOSING)
240 tryFinishClose();
241 if (!completed && state >= ST_CLOSING)
242 throw new SocketException("Socket closed");
243 }
244 }
245
246 /**
247 * Attempts to read bytes from the socket into the given byte array.
248 */
249 private int tryRead(FileDescriptor fd, byte[] b, int off, int len)
250 throws IOException
251 {
252 ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
253 assert dst.position() == 0;
254 try {
255 int n = nd.read(fd, ((DirectBuffer)dst).address(), len);
256 if (n > 0) {
257 dst.get(b, off, n);
258 }
259 return n;
260 } finally {
261 Util.offerFirstTemporaryDirectBuffer(dst);
262 }
263 }
264
265 /**
266 * Reads bytes from the socket into the given byte array with a timeout.
267 * @throws SocketTimeoutException if the read timeout elapses
268 */
269 private int timedRead(FileDescriptor fd, byte[] b, int off, int len, long nanos)
270 throws IOException
271 {
272 long startNanos = System.nanoTime();
273 int n = tryRead(fd, b, off, len);
274 while (n == IOStatus.UNAVAILABLE && isOpen()) {
275 long remainingNanos = nanos - (System.nanoTime() - startNanos);
276 if (remainingNanos <= 0) {
277 throw new SocketTimeoutException("Read timed out");
278 }
279 park(fd, Net.POLLIN, remainingNanos);
280 n = tryRead(fd, b, off, len);
281 }
282 return n;
283 }
284
285 /**
286 * Reads bytes from the socket into the given byte array.
287 * @return the number of bytes read or -1 at EOF
288 * @throws SocketException if the socket is closed or a socket I/O error occurs
289 * @throws SocketTimeoutException if the read timeout elapses
290 */
291 private int implRead(byte[] b, int off, int len, long remainingNanos) throws IOException {
292 int n = 0;
293 FileDescriptor fd = beginRead();
294 try {
295 if (connectionReset)
296 throw new SocketException("Connection reset");
297 if (isInputClosed)
298 return -1;
299
300 // experimental
301 if (Poller.supportReadOps() && Thread.currentThread().isVirtual()) {
302 n = Poller.read(fdVal(fd), b, off, len, remainingNanos, this::isOpen);
303 if (n != IOStatus.UNAVAILABLE) return n;
304 }
305
306 configureNonBlockingIfNeeded(fd, remainingNanos > 0);
307 if (remainingNanos > 0) {
308 // read with timeout
309 n = timedRead(fd, b, off, len, remainingNanos);
310 } else {
311 // read, no timeout
312 n = tryRead(fd, b, off, len);
313 while (IOStatus.okayToRetry(n) && isOpen()) {
314 park(fd, Net.POLLIN);
315 n = tryRead(fd, b, off, len);
316 }
317 }
318 return n;
319 } catch (InterruptedIOException e) {
320 throw e;
321 } catch (ConnectionResetException e) {
322 connectionReset = true;
323 throw new SocketException("Connection reset");
324 } catch (IOException ioe) {
325 // throw SocketException to maintain compatibility
326 throw asSocketException(ioe);
327 } finally {
328 endRead(n > 0);
329 }
330 }
331
332 /**
333 * Reads bytes from the socket into the given byte array.
334 * @return the number of bytes read or -1 at EOF
335 * @throws IndexOutOfBoundsException if the bound checks fail
336 * @throws SocketException if the socket is closed or a socket I/O error occurs
337 * @throws SocketTimeoutException if the read timeout elapses
338 */
339 private int read(byte[] b, int off, int len) throws IOException {
340 Objects.checkFromIndexSize(off, len, b.length);
341 if (len == 0) {
342 return 0;
343 } else {
344 long remainingNanos = 0;
345 int timeout = this.timeout;
346 if (timeout > 0) {
347 remainingNanos = tryLock(readLock, timeout, MILLISECONDS);
348 if (remainingNanos <= 0) {
349 assert !readLock.isHeldByCurrentThread();
350 throw new SocketTimeoutException("Read timed out");
351 }
352 } else {
353 readLock.lock();
354 }
355 try {
356 // emulate legacy behavior to return -1, even if socket is closed
357 if (readEOF)
358 return -1;
359 // read up to MAX_BUFFER_SIZE bytes
360 int size = Math.min(len, MAX_BUFFER_SIZE);
361 int n = implRead(b, off, size, remainingNanos);
362 if (n == -1)
363 readEOF = true;
364 return n;
365 } finally {
366 readLock.unlock();
367 }
368 }
369 }
370
371 /**
372 * Marks the beginning of a write operation that might block.
373 * @throws SocketException if the socket is closed or not connected
374 */
375 private FileDescriptor beginWrite() throws SocketException {
376 synchronized (stateLock) {
377 ensureOpenAndConnected();
378 writerThread = NativeThread.threadToSignal();
379 return fd;
380 }
381 }
382
383 /**
384 * Marks the end of a write operation that may have blocked.
385 * @throws SocketException is the socket is closed
386 */
387 private void endWrite(boolean completed) throws SocketException {
388 synchronized (stateLock) {
389 writerThread = null;
390 int state = this.state;
391 if (state == ST_CLOSING)
392 tryFinishClose();
393 if (!completed && state >= ST_CLOSING)
394 throw new SocketException("Socket closed");
395 }
396 }
397
398 /**
399 * Attempts to write a sequence of bytes to the socket from the given
400 * byte array.
401 */
402 private int tryWrite(FileDescriptor fd, byte[] b, int off, int len)
403 throws IOException
404 {
405 ByteBuffer src = Util.getTemporaryDirectBuffer(len);
406 assert src.position() == 0;
407 try {
408 src.put(b, off, len);
409 return nd.write(fd, ((DirectBuffer)src).address(), len);
410 } finally {
411 Util.offerFirstTemporaryDirectBuffer(src);
412 }
413 }
414
415 /**
416 * Writes a sequence of bytes to the socket from the given byte array.
417 * @return the number of bytes written
418 * @throws SocketException if the socket is closed or a socket I/O error occurs
419 */
420 private int implWrite(byte[] b, int off, int len) throws IOException {
421 int n = 0;
422 FileDescriptor fd = beginWrite();
423 try {
424
425 // experimental
426 if (Poller.supportWriteOps() && Thread.currentThread().isVirtual()) {
427 n = Poller.write(fdVal(fd), b, off, len, this::isOpen);
428 if (n != IOStatus.UNAVAILABLE) return n;
429 }
430
431 configureNonBlockingIfNeeded(fd, false);
432 n = tryWrite(fd, b, off, len);
433 while (IOStatus.okayToRetry(n) && isOpen()) {
434 park(fd, Net.POLLOUT);
435 n = tryWrite(fd, b, off, len);
436 }
437 return n;
438 } catch (InterruptedIOException e) {
439 throw e;
440 } catch (IOException ioe) {
441 // throw SocketException to maintain compatibility
442 throw asSocketException(ioe);
443 } finally {
444 endWrite(n > 0);
445 }
446 }
447
448 /**
449 * Writes a sequence of bytes to the socket from the given byte array.
450 * @throws SocketException if the socket is closed or a socket I/O error occurs
451 */
452 private void write(byte[] b, int off, int len) throws IOException {
453 Objects.checkFromIndexSize(off, len, b.length);
454 if (len > 0) {
455 writeLock.lock();
456 try {
457 int pos = off;
458 int end = off + len;
459 while (pos < end) {
460 // write up to MAX_BUFFER_SIZE bytes
461 int size = Math.min((end - pos), MAX_BUFFER_SIZE);
462 int n = implWrite(b, pos, size);
463 pos += n;
464 }
465 } finally {
466 writeLock.unlock();
467 }
468 }
469 }
470
471 /**
472 * Creates the socket.
473 * @param stream {@code true} for a streams socket
474 */
475 @Override
476 protected void create(boolean stream) throws IOException {
477 if (!stream) {
478 throw new IOException("Datagram socket creation not supported");
479 }
480 synchronized (stateLock) {
481 if (state != ST_NEW)
482 throw new IOException("Already created");
483 FileDescriptor fd;
484 if (server) {
485 fd = Net.serverSocket();
486 } else {
487 fd = Net.socket();
488 }
489 Runnable closer = closerFor(fd);
490 this.fd = fd;
491 this.cleaner = CleanerFactory.cleaner().register(this, closer);
492 this.state = ST_UNCONNECTED;
493 }
494 }
495
496 /**
497 * Marks the beginning of a connect operation that might block.
498 * @throws SocketException if the socket is closed or already connected
499 */
500 private FileDescriptor beginConnect(InetAddress address, int port)
501 throws IOException
502 {
503 synchronized (stateLock) {
504 int state = this.state;
505 if (state != ST_UNCONNECTED) {
506 if (state == ST_NEW)
507 throw new SocketException("Not created");
508 if (state == ST_CONNECTING)
509 throw new SocketException("Connection in progress");
510 if (state == ST_CONNECTED)
511 throw new SocketException("Already connected");
512 if (state >= ST_CLOSING)
513 throw new SocketException("Socket closed");
514 assert false;
515 }
516 this.state = ST_CONNECTING;
517
518 // invoke beforeTcpConnect hook if not already bound
519 if (localport == 0) {
520 NetHooks.beforeTcpConnect(fd, address, port);
521 }
522
523 // save the remote address/port
524 this.address = address;
525 this.port = port;
526
527 readerThread = NativeThread.threadToSignal();
528 return fd;
529 }
530 }
531
532 /**
533 * Marks the end of a connect operation that may have blocked.
534 * @throws SocketException is the socket is closed
535 */
536 private void endConnect(FileDescriptor fd, boolean completed) throws IOException {
537 synchronized (stateLock) {
538 readerThread = null;
539 int state = this.state;
540 if (state == ST_CLOSING)
541 tryFinishClose();
542 if (completed && state == ST_CONNECTING) {
543 this.state = ST_CONNECTED;
544 localport = Net.localAddress(fd).getPort();
545 } else if (!completed && state >= ST_CLOSING) {
546 throw new SocketException("Socket closed");
547 }
548 }
549 }
550
551 /**
552 * Waits for a connection attempt to finish with a timeout
553 * @throws SocketTimeoutException if the connect timeout elapses
554 */
555 private boolean timedFinishConnect(FileDescriptor fd, long nanos) throws IOException {
556 long startNanos = System.nanoTime();
557 boolean polled = Net.pollConnectNow(fd);
558 while (!polled && isOpen()) {
559 long remainingNanos = nanos - (System.nanoTime() - startNanos);
560 if (remainingNanos <= 0) {
561 throw new SocketTimeoutException("Connect timed out");
562 }
563 park(fd, Net.POLLOUT, remainingNanos);
564 polled = Net.pollConnectNow(fd);
565 }
566 return polled && isOpen();
567 }
568
569 /**
570 * Attempts to establish a connection to the given socket address with a
571 * timeout. Closes the socket if connection cannot be established.
572 * @throws IOException if the address is not a resolved InetSocketAddress or
573 * the connection cannot be established
574 */
575 @Override
576 protected void connect(SocketAddress remote, int millis) throws IOException {
577 // SocketImpl connect only specifies IOException
578 if (!(remote instanceof InetSocketAddress))
579 throw new IOException("Unsupported address type");
580 InetSocketAddress isa = (InetSocketAddress) remote;
581 if (isa.isUnresolved()) {
582 throw new UnknownHostException(
583 formatMsg(filterNonSocketInfo(isa.getHostName())));
584 }
585
586 InetAddress address = isa.getAddress();
587 if (address.isAnyLocalAddress())
588 address = InetAddress.getLocalHost();
589 int port = isa.getPort();
590
591 ReentrantLock connectLock = readLock;
592 try {
593 connectLock.lock();
594 try {
595 boolean connected = false;
596 FileDescriptor fd = beginConnect(address, port);
597 try {
598 configureNonBlockingIfNeeded(fd, millis > 0);
599 int n = Net.connect(fd, address, port);
600 if (n > 0) {
601 // connection established
602 connected = true;
603 } else {
604 assert IOStatus.okayToRetry(n);
605 if (millis > 0) {
606 // finish connect with timeout
607 long nanos = MILLISECONDS.toNanos(millis);
608 connected = timedFinishConnect(fd, nanos);
609 } else {
610 // finish connect, no timeout
611 boolean polled = false;
612 while (!polled && isOpen()) {
613 park(fd, Net.POLLOUT);
614 polled = Net.pollConnectNow(fd);
615 }
616 connected = polled && isOpen();
617 }
618 }
619 } finally {
620 endConnect(fd, connected);
621 }
622 } finally {
623 connectLock.unlock();
624 }
625 } catch (IOException ioe) {
626 close();
627 if (ioe instanceof SocketTimeoutException) {
628 throw ioe;
629 } else if (ioe instanceof InterruptedIOException) {
630 assert Thread.currentThread().isVirtual();
631 throw new SocketException("Closed by interrupt");
632 } else {
633 throw Exceptions.ioException(ioe, isa);
634 }
635 }
636 }
637
638 @Override
639 protected void connect(String host, int port) throws IOException {
640 connect(new InetSocketAddress(host, port), 0);
641 }
642
643 @Override
644 protected void connect(InetAddress address, int port) throws IOException {
645 connect(new InetSocketAddress(address, port), 0);
646 }
647
648 @Override
649 protected void bind(InetAddress host, int port) throws IOException {
650 synchronized (stateLock) {
651 ensureOpen();
652 if (localport != 0)
653 throw new SocketException("Already bound");
654 NetHooks.beforeTcpBind(fd, host, port);
655 Net.bind(fd, host, port);
656 // set the address field to the given host address to
657 // maintain long standing behavior. When binding to 0.0.0.0
658 // then the actual local address will be ::0 when IPv6 is enabled.
659 address = host;
660 localport = Net.localAddress(fd).getPort();
661 }
662 }
663
664 @Override
665 protected void listen(int backlog) throws IOException {
666 synchronized (stateLock) {
667 ensureOpen();
668 if (localport == 0)
669 throw new SocketException("Not bound");
670 Net.listen(fd, backlog < 1 ? 50 : backlog);
671 }
672 }
673
674 /**
675 * Marks the beginning of an accept operation that might block.
676 * @throws SocketException if the socket is closed
677 */
678 private FileDescriptor beginAccept() throws SocketException {
679 synchronized (stateLock) {
680 ensureOpen();
681 if (localport == 0)
682 throw new SocketException("Not bound");
683 readerThread = NativeThread.threadToSignal();
684 return fd;
685 }
686 }
687
688 /**
689 * Marks the end of an accept operation that may have blocked.
690 * @throws SocketException is the socket is closed
691 */
692 private void endAccept(boolean completed) throws SocketException {
693 synchronized (stateLock) {
694 int state = this.state;
695 readerThread = null;
696 if (state == ST_CLOSING)
697 tryFinishClose();
698 if (!completed && state >= ST_CLOSING)
699 throw new SocketException("Socket closed");
700 }
701 }
702
703 /**
704 * Accepts a new connection with a timeout.
705 * @throws SocketTimeoutException if the accept timeout elapses
706 */
707 private int timedAccept(FileDescriptor fd,
708 FileDescriptor newfd,
709 InetSocketAddress[] isaa,
710 long nanos)
711 throws IOException
712 {
713 long startNanos = System.nanoTime();
714 int n = Net.accept(fd, newfd, isaa);
715 while (n == IOStatus.UNAVAILABLE && isOpen()) {
716 long remainingNanos = nanos - (System.nanoTime() - startNanos);
717 if (remainingNanos <= 0) {
718 throw new SocketTimeoutException("Accept timed out");
719 }
720 park(fd, Net.POLLIN, remainingNanos);
721 n = Net.accept(fd, newfd, isaa);
722 }
723 return n;
724 }
725
726 /**
727 * Accepts a new connection so that the given SocketImpl is connected to
728 * the peer. The SocketImpl must be a newly created NioSocketImpl.
729 */
730 @Override
731 protected void accept(SocketImpl si) throws IOException {
732 NioSocketImpl nsi = (NioSocketImpl) si;
733 if (nsi.state != ST_NEW)
734 throw new SocketException("Not a newly created SocketImpl");
735
736 FileDescriptor newfd = new FileDescriptor();
737 InetSocketAddress[] isaa = new InetSocketAddress[1];
738
739 // acquire the lock, adjusting the timeout for cases where several
740 // threads are accepting connections and there is a timeout set
741 ReentrantLock acceptLock = readLock;
742 int timeout = this.timeout;
743 long remainingNanos = 0;
744 if (timeout > 0) {
745 remainingNanos = tryLock(acceptLock, timeout, MILLISECONDS);
746 if (remainingNanos <= 0) {
747 assert !acceptLock.isHeldByCurrentThread();
748 throw new SocketTimeoutException("Accept timed out");
749 }
750 } else {
751 acceptLock.lock();
752 }
753
754 // accept a connection
755 try {
756 int n = 0;
757 FileDescriptor fd = beginAccept();
758 try {
759 configureNonBlockingIfNeeded(fd, remainingNanos > 0);
760 if (remainingNanos > 0) {
761 // accept with timeout
762 n = timedAccept(fd, newfd, isaa, remainingNanos);
763 } else {
764 // accept, no timeout
765 n = Net.accept(fd, newfd, isaa);
766 while (IOStatus.okayToRetry(n) && isOpen()) {
767 park(fd, Net.POLLIN);
768 n = Net.accept(fd, newfd, isaa);
769 }
770 }
771 } finally {
772 endAccept(n > 0);
773 assert IOStatus.check(n);
774 }
775 } finally {
776 acceptLock.unlock();
777 }
778
779 // get local address and configure accepted socket to blocking mode
780 InetSocketAddress localAddress;
781 try {
782 localAddress = Net.localAddress(newfd);
783 IOUtil.configureBlocking(newfd, true);
784 } catch (IOException ioe) {
785 nd.close(newfd);
786 throw ioe;
787 }
788
789 // set the fields
790 Runnable closer = closerFor(newfd);
791 synchronized (nsi.stateLock) {
792 nsi.fd = newfd;
793 nsi.cleaner = CleanerFactory.cleaner().register(nsi, closer);
794 nsi.localport = localAddress.getPort();
795 nsi.address = isaa[0].getAddress();
796 nsi.port = isaa[0].getPort();
797 nsi.state = ST_CONNECTED;
798 }
799 }
800
801 @Override
802 protected InputStream getInputStream() {
803 return new InputStream() {
804 @Override
805 public int read() throws IOException {
806 byte[] a = new byte[1];
807 int n = read(a, 0, 1);
808 return (n > 0) ? (a[0] & 0xff) : -1;
809 }
810 @Override
811 public int read(byte[] b, int off, int len) throws IOException {
812 return NioSocketImpl.this.read(b, off, len);
813 }
814 @Override
815 public int available() throws IOException {
816 return NioSocketImpl.this.available();
817 }
818 @Override
819 public void close() throws IOException {
820 NioSocketImpl.this.close();
821 }
822 };
823 }
824
825 @Override
826 protected OutputStream getOutputStream() {
827 return new OutputStream() {
828 @Override
829 public void write(int b) throws IOException {
830 byte[] a = new byte[]{(byte) b};
831 write(a, 0, 1);
832 }
833 @Override
834 public void write(byte[] b, int off, int len) throws IOException {
835 NioSocketImpl.this.write(b, off, len);
836 }
837 @Override
838 public void close() throws IOException {
839 NioSocketImpl.this.close();
840 }
841 };
842 }
843
844 @Override
845 protected int available() throws IOException {
846 synchronized (stateLock) {
847 ensureOpenAndConnected();
848 if (isInputClosed) {
849 return 0;
850 } else {
851 return Net.available(fd);
852 }
853 }
854 }
855
856 /**
857 * Closes the socket if there are no I/O operations in progress.
858 */
859 private boolean tryClose() throws IOException {
860 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
861 if (readerThread == null && writerThread == null) {
862 try {
863 cleaner.clean();
864 } catch (UncheckedIOException ioe) {
865 throw ioe.getCause();
866 } finally {
867 state = ST_CLOSED;
868 }
869 return true;
870 } else {
871 return false;
872 }
873 }
874
875 /**
876 * Invokes tryClose to attempt to close the socket.
877 *
878 * This method is used for deferred closing by I/O operations.
879 */
880 private void tryFinishClose() {
881 try {
882 tryClose();
883 } catch (IOException ignore) { }
884 }
885
886 /**
887 * Closes the socket. If there are I/O operations in progress then the
888 * socket is pre-closed and the threads are signalled. The socket will be
889 * closed when the last I/O operation aborts.
890 */
891 @Override
892 protected void close() throws IOException {
893 synchronized (stateLock) {
894 int state = this.state;
895 if (state >= ST_CLOSING)
896 return;
897 if (state == ST_NEW) {
898 // stillborn
899 this.state = ST_CLOSED;
900 return;
901 }
902 boolean connected = (state == ST_CONNECTED);
903 this.state = ST_CLOSING;
904
905 // shutdown output when linger interval not set to 0
906 if (connected) {
907 try {
908 var SO_LINGER = StandardSocketOptions.SO_LINGER;
909 if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
910 Net.shutdown(fd, Net.SHUT_WR);
911 }
912 } catch (IOException ignore) { }
913 }
914
915 // attempt to close the socket. If there are I/O operations in progress
916 // then the socket is pre-closed and the thread(s) signalled. The
917 // last thread will close the file descriptor.
918 if (!tryClose()) {
919 nd.preClose(fd, readerThread, writerThread);
920 }
921 }
922 }
923
924 // the socket options supported by client and server sockets
925 private static volatile Set<SocketOption<?>> clientSocketOptions;
926 private static volatile Set<SocketOption<?>> serverSocketOptions;
927
928 @Override
929 protected Set<SocketOption<?>> supportedOptions() {
930 Set<SocketOption<?>> options = (server) ? serverSocketOptions : clientSocketOptions;
931 if (options == null) {
932 options = new HashSet<>();
933 options.add(StandardSocketOptions.SO_RCVBUF);
934 options.add(StandardSocketOptions.SO_REUSEADDR);
935 if (server) {
936 // IP_TOS added for server socket to maintain compatibility
937 options.add(StandardSocketOptions.IP_TOS);
938 options.addAll(ExtendedSocketOptions.serverSocketOptions());
939 } else {
940 options.add(StandardSocketOptions.IP_TOS);
941 options.add(StandardSocketOptions.SO_KEEPALIVE);
942 options.add(StandardSocketOptions.SO_SNDBUF);
943 options.add(StandardSocketOptions.SO_LINGER);
944 options.add(StandardSocketOptions.TCP_NODELAY);
945 options.addAll(ExtendedSocketOptions.clientSocketOptions());
946 }
947 if (Net.isReusePortAvailable())
948 options.add(StandardSocketOptions.SO_REUSEPORT);
949 options = Collections.unmodifiableSet(options);
950 if (server) {
951 serverSocketOptions = options;
952 } else {
953 clientSocketOptions = options;
954 }
955 }
956 return options;
957 }
958
959 @Override
960 protected <T> void setOption(SocketOption<T> opt, T value) throws IOException {
961 if (!supportedOptions().contains(opt))
962 throw new UnsupportedOperationException("'" + opt + "' not supported");
963 if (!opt.type().isInstance(value))
964 throw new IllegalArgumentException("Invalid value '" + value + "'");
965 synchronized (stateLock) {
966 ensureOpen();
967 if (opt == StandardSocketOptions.IP_TOS) {
968 // maps to IPV6_TCLASS and/or IP_TOS
969 Net.setIpSocketOption(fd, family(), opt, value);
970 } else if (opt == StandardSocketOptions.SO_REUSEADDR) {
971 boolean b = (boolean) value;
972 if (Net.useExclusiveBind()) {
973 isReuseAddress = b;
974 } else {
975 Net.setSocketOption(fd, opt, b);
976 }
977 } else {
978 // option does not need special handling
979 Net.setSocketOption(fd, opt, value);
980 }
981 }
982 }
983
984 @SuppressWarnings("unchecked")
985 protected <T> T getOption(SocketOption<T> opt) throws IOException {
986 if (!supportedOptions().contains(opt))
987 throw new UnsupportedOperationException("'" + opt + "' not supported");
988 synchronized (stateLock) {
989 ensureOpen();
990 if (opt == StandardSocketOptions.IP_TOS) {
991 return (T) Net.getSocketOption(fd, family(), opt);
992 } else if (opt == StandardSocketOptions.SO_REUSEADDR) {
993 if (Net.useExclusiveBind()) {
994 return (T) Boolean.valueOf(isReuseAddress);
995 } else {
996 return (T) Net.getSocketOption(fd, opt);
997 }
998 } else {
999 // option does not need special handling
1000 return (T) Net.getSocketOption(fd, opt);
1001 }
1002 }
1003 }
1004
1005 private boolean booleanValue(Object value, String desc) throws SocketException {
1006 if (!(value instanceof Boolean))
1007 throw new SocketException("Bad value for " + desc);
1008 return (boolean) value;
1009 }
1010
1011 private int intValue(Object value, String desc) throws SocketException {
1012 if (!(value instanceof Integer))
1013 throw new SocketException("Bad value for " + desc);
1014 return (int) value;
1015 }
1016
1017 @Override
1018 public void setOption(int opt, Object value) throws SocketException {
1019 synchronized (stateLock) {
1020 ensureOpen();
1021 try {
1022 switch (opt) {
1023 case SO_LINGER: {
1024 // the value is "false" to disable, or linger interval to enable
1025 int i;
1026 if (value instanceof Boolean && ((boolean) value) == false) {
1027 i = -1;
1028 } else {
1029 i = intValue(value, "SO_LINGER");
1030 }
1031 Net.setSocketOption(fd, StandardSocketOptions.SO_LINGER, i);
1032 break;
1033 }
1034 case SO_TIMEOUT: {
1035 int i = intValue(value, "SO_TIMEOUT");
1036 if (i < 0)
1037 throw new IllegalArgumentException("timeout < 0");
1038 timeout = i;
1039 break;
1040 }
1041 case IP_TOS: {
1042 int i = intValue(value, "IP_TOS");
1043 Net.setIpSocketOption(fd, family(), StandardSocketOptions.IP_TOS, i);
1044 break;
1045 }
1046 case TCP_NODELAY: {
1047 boolean b = booleanValue(value, "TCP_NODELAY");
1048 Net.setSocketOption(fd, StandardSocketOptions.TCP_NODELAY, b);
1049 break;
1050 }
1051 case SO_SNDBUF: {
1052 int i = intValue(value, "SO_SNDBUF");
1053 if (i <= 0)
1054 throw new SocketException("SO_SNDBUF <= 0");
1055 Net.setSocketOption(fd, StandardSocketOptions.SO_SNDBUF, i);
1056 break;
1057 }
1058 case SO_RCVBUF: {
1059 int i = intValue(value, "SO_RCVBUF");
1060 if (i <= 0)
1061 throw new SocketException("SO_RCVBUF <= 0");
1062 Net.setSocketOption(fd, StandardSocketOptions.SO_RCVBUF, i);
1063 break;
1064 }
1065 case SO_KEEPALIVE: {
1066 boolean b = booleanValue(value, "SO_KEEPALIVE");
1067 Net.setSocketOption(fd, StandardSocketOptions.SO_KEEPALIVE, b);
1068 break;
1069 }
1070 case SO_OOBINLINE: {
1071 boolean b = booleanValue(value, "SO_OOBINLINE");
1072 Net.setSocketOption(fd, ExtendedSocketOption.SO_OOBINLINE, b);
1073 break;
1074 }
1075 case SO_REUSEADDR: {
1076 boolean b = booleanValue(value, "SO_REUSEADDR");
1077 if (Net.useExclusiveBind()) {
1078 isReuseAddress = b;
1079 } else {
1080 Net.setSocketOption(fd, StandardSocketOptions.SO_REUSEADDR, b);
1081 }
1082 break;
1083 }
1084 case SO_REUSEPORT: {
1085 if (!Net.isReusePortAvailable())
1086 throw new SocketException("SO_REUSEPORT not supported");
1087 boolean b = booleanValue(value, "SO_REUSEPORT");
1088 Net.setSocketOption(fd, StandardSocketOptions.SO_REUSEPORT, b);
1089 break;
1090 }
1091 default:
1092 throw new SocketException("Unknown option " + opt);
1093 }
1094 } catch (SocketException e) {
1095 throw e;
1096 } catch (IllegalArgumentException | IOException e) {
1097 throw asSocketException(e);
1098 }
1099 }
1100 }
1101
1102 @Override
1103 public Object getOption(int opt) throws SocketException {
1104 synchronized (stateLock) {
1105 ensureOpen();
1106 try {
1107 switch (opt) {
1108 case SO_TIMEOUT:
1109 return timeout;
1110 case TCP_NODELAY:
1111 return Net.getSocketOption(fd, StandardSocketOptions.TCP_NODELAY);
1112 case SO_OOBINLINE:
1113 return Net.getSocketOption(fd, ExtendedSocketOption.SO_OOBINLINE);
1114 case SO_LINGER: {
1115 // return "false" when disabled, linger interval when enabled
1116 int i = (int) Net.getSocketOption(fd, StandardSocketOptions.SO_LINGER);
1117 if (i == -1) {
1118 return Boolean.FALSE;
1119 } else {
1120 return i;
1121 }
1122 }
1123 case SO_REUSEADDR:
1124 if (Net.useExclusiveBind()) {
1125 return isReuseAddress;
1126 } else {
1127 return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEADDR);
1128 }
1129 case SO_BINDADDR:
1130 return Net.localAddress(fd).getAddress();
1131 case SO_SNDBUF:
1132 return Net.getSocketOption(fd, StandardSocketOptions.SO_SNDBUF);
1133 case SO_RCVBUF:
1134 return Net.getSocketOption(fd, StandardSocketOptions.SO_RCVBUF);
1135 case IP_TOS:
1136 return Net.getSocketOption(fd, family(), StandardSocketOptions.IP_TOS);
1137 case SO_KEEPALIVE:
1138 return Net.getSocketOption(fd, StandardSocketOptions.SO_KEEPALIVE);
1139 case SO_REUSEPORT:
1140 if (!Net.isReusePortAvailable())
1141 throw new SocketException("SO_REUSEPORT not supported");
1142 return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEPORT);
1143 default:
1144 throw new SocketException("Unknown option " + opt);
1145 }
1146 } catch (SocketException e) {
1147 throw e;
1148 } catch (IllegalArgumentException | IOException e) {
1149 throw asSocketException(e);
1150 }
1151 }
1152 }
1153
1154 @Override
1155 protected void shutdownInput() throws IOException {
1156 synchronized (stateLock) {
1157 ensureOpenAndConnected();
1158 if (!isInputClosed) {
1159 Net.shutdown(fd, Net.SHUT_RD);
1160 if (readerThread != null && readerThread.isVirtual()) {
1161 Poller.stopPoll(readerThread);
1162 }
1163 isInputClosed = true;
1164 }
1165 }
1166 }
1167
1168 @Override
1169 protected void shutdownOutput() throws IOException {
1170 synchronized (stateLock) {
1171 ensureOpenAndConnected();
1172 if (!isOutputClosed) {
1173 Net.shutdown(fd, Net.SHUT_WR);
1174 if (writerThread != null && writerThread.isVirtual()) {
1175 Poller.stopPoll(writerThread);
1176 }
1177 isOutputClosed = true;
1178 }
1179 }
1180 }
1181
1182 @Override
1183 protected boolean supportsUrgentData() {
1184 return true;
1185 }
1186
1187 @Override
1188 protected void sendUrgentData(int data) throws IOException {
1189 writeLock.lock();
1190 try {
1191 int n = 0;
1192 FileDescriptor fd = beginWrite();
1193 try {
1194 configureNonBlockingIfNeeded(fd, false);
1195 do {
1196 n = Net.sendOOB(fd, (byte) data);
1197 } while (n == IOStatus.INTERRUPTED && isOpen());
1198 if (n == IOStatus.UNAVAILABLE) {
1199 throw new SocketException("No buffer space available");
1200 }
1201 } finally {
1202 endWrite(n > 0);
1203 }
1204 } finally {
1205 writeLock.unlock();
1206 }
1207 }
1208
1209 /**
1210 * Returns an action to close the given file descriptor.
1211 */
1212 private static Runnable closerFor(FileDescriptor fd) {
1213 return () -> {
1214 try {
1215 nd.close(fd);
1216 } catch (IOException ioe) {
1217 throw new UncheckedIOException(ioe);
1218 }
1219 };
1220 }
1221
1222 /**
1223 * Attempts to acquire the given lock within the given waiting time.
1224 * @return the remaining time in nanoseconds when the lock is acquired, zero
1225 * or less if the lock was not acquired before the timeout expired
1226 */
1227 private static long tryLock(ReentrantLock lock, long timeout, TimeUnit unit) {
1228 assert timeout > 0;
1229 boolean interrupted = false;
1230 long nanos = unit.toNanos(timeout);
1231 long remainingNanos = nanos;
1232 long startNanos = System.nanoTime();
1233 boolean acquired = false;
1234 while (!acquired && (remainingNanos > 0)) {
1235 try {
1236 acquired = lock.tryLock(remainingNanos, NANOSECONDS);
1237 } catch (InterruptedException e) {
1238 interrupted = true;
1239 }
1240 remainingNanos = nanos - (System.nanoTime() - startNanos);
1241 }
1242 if (acquired && remainingNanos <= 0L)
1243 lock.unlock(); // release lock if timeout has expired
1244 if (interrupted)
1245 Thread.currentThread().interrupt();
1246 return remainingNanos;
1247 }
1248
1249 /**
1250 * Creates a SocketException from the given exception.
1251 */
1252 private static SocketException asSocketException(Exception e) {
1253 if (e instanceof SocketException se) {
1254 return se;
1255 } else {
1256 var se = new SocketException(e.getMessage());
1257 se.setStackTrace(e.getStackTrace());
1258 return se;
1259 }
1260 }
1261
1262 /**
1263 * Returns the socket protocol family.
1264 */
1265 private static ProtocolFamily family() {
1266 if (Net.isIPv6Available()) {
1267 return StandardProtocolFamily.INET6;
1268 } else {
1269 return StandardProtocolFamily.INET;
1270 }
1271 }
1272
1273 /**
1274 * Return the file descriptor value.
1275 */
1276 private static int fdVal(FileDescriptor fd) {
1277 return JIOFDA.get(fd);
1278 }
1279
1280 private static final JavaIOFileDescriptorAccess JIOFDA = SharedSecrets.getJavaIOFileDescriptorAccess();
1281 }