1 /*
2 * Copyright (c) 2019, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.FileDescriptor;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.InterruptedIOException;
32 import java.io.OutputStream;
33 import java.io.UncheckedIOException;
34 import java.lang.ref.Cleaner.Cleanable;
35 import java.net.InetAddress;
36 import java.net.InetSocketAddress;
37 import java.net.ProtocolFamily;
38 import java.net.SocketAddress;
39 import java.net.SocketException;
40 import java.net.SocketImpl;
41 import java.net.SocketOption;
42 import java.net.SocketTimeoutException;
43 import java.net.StandardProtocolFamily;
44 import java.net.StandardSocketOptions;
45 import java.net.UnknownHostException;
46 import java.nio.ByteBuffer;
47 import java.util.Collections;
48 import java.util.HashSet;
49 import java.util.Objects;
50 import java.util.Set;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.locks.ReentrantLock;
53
54 import jdk.internal.access.JavaIOFileDescriptorAccess;
55 import jdk.internal.access.SharedSecrets;
56 import jdk.internal.ref.CleanerFactory;
57 import sun.net.ConnectionResetException;
58 import sun.net.PlatformSocketImpl;
59 import sun.net.ext.ExtendedSocketOptions;
60 import jdk.internal.util.Exceptions;
61
62 import static java.util.concurrent.TimeUnit.MILLISECONDS;
63 import static java.util.concurrent.TimeUnit.NANOSECONDS;
64 import static jdk.internal.util.Exceptions.filterNonSocketInfo;
65 import static jdk.internal.util.Exceptions.formatMsg;
66
67 /**
68 * NIO based SocketImpl.
69 *
70 * The underlying socket used by this SocketImpl is initially configured blocking.
71 * If a connect, accept or read is attempted with a timeout, or a virtual
72 * thread invokes a blocking operation, then the socket is changed to non-blocking
73 * When in non-blocking mode, operations that don't complete immediately will
74 * poll the socket (or park when invoked on a virtual thread) and preserve
75 * the semantics of blocking operations.
76 */
77
78 public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl {
79 private static final NativeDispatcher nd = new SocketDispatcher();
80
81 // The maximum number of bytes to read/write per syscall to avoid needing
82 // a huge buffer from the temporary buffer cache
83 private static final int MAX_BUFFER_SIZE = 128 * 1024;
84
85 // true if this is a SocketImpl for a ServerSocket
86 private final boolean server;
87
88 // Lock held when reading (also used when accepting or connecting)
89 private final ReentrantLock readLock = new ReentrantLock();
90
91 // Lock held when writing
92 private final ReentrantLock writeLock = new ReentrantLock();
93
94 // The stateLock for read/changing state
95 private final Object stateLock = new Object();
96 private static final int ST_NEW = 0;
97 private static final int ST_UNCONNECTED = 1;
98 private static final int ST_CONNECTING = 2;
99 private static final int ST_CONNECTED = 3;
100 private static final int ST_CLOSING = 4;
101 private static final int ST_CLOSED = 5;
102 private volatile int state; // need stateLock to change
103
104 private Cleanable cleaner;
105
106 // set to true when the socket is in non-blocking mode
107 private volatile boolean nonBlocking;
108
109 // used by connect/read/write/accept, protected by stateLock
110 private Thread readerThread;
111 private Thread writerThread;
112
113 // used when SO_REUSEADDR is emulated, protected by stateLock
114 private boolean isReuseAddress;
115
116 // read or accept timeout in millis
117 private volatile int timeout;
118
119 // flags to indicate if the connection is shutdown for input and output
120 private volatile boolean isInputClosed;
121 private volatile boolean isOutputClosed;
122
123 // used by read to emulate legacy behavior, protected by readLock
124 private boolean readEOF;
125 private boolean connectionReset;
126
127 /**
128 * Creates an instance of this SocketImpl.
129 * @param server true if this is a SocketImpl for a ServerSocket
130 */
131 public NioSocketImpl(boolean server) {
132 this.server = server;
133 }
134
135 /**
136 * Returns true if the socket is open.
137 */
138 private boolean isOpen() {
139 return state < ST_CLOSING;
140 }
141
142 /**
143 * Throws SocketException if the socket is not open.
144 */
145 private void ensureOpen() throws SocketException {
146 int state = this.state;
147 if (state == ST_NEW)
148 throw new SocketException("Socket not created");
149 if (state >= ST_CLOSING)
150 throw new SocketException("Socket closed");
151 }
152
153 /**
154 * Throws SocketException if the socket is not open and connected.
155 */
156 private void ensureOpenAndConnected() throws SocketException {
157 int state = this.state;
158 if (state < ST_CONNECTED)
159 throw new SocketException("Not connected");
160 if (state > ST_CONNECTED)
161 throw new SocketException("Socket closed");
162 }
163
164 /**
165 * Disables the current thread for scheduling purposes until the
166 * socket is ready for I/O or is asynchronously closed, for up to the
167 * specified waiting time.
168 * @throws IOException if an I/O error occurs
169 */
170 private void park(FileDescriptor fd, int event, long nanos) throws IOException {
171 Thread t = Thread.currentThread();
172 if (t.isVirtual()) {
173 Poller.poll(fdVal(fd), event, nanos, this::isOpen);
174 if (t.isInterrupted()) {
175 throw new InterruptedIOException();
176 }
177 } else {
178 long millis;
179 if (nanos == 0) {
180 millis = -1;
181 } else {
182 millis = NANOSECONDS.toMillis(nanos);
183 if (nanos > MILLISECONDS.toNanos(millis)) {
184 // Round up any excess nanos to the nearest millisecond to
185 // avoid parking for less than requested.
186 millis++;
187 }
188 }
189 Net.poll(fd, event, millis);
190 }
191 }
192
193 /**
194 * Disables the current thread for scheduling purposes until the socket is
195 * ready for I/O or is asynchronously closed.
196 * @throws IOException if an I/O error occurs
197 */
198 private void park(FileDescriptor fd, int event) throws IOException {
199 park(fd, event, 0);
200 }
201
202 /**
203 * Ensures that the socket is configured non-blocking invoked on a virtual
204 * thread or the operation has a timeout
205 * @throws IOException if there is an I/O error changing the blocking mode
206 */
207 private void configureNonBlockingIfNeeded(FileDescriptor fd, boolean timed)
208 throws IOException
209 {
210 if (!nonBlocking
211 && (timed || Thread.currentThread().isVirtual())) {
212 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
213 IOUtil.configureBlocking(fd, false);
214 nonBlocking = true;
215 }
216 }
217
218 /**
219 * Marks the beginning of a read operation that might block.
220 * @throws SocketException if the socket is closed or not connected
221 */
222 private FileDescriptor beginRead() throws SocketException {
223 synchronized (stateLock) {
224 ensureOpenAndConnected();
225 readerThread = NativeThread.threadToSignal();
226 return fd;
227 }
228 }
229
230 /**
231 * Marks the end of a read operation that may have blocked.
232 * @throws SocketException is the socket is closed
233 */
234 private void endRead(boolean completed) throws SocketException {
235 synchronized (stateLock) {
236 readerThread = null;
237 int state = this.state;
238 if (state == ST_CLOSING)
239 tryFinishClose();
240 if (!completed && state >= ST_CLOSING)
241 throw new SocketException("Socket closed");
242 }
243 }
244
245 /**
246 * Attempts to read bytes from the socket into the given byte array.
247 */
248 private int tryRead(FileDescriptor fd, byte[] b, int off, int len)
249 throws IOException
250 {
251 ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
252 assert dst.position() == 0;
253 try {
254 int n = nd.read(fd, ((DirectBuffer)dst).address(), len);
255 if (n > 0) {
256 dst.get(b, off, n);
257 }
258 return n;
259 } finally {
260 Util.offerFirstTemporaryDirectBuffer(dst);
261 }
262 }
263
264 /**
265 * Reads bytes from the socket into the given byte array with a timeout.
266 * @throws SocketTimeoutException if the read timeout elapses
267 */
268 private int timedRead(FileDescriptor fd, byte[] b, int off, int len, long nanos)
269 throws IOException
270 {
271 long startNanos = System.nanoTime();
272 int n = tryRead(fd, b, off, len);
273 while (n == IOStatus.UNAVAILABLE && isOpen()) {
274 long remainingNanos = nanos - (System.nanoTime() - startNanos);
275 if (remainingNanos <= 0) {
276 throw new SocketTimeoutException("Read timed out");
277 }
278 park(fd, Net.POLLIN, remainingNanos);
279 n = tryRead(fd, b, off, len);
280 }
281 return n;
282 }
283
284 /**
285 * Reads bytes from the socket into the given byte array.
286 * @return the number of bytes read or -1 at EOF
287 * @throws SocketException if the socket is closed or a socket I/O error occurs
288 * @throws SocketTimeoutException if the read timeout elapses
289 */
290 private int implRead(byte[] b, int off, int len, long remainingNanos) throws IOException {
291 int n = 0;
292 FileDescriptor fd = beginRead();
293 try {
294 if (connectionReset)
295 throw new SocketException("Connection reset");
296 if (isInputClosed)
297 return -1;
298
299 // experimental
300 if (Poller.supportReadOps() && Thread.currentThread().isVirtual()) {
301 n = Poller.read(fdVal(fd), b, off, len, remainingNanos, this::isOpen);
302 if (n != IOStatus.UNAVAILABLE) return n;
303 }
304
305 configureNonBlockingIfNeeded(fd, remainingNanos > 0);
306 if (remainingNanos > 0) {
307 // read with timeout
308 n = timedRead(fd, b, off, len, remainingNanos);
309 } else {
310 // read, no timeout
311 n = tryRead(fd, b, off, len);
312 while (IOStatus.okayToRetry(n) && isOpen()) {
313 park(fd, Net.POLLIN);
314 n = tryRead(fd, b, off, len);
315 }
316 }
317 return n;
318 } catch (InterruptedIOException e) {
319 throw e;
320 } catch (ConnectionResetException e) {
321 connectionReset = true;
322 throw new SocketException("Connection reset");
323 } catch (IOException ioe) {
324 // throw SocketException to maintain compatibility
325 throw asSocketException(ioe);
326 } finally {
327 endRead(n > 0);
328 }
329 }
330
331 /**
332 * Reads bytes from the socket into the given byte array.
333 * @return the number of bytes read or -1 at EOF
334 * @throws IndexOutOfBoundsException if the bound checks fail
335 * @throws SocketException if the socket is closed or a socket I/O error occurs
336 * @throws SocketTimeoutException if the read timeout elapses
337 */
338 private int read(byte[] b, int off, int len) throws IOException {
339 Objects.checkFromIndexSize(off, len, b.length);
340 if (len == 0) {
341 return 0;
342 } else {
343 long remainingNanos = 0;
344 int timeout = this.timeout;
345 if (timeout > 0) {
346 remainingNanos = tryLock(readLock, timeout, MILLISECONDS);
347 if (remainingNanos <= 0) {
348 assert !readLock.isHeldByCurrentThread();
349 throw new SocketTimeoutException("Read timed out");
350 }
351 } else {
352 readLock.lock();
353 }
354 try {
355 // emulate legacy behavior to return -1, even if socket is closed
356 if (readEOF)
357 return -1;
358 // read up to MAX_BUFFER_SIZE bytes
359 int size = Math.min(len, MAX_BUFFER_SIZE);
360 int n = implRead(b, off, size, remainingNanos);
361 if (n == -1)
362 readEOF = true;
363 return n;
364 } finally {
365 readLock.unlock();
366 }
367 }
368 }
369
370 /**
371 * Marks the beginning of a write operation that might block.
372 * @throws SocketException if the socket is closed or not connected
373 */
374 private FileDescriptor beginWrite() throws SocketException {
375 synchronized (stateLock) {
376 ensureOpenAndConnected();
377 writerThread = NativeThread.threadToSignal();
378 return fd;
379 }
380 }
381
382 /**
383 * Marks the end of a write operation that may have blocked.
384 * @throws SocketException is the socket is closed
385 */
386 private void endWrite(boolean completed) throws SocketException {
387 synchronized (stateLock) {
388 writerThread = null;
389 int state = this.state;
390 if (state == ST_CLOSING)
391 tryFinishClose();
392 if (!completed && state >= ST_CLOSING)
393 throw new SocketException("Socket closed");
394 }
395 }
396
397 /**
398 * Attempts to write a sequence of bytes to the socket from the given
399 * byte array.
400 */
401 private int tryWrite(FileDescriptor fd, byte[] b, int off, int len)
402 throws IOException
403 {
404 ByteBuffer src = Util.getTemporaryDirectBuffer(len);
405 assert src.position() == 0;
406 try {
407 src.put(b, off, len);
408 return nd.write(fd, ((DirectBuffer)src).address(), len);
409 } finally {
410 Util.offerFirstTemporaryDirectBuffer(src);
411 }
412 }
413
414 /**
415 * Writes a sequence of bytes to the socket from the given byte array.
416 * @return the number of bytes written
417 * @throws SocketException if the socket is closed or a socket I/O error occurs
418 */
419 private int implWrite(byte[] b, int off, int len) throws IOException {
420 int n = 0;
421 FileDescriptor fd = beginWrite();
422 try {
423
424 // experimental
425 if (Poller.supportWriteOps() && Thread.currentThread().isVirtual()) {
426 n = Poller.write(fdVal(fd), b, off, len, this::isOpen);
427 if (n != IOStatus.UNAVAILABLE) return n;
428 }
429
430 configureNonBlockingIfNeeded(fd, false);
431 n = tryWrite(fd, b, off, len);
432 while (IOStatus.okayToRetry(n) && isOpen()) {
433 park(fd, Net.POLLOUT);
434 n = tryWrite(fd, b, off, len);
435 }
436 return n;
437 } catch (InterruptedIOException e) {
438 throw e;
439 } catch (IOException ioe) {
440 // throw SocketException to maintain compatibility
441 throw asSocketException(ioe);
442 } finally {
443 endWrite(n > 0);
444 }
445 }
446
447 /**
448 * Writes a sequence of bytes to the socket from the given byte array.
449 * @throws SocketException if the socket is closed or a socket I/O error occurs
450 */
451 private void write(byte[] b, int off, int len) throws IOException {
452 Objects.checkFromIndexSize(off, len, b.length);
453 if (len > 0) {
454 writeLock.lock();
455 try {
456 int pos = off;
457 int end = off + len;
458 while (pos < end) {
459 // write up to MAX_BUFFER_SIZE bytes
460 int size = Math.min((end - pos), MAX_BUFFER_SIZE);
461 int n = implWrite(b, pos, size);
462 pos += n;
463 }
464 } finally {
465 writeLock.unlock();
466 }
467 }
468 }
469
470 /**
471 * Creates the socket.
472 * @param stream {@code true} for a streams socket
473 */
474 @Override
475 protected void create(boolean stream) throws IOException {
476 if (!stream) {
477 throw new IOException("Datagram socket creation not supported");
478 }
479 synchronized (stateLock) {
480 if (state != ST_NEW)
481 throw new IOException("Already created");
482 FileDescriptor fd;
483 if (server) {
484 fd = Net.serverSocket();
485 } else {
486 fd = Net.socket();
487 }
488 Runnable closer = closerFor(fd);
489 this.fd = fd;
490 this.cleaner = CleanerFactory.cleaner().register(this, closer);
491 this.state = ST_UNCONNECTED;
492 }
493 }
494
495 /**
496 * Marks the beginning of a connect operation that might block.
497 * @throws SocketException if the socket is closed or already connected
498 */
499 private FileDescriptor beginConnect(InetAddress address, int port)
500 throws IOException
501 {
502 synchronized (stateLock) {
503 int state = this.state;
504 if (state != ST_UNCONNECTED) {
505 if (state == ST_NEW)
506 throw new SocketException("Not created");
507 if (state == ST_CONNECTING)
508 throw new SocketException("Connection in progress");
509 if (state == ST_CONNECTED)
510 throw new SocketException("Already connected");
511 if (state >= ST_CLOSING)
512 throw new SocketException("Socket closed");
513 assert false;
514 }
515 this.state = ST_CONNECTING;
516
517 // save the remote address/port
518 this.address = address;
519 this.port = port;
520
521 readerThread = NativeThread.threadToSignal();
522 return fd;
523 }
524 }
525
526 /**
527 * Marks the end of a connect operation that may have blocked.
528 * @throws SocketException is the socket is closed
529 */
530 private void endConnect(FileDescriptor fd, boolean completed) throws IOException {
531 synchronized (stateLock) {
532 readerThread = null;
533 int state = this.state;
534 if (state == ST_CLOSING)
535 tryFinishClose();
536 if (completed && state == ST_CONNECTING) {
537 this.state = ST_CONNECTED;
538 localport = Net.localAddress(fd).getPort();
539 } else if (!completed && state >= ST_CLOSING) {
540 throw new SocketException("Socket closed");
541 }
542 }
543 }
544
545 /**
546 * Waits for a connection attempt to finish with a timeout
547 * @throws SocketTimeoutException if the connect timeout elapses
548 */
549 private boolean timedFinishConnect(FileDescriptor fd, long nanos) throws IOException {
550 long startNanos = System.nanoTime();
551 boolean polled = Net.pollConnectNow(fd);
552 while (!polled && isOpen()) {
553 long remainingNanos = nanos - (System.nanoTime() - startNanos);
554 if (remainingNanos <= 0) {
555 throw new SocketTimeoutException("Connect timed out");
556 }
557 park(fd, Net.POLLOUT, remainingNanos);
558 polled = Net.pollConnectNow(fd);
559 }
560 return polled && isOpen();
561 }
562
563 /**
564 * Attempts to establish a connection to the given socket address with a
565 * timeout. Closes the socket if connection cannot be established.
566 * @throws IOException if the address is not a resolved InetSocketAddress or
567 * the connection cannot be established
568 */
569 @Override
570 protected void connect(SocketAddress remote, int millis) throws IOException {
571 // SocketImpl connect only specifies IOException
572 if (!(remote instanceof InetSocketAddress))
573 throw new IOException("Unsupported address type");
574 InetSocketAddress isa = (InetSocketAddress) remote;
575 if (isa.isUnresolved()) {
576 throw new UnknownHostException(
577 formatMsg(filterNonSocketInfo(isa.getHostName())));
578 }
579
580 InetAddress address = isa.getAddress();
581 if (address.isAnyLocalAddress())
582 address = InetAddress.getLocalHost();
583 int port = isa.getPort();
584
585 ReentrantLock connectLock = readLock;
586 try {
587 connectLock.lock();
588 try {
589 boolean connected = false;
590 FileDescriptor fd = beginConnect(address, port);
591 try {
592 configureNonBlockingIfNeeded(fd, millis > 0);
593 int n = Net.connect(fd, address, port);
594 if (n > 0) {
595 // connection established
596 connected = true;
597 } else {
598 assert IOStatus.okayToRetry(n);
599 if (millis > 0) {
600 // finish connect with timeout
601 long nanos = MILLISECONDS.toNanos(millis);
602 connected = timedFinishConnect(fd, nanos);
603 } else {
604 // finish connect, no timeout
605 boolean polled = false;
606 while (!polled && isOpen()) {
607 park(fd, Net.POLLOUT);
608 polled = Net.pollConnectNow(fd);
609 }
610 connected = polled && isOpen();
611 }
612 }
613 } finally {
614 endConnect(fd, connected);
615 }
616 } finally {
617 connectLock.unlock();
618 }
619 } catch (IOException ioe) {
620 close();
621 if (ioe instanceof SocketTimeoutException) {
622 throw ioe;
623 } else if (ioe instanceof InterruptedIOException) {
624 assert Thread.currentThread().isVirtual();
625 throw new SocketException("Closed by interrupt");
626 } else {
627 throw Exceptions.ioException(ioe, isa);
628 }
629 }
630 }
631
632 @Override
633 protected void connect(String host, int port) throws IOException {
634 connect(new InetSocketAddress(host, port), 0);
635 }
636
637 @Override
638 protected void connect(InetAddress address, int port) throws IOException {
639 connect(new InetSocketAddress(address, port), 0);
640 }
641
642 @Override
643 protected void bind(InetAddress host, int port) throws IOException {
644 synchronized (stateLock) {
645 ensureOpen();
646 if (localport != 0)
647 throw new SocketException("Already bound");
648 Net.bind(fd, host, port);
649 // set the address field to the given host address to
650 // maintain long standing behavior. When binding to 0.0.0.0
651 // then the actual local address will be ::0 when IPv6 is enabled.
652 address = host;
653 localport = Net.localAddress(fd).getPort();
654 }
655 }
656
657 @Override
658 protected void listen(int backlog) throws IOException {
659 synchronized (stateLock) {
660 ensureOpen();
661 if (localport == 0)
662 throw new SocketException("Not bound");
663 Net.listen(fd, backlog < 1 ? 50 : backlog);
664 }
665 }
666
667 /**
668 * Marks the beginning of an accept operation that might block.
669 * @throws SocketException if the socket is closed
670 */
671 private FileDescriptor beginAccept() throws SocketException {
672 synchronized (stateLock) {
673 ensureOpen();
674 if (localport == 0)
675 throw new SocketException("Not bound");
676 readerThread = NativeThread.threadToSignal();
677 return fd;
678 }
679 }
680
681 /**
682 * Marks the end of an accept operation that may have blocked.
683 * @throws SocketException is the socket is closed
684 */
685 private void endAccept(boolean completed) throws SocketException {
686 synchronized (stateLock) {
687 int state = this.state;
688 readerThread = null;
689 if (state == ST_CLOSING)
690 tryFinishClose();
691 if (!completed && state >= ST_CLOSING)
692 throw new SocketException("Socket closed");
693 }
694 }
695
696 /**
697 * Accepts a new connection with a timeout.
698 * @throws SocketTimeoutException if the accept timeout elapses
699 */
700 private int timedAccept(FileDescriptor fd,
701 FileDescriptor newfd,
702 InetSocketAddress[] isaa,
703 long nanos)
704 throws IOException
705 {
706 long startNanos = System.nanoTime();
707 int n = Net.accept(fd, newfd, isaa);
708 while (n == IOStatus.UNAVAILABLE && isOpen()) {
709 long remainingNanos = nanos - (System.nanoTime() - startNanos);
710 if (remainingNanos <= 0) {
711 throw new SocketTimeoutException("Accept timed out");
712 }
713 park(fd, Net.POLLIN, remainingNanos);
714 n = Net.accept(fd, newfd, isaa);
715 }
716 return n;
717 }
718
719 /**
720 * Accepts a new connection so that the given SocketImpl is connected to
721 * the peer. The SocketImpl must be a newly created NioSocketImpl.
722 */
723 @Override
724 protected void accept(SocketImpl si) throws IOException {
725 NioSocketImpl nsi = (NioSocketImpl) si;
726 if (nsi.state != ST_NEW)
727 throw new SocketException("Not a newly created SocketImpl");
728
729 FileDescriptor newfd = new FileDescriptor();
730 InetSocketAddress[] isaa = new InetSocketAddress[1];
731
732 // acquire the lock, adjusting the timeout for cases where several
733 // threads are accepting connections and there is a timeout set
734 ReentrantLock acceptLock = readLock;
735 int timeout = this.timeout;
736 long remainingNanos = 0;
737 if (timeout > 0) {
738 remainingNanos = tryLock(acceptLock, timeout, MILLISECONDS);
739 if (remainingNanos <= 0) {
740 assert !acceptLock.isHeldByCurrentThread();
741 throw new SocketTimeoutException("Accept timed out");
742 }
743 } else {
744 acceptLock.lock();
745 }
746
747 // accept a connection
748 try {
749 int n = 0;
750 FileDescriptor fd = beginAccept();
751 try {
752 configureNonBlockingIfNeeded(fd, remainingNanos > 0);
753 if (remainingNanos > 0) {
754 // accept with timeout
755 n = timedAccept(fd, newfd, isaa, remainingNanos);
756 } else {
757 // accept, no timeout
758 n = Net.accept(fd, newfd, isaa);
759 while (IOStatus.okayToRetry(n) && isOpen()) {
760 park(fd, Net.POLLIN);
761 n = Net.accept(fd, newfd, isaa);
762 }
763 }
764 } finally {
765 endAccept(n > 0);
766 assert IOStatus.check(n);
767 }
768 } finally {
769 acceptLock.unlock();
770 }
771
772 // get local address and configure accepted socket to blocking mode
773 InetSocketAddress localAddress;
774 try {
775 localAddress = Net.localAddress(newfd);
776 IOUtil.configureBlocking(newfd, true);
777 } catch (IOException ioe) {
778 nd.close(newfd);
779 throw ioe;
780 }
781
782 // set the fields
783 Runnable closer = closerFor(newfd);
784 synchronized (nsi.stateLock) {
785 nsi.fd = newfd;
786 nsi.cleaner = CleanerFactory.cleaner().register(nsi, closer);
787 nsi.localport = localAddress.getPort();
788 nsi.address = isaa[0].getAddress();
789 nsi.port = isaa[0].getPort();
790 nsi.state = ST_CONNECTED;
791 }
792 }
793
794 @Override
795 protected InputStream getInputStream() {
796 return new InputStream() {
797 @Override
798 public int read() throws IOException {
799 byte[] a = new byte[1];
800 int n = read(a, 0, 1);
801 return (n > 0) ? (a[0] & 0xff) : -1;
802 }
803 @Override
804 public int read(byte[] b, int off, int len) throws IOException {
805 return NioSocketImpl.this.read(b, off, len);
806 }
807 @Override
808 public int available() throws IOException {
809 return NioSocketImpl.this.available();
810 }
811 @Override
812 public void close() throws IOException {
813 NioSocketImpl.this.close();
814 }
815 };
816 }
817
818 @Override
819 protected OutputStream getOutputStream() {
820 return new OutputStream() {
821 @Override
822 public void write(int b) throws IOException {
823 byte[] a = new byte[]{(byte) b};
824 write(a, 0, 1);
825 }
826 @Override
827 public void write(byte[] b, int off, int len) throws IOException {
828 NioSocketImpl.this.write(b, off, len);
829 }
830 @Override
831 public void close() throws IOException {
832 NioSocketImpl.this.close();
833 }
834 };
835 }
836
837 @Override
838 protected int available() throws IOException {
839 synchronized (stateLock) {
840 ensureOpenAndConnected();
841 if (isInputClosed) {
842 return 0;
843 } else {
844 return Net.available(fd);
845 }
846 }
847 }
848
849 /**
850 * Closes the socket if there are no I/O operations in progress.
851 */
852 private boolean tryClose() throws IOException {
853 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
854 if (readerThread == null && writerThread == null) {
855 try {
856 cleaner.clean();
857 } catch (UncheckedIOException ioe) {
858 throw ioe.getCause();
859 } finally {
860 state = ST_CLOSED;
861 }
862 return true;
863 } else {
864 return false;
865 }
866 }
867
868 /**
869 * Invokes tryClose to attempt to close the socket.
870 *
871 * This method is used for deferred closing by I/O operations.
872 */
873 private void tryFinishClose() {
874 try {
875 tryClose();
876 } catch (IOException ignore) { }
877 }
878
879 /**
880 * Closes the socket. If there are I/O operations in progress then the
881 * socket is pre-closed and the threads are signalled. The socket will be
882 * closed when the last I/O operation aborts.
883 */
884 @Override
885 protected void close() throws IOException {
886 synchronized (stateLock) {
887 int state = this.state;
888 if (state >= ST_CLOSING)
889 return;
890 if (state == ST_NEW) {
891 // stillborn
892 this.state = ST_CLOSED;
893 return;
894 }
895 boolean connected = (state == ST_CONNECTED);
896 this.state = ST_CLOSING;
897
898 // shutdown output when linger interval not set to 0
899 if (connected) {
900 try {
901 var SO_LINGER = StandardSocketOptions.SO_LINGER;
902 if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
903 Net.shutdown(fd, Net.SHUT_WR);
904 }
905 } catch (IOException ignore) { }
906 }
907
908 // attempt to close the socket. If there are I/O operations in progress
909 // then the socket is pre-closed and the thread(s) signalled. The
910 // last thread will close the file descriptor.
911 if (!tryClose()) {
912 nd.preClose(fd, readerThread, writerThread);
913 }
914 }
915 }
916
917 // the socket options supported by client and server sockets
918 private static volatile Set<SocketOption<?>> clientSocketOptions;
919 private static volatile Set<SocketOption<?>> serverSocketOptions;
920
921 @Override
922 protected Set<SocketOption<?>> supportedOptions() {
923 Set<SocketOption<?>> options = (server) ? serverSocketOptions : clientSocketOptions;
924 if (options == null) {
925 options = new HashSet<>();
926 options.add(StandardSocketOptions.SO_RCVBUF);
927 options.add(StandardSocketOptions.SO_REUSEADDR);
928 if (server) {
929 // IP_TOS added for server socket to maintain compatibility
930 options.add(StandardSocketOptions.IP_TOS);
931 options.addAll(ExtendedSocketOptions.serverSocketOptions());
932 } else {
933 options.add(StandardSocketOptions.IP_TOS);
934 options.add(StandardSocketOptions.SO_KEEPALIVE);
935 options.add(StandardSocketOptions.SO_SNDBUF);
936 options.add(StandardSocketOptions.SO_LINGER);
937 options.add(StandardSocketOptions.TCP_NODELAY);
938 options.addAll(ExtendedSocketOptions.clientSocketOptions());
939 }
940 if (Net.isReusePortAvailable())
941 options.add(StandardSocketOptions.SO_REUSEPORT);
942 options = Collections.unmodifiableSet(options);
943 if (server) {
944 serverSocketOptions = options;
945 } else {
946 clientSocketOptions = options;
947 }
948 }
949 return options;
950 }
951
952 @Override
953 protected <T> void setOption(SocketOption<T> opt, T value) throws IOException {
954 if (!supportedOptions().contains(opt))
955 throw new UnsupportedOperationException("'" + opt + "' not supported");
956 if (!opt.type().isInstance(value))
957 throw new IllegalArgumentException("Invalid value '" + value + "'");
958 synchronized (stateLock) {
959 ensureOpen();
960 if (opt == StandardSocketOptions.IP_TOS) {
961 // maps to IPV6_TCLASS and/or IP_TOS
962 Net.setIpSocketOption(fd, family(), opt, value);
963 } else if (opt == StandardSocketOptions.SO_REUSEADDR) {
964 boolean b = (boolean) value;
965 if (Net.useExclusiveBind()) {
966 isReuseAddress = b;
967 } else {
968 Net.setSocketOption(fd, opt, b);
969 }
970 } else {
971 // option does not need special handling
972 Net.setSocketOption(fd, opt, value);
973 }
974 }
975 }
976
977 @SuppressWarnings("unchecked")
978 protected <T> T getOption(SocketOption<T> opt) throws IOException {
979 if (!supportedOptions().contains(opt))
980 throw new UnsupportedOperationException("'" + opt + "' not supported");
981 synchronized (stateLock) {
982 ensureOpen();
983 if (opt == StandardSocketOptions.IP_TOS) {
984 return (T) Net.getSocketOption(fd, family(), opt);
985 } else if (opt == StandardSocketOptions.SO_REUSEADDR) {
986 if (Net.useExclusiveBind()) {
987 return (T) Boolean.valueOf(isReuseAddress);
988 } else {
989 return (T) Net.getSocketOption(fd, opt);
990 }
991 } else {
992 // option does not need special handling
993 return (T) Net.getSocketOption(fd, opt);
994 }
995 }
996 }
997
998 private boolean booleanValue(Object value, String desc) throws SocketException {
999 if (!(value instanceof Boolean))
1000 throw new SocketException("Bad value for " + desc);
1001 return (boolean) value;
1002 }
1003
1004 private int intValue(Object value, String desc) throws SocketException {
1005 if (!(value instanceof Integer))
1006 throw new SocketException("Bad value for " + desc);
1007 return (int) value;
1008 }
1009
1010 @Override
1011 public void setOption(int opt, Object value) throws SocketException {
1012 synchronized (stateLock) {
1013 ensureOpen();
1014 try {
1015 switch (opt) {
1016 case SO_LINGER: {
1017 // the value is "false" to disable, or linger interval to enable
1018 int i;
1019 if (value instanceof Boolean && ((boolean) value) == false) {
1020 i = -1;
1021 } else {
1022 i = intValue(value, "SO_LINGER");
1023 }
1024 Net.setSocketOption(fd, StandardSocketOptions.SO_LINGER, i);
1025 break;
1026 }
1027 case SO_TIMEOUT: {
1028 int i = intValue(value, "SO_TIMEOUT");
1029 if (i < 0)
1030 throw new IllegalArgumentException("timeout < 0");
1031 timeout = i;
1032 break;
1033 }
1034 case IP_TOS: {
1035 int i = intValue(value, "IP_TOS");
1036 Net.setIpSocketOption(fd, family(), StandardSocketOptions.IP_TOS, i);
1037 break;
1038 }
1039 case TCP_NODELAY: {
1040 boolean b = booleanValue(value, "TCP_NODELAY");
1041 Net.setSocketOption(fd, StandardSocketOptions.TCP_NODELAY, b);
1042 break;
1043 }
1044 case SO_SNDBUF: {
1045 int i = intValue(value, "SO_SNDBUF");
1046 if (i <= 0)
1047 throw new SocketException("SO_SNDBUF <= 0");
1048 Net.setSocketOption(fd, StandardSocketOptions.SO_SNDBUF, i);
1049 break;
1050 }
1051 case SO_RCVBUF: {
1052 int i = intValue(value, "SO_RCVBUF");
1053 if (i <= 0)
1054 throw new SocketException("SO_RCVBUF <= 0");
1055 Net.setSocketOption(fd, StandardSocketOptions.SO_RCVBUF, i);
1056 break;
1057 }
1058 case SO_KEEPALIVE: {
1059 boolean b = booleanValue(value, "SO_KEEPALIVE");
1060 Net.setSocketOption(fd, StandardSocketOptions.SO_KEEPALIVE, b);
1061 break;
1062 }
1063 case SO_OOBINLINE: {
1064 boolean b = booleanValue(value, "SO_OOBINLINE");
1065 Net.setSocketOption(fd, ExtendedSocketOption.SO_OOBINLINE, b);
1066 break;
1067 }
1068 case SO_REUSEADDR: {
1069 boolean b = booleanValue(value, "SO_REUSEADDR");
1070 if (Net.useExclusiveBind()) {
1071 isReuseAddress = b;
1072 } else {
1073 Net.setSocketOption(fd, StandardSocketOptions.SO_REUSEADDR, b);
1074 }
1075 break;
1076 }
1077 case SO_REUSEPORT: {
1078 if (!Net.isReusePortAvailable())
1079 throw new SocketException("SO_REUSEPORT not supported");
1080 boolean b = booleanValue(value, "SO_REUSEPORT");
1081 Net.setSocketOption(fd, StandardSocketOptions.SO_REUSEPORT, b);
1082 break;
1083 }
1084 default:
1085 throw new SocketException("Unknown option " + opt);
1086 }
1087 } catch (SocketException e) {
1088 throw e;
1089 } catch (IllegalArgumentException | IOException e) {
1090 throw asSocketException(e);
1091 }
1092 }
1093 }
1094
1095 @Override
1096 public Object getOption(int opt) throws SocketException {
1097 synchronized (stateLock) {
1098 ensureOpen();
1099 try {
1100 switch (opt) {
1101 case SO_TIMEOUT:
1102 return timeout;
1103 case TCP_NODELAY:
1104 return Net.getSocketOption(fd, StandardSocketOptions.TCP_NODELAY);
1105 case SO_OOBINLINE:
1106 return Net.getSocketOption(fd, ExtendedSocketOption.SO_OOBINLINE);
1107 case SO_LINGER: {
1108 // return "false" when disabled, linger interval when enabled
1109 int i = (int) Net.getSocketOption(fd, StandardSocketOptions.SO_LINGER);
1110 if (i == -1) {
1111 return Boolean.FALSE;
1112 } else {
1113 return i;
1114 }
1115 }
1116 case SO_REUSEADDR:
1117 if (Net.useExclusiveBind()) {
1118 return isReuseAddress;
1119 } else {
1120 return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEADDR);
1121 }
1122 case SO_BINDADDR:
1123 return Net.localAddress(fd).getAddress();
1124 case SO_SNDBUF:
1125 return Net.getSocketOption(fd, StandardSocketOptions.SO_SNDBUF);
1126 case SO_RCVBUF:
1127 return Net.getSocketOption(fd, StandardSocketOptions.SO_RCVBUF);
1128 case IP_TOS:
1129 return Net.getSocketOption(fd, family(), StandardSocketOptions.IP_TOS);
1130 case SO_KEEPALIVE:
1131 return Net.getSocketOption(fd, StandardSocketOptions.SO_KEEPALIVE);
1132 case SO_REUSEPORT:
1133 if (!Net.isReusePortAvailable())
1134 throw new SocketException("SO_REUSEPORT not supported");
1135 return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEPORT);
1136 default:
1137 throw new SocketException("Unknown option " + opt);
1138 }
1139 } catch (SocketException e) {
1140 throw e;
1141 } catch (IllegalArgumentException | IOException e) {
1142 throw asSocketException(e);
1143 }
1144 }
1145 }
1146
1147 @Override
1148 protected void shutdownInput() throws IOException {
1149 synchronized (stateLock) {
1150 ensureOpenAndConnected();
1151 if (!isInputClosed) {
1152 Net.shutdown(fd, Net.SHUT_RD);
1153 if (readerThread != null && readerThread.isVirtual()) {
1154 Poller.stopPoll(readerThread);
1155 }
1156 isInputClosed = true;
1157 }
1158 }
1159 }
1160
1161 @Override
1162 protected void shutdownOutput() throws IOException {
1163 synchronized (stateLock) {
1164 ensureOpenAndConnected();
1165 if (!isOutputClosed) {
1166 Net.shutdown(fd, Net.SHUT_WR);
1167 if (writerThread != null && writerThread.isVirtual()) {
1168 Poller.stopPoll(writerThread);
1169 }
1170 isOutputClosed = true;
1171 }
1172 }
1173 }
1174
1175 @Override
1176 protected boolean supportsUrgentData() {
1177 return true;
1178 }
1179
1180 @Override
1181 protected void sendUrgentData(int data) throws IOException {
1182 writeLock.lock();
1183 try {
1184 int n = 0;
1185 FileDescriptor fd = beginWrite();
1186 try {
1187 configureNonBlockingIfNeeded(fd, false);
1188 do {
1189 n = Net.sendOOB(fd, (byte) data);
1190 } while (n == IOStatus.INTERRUPTED && isOpen());
1191 if (n == IOStatus.UNAVAILABLE) {
1192 throw new SocketException("No buffer space available");
1193 }
1194 } finally {
1195 endWrite(n > 0);
1196 }
1197 } finally {
1198 writeLock.unlock();
1199 }
1200 }
1201
1202 /**
1203 * Returns an action to close the given file descriptor.
1204 */
1205 private static Runnable closerFor(FileDescriptor fd) {
1206 return () -> {
1207 try {
1208 nd.close(fd);
1209 } catch (IOException ioe) {
1210 throw new UncheckedIOException(ioe);
1211 }
1212 };
1213 }
1214
1215 /**
1216 * Attempts to acquire the given lock within the given waiting time.
1217 * @return the remaining time in nanoseconds when the lock is acquired, zero
1218 * or less if the lock was not acquired before the timeout expired
1219 */
1220 private static long tryLock(ReentrantLock lock, long timeout, TimeUnit unit) {
1221 assert timeout > 0;
1222 boolean interrupted = false;
1223 long nanos = unit.toNanos(timeout);
1224 long remainingNanos = nanos;
1225 long startNanos = System.nanoTime();
1226 boolean acquired = false;
1227 while (!acquired && (remainingNanos > 0)) {
1228 try {
1229 acquired = lock.tryLock(remainingNanos, NANOSECONDS);
1230 } catch (InterruptedException e) {
1231 interrupted = true;
1232 }
1233 remainingNanos = nanos - (System.nanoTime() - startNanos);
1234 }
1235 if (acquired && remainingNanos <= 0L)
1236 lock.unlock(); // release lock if timeout has expired
1237 if (interrupted)
1238 Thread.currentThread().interrupt();
1239 return remainingNanos;
1240 }
1241
1242 /**
1243 * Creates a SocketException from the given exception.
1244 */
1245 private static SocketException asSocketException(Exception e) {
1246 if (e instanceof SocketException se) {
1247 return se;
1248 } else {
1249 var se = new SocketException(e.getMessage());
1250 se.setStackTrace(e.getStackTrace());
1251 return se;
1252 }
1253 }
1254
1255 /**
1256 * Returns the socket protocol family.
1257 */
1258 private static ProtocolFamily family() {
1259 if (Net.isIPv6Available()) {
1260 return StandardProtocolFamily.INET6;
1261 } else {
1262 return StandardProtocolFamily.INET;
1263 }
1264 }
1265
1266 /**
1267 * Return the file descriptor value.
1268 */
1269 private static int fdVal(FileDescriptor fd) {
1270 return JIOFDA.get(fd);
1271 }
1272
1273 private static final JavaIOFileDescriptorAccess JIOFDA = SharedSecrets.getJavaIOFileDescriptorAccess();
1274 }
--- EOF ---