1 /*
2 * Copyright (c) 2019, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.FileDescriptor;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.InterruptedIOException;
32 import java.io.OutputStream;
33 import java.io.UncheckedIOException;
34 import java.lang.ref.Cleaner.Cleanable;
35 import java.net.InetAddress;
36 import java.net.InetSocketAddress;
37 import java.net.ProtocolFamily;
38 import java.net.SocketAddress;
39 import java.net.SocketException;
40 import java.net.SocketImpl;
41 import java.net.SocketOption;
42 import java.net.SocketTimeoutException;
43 import java.net.StandardProtocolFamily;
44 import java.net.StandardSocketOptions;
45 import java.net.UnknownHostException;
46 import java.nio.ByteBuffer;
47 import java.util.Collections;
48 import java.util.HashSet;
49 import java.util.Objects;
50 import java.util.Set;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.locks.ReentrantLock;
53
54 import jdk.internal.access.JavaIOFileDescriptorAccess;
55 import jdk.internal.access.SharedSecrets;
56 import jdk.internal.ref.CleanerFactory;
57 import sun.net.ConnectionResetException;
58 import sun.net.PlatformSocketImpl;
59 import sun.net.ext.ExtendedSocketOptions;
60 import jdk.internal.util.Exceptions;
61
62 import static java.util.concurrent.TimeUnit.MILLISECONDS;
63 import static java.util.concurrent.TimeUnit.NANOSECONDS;
64 import static jdk.internal.util.Exceptions.filterNonSocketInfo;
65 import static jdk.internal.util.Exceptions.formatMsg;
66
67 /**
68 * NIO based SocketImpl.
69 *
70 * The underlying socket used by this SocketImpl is initially configured blocking.
71 * If a connect, accept or read is attempted with a timeout, or a virtual
72 * thread invokes a blocking operation, then the socket is changed to non-blocking
73 * When in non-blocking mode, operations that don't complete immediately will
74 * poll the socket (or park when invoked on a virtual thread) and preserve
75 * the semantics of blocking operations.
76 */
77
78 public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl {
79 private static final NativeDispatcher nd = new SocketDispatcher();
80
81 // The maximum number of bytes to read/write per syscall to avoid needing
82 // a huge buffer from the temporary buffer cache
83 private static final int MAX_BUFFER_SIZE = 128 * 1024;
84
85 // true if this is a SocketImpl for a ServerSocket
86 private final boolean server;
87
88 // Lock held when reading (also used when accepting or connecting)
89 private final ReentrantLock readLock = new ReentrantLock();
90
91 // Lock held when writing
92 private final ReentrantLock writeLock = new ReentrantLock();
93
94 // The stateLock for read/changing state
95 private final Object stateLock = new Object();
96 private static final int ST_NEW = 0;
97 private static final int ST_UNCONNECTED = 1;
98 private static final int ST_CONNECTING = 2;
99 private static final int ST_CONNECTED = 3;
100 private static final int ST_CLOSING = 4;
101 private static final int ST_CLOSED = 5;
102 private volatile int state; // need stateLock to change
103
104 private Cleanable cleaner;
105
106 // set to true when the socket is in non-blocking mode
107 private volatile boolean nonBlocking;
108
109 // used by connect/read/write/accept, protected by stateLock
110 private Thread readerThread;
111 private Thread writerThread;
112
113 // used when SO_REUSEADDR is emulated, protected by stateLock
114 private boolean isReuseAddress;
115
116 // read or accept timeout in millis
117 private volatile int timeout;
118
119 // flags to indicate if the connection is shutdown for input and output
120 private volatile boolean isInputClosed;
121 private volatile boolean isOutputClosed;
122
123 // used by read to emulate legacy behavior, protected by readLock
124 private boolean readEOF;
125 private boolean connectionReset;
126
127 /**
128 * Creates an instance of this SocketImpl.
129 * @param server true if this is a SocketImpl for a ServerSocket
130 */
131 public NioSocketImpl(boolean server) {
132 this.server = server;
133 }
134
135 /**
136 * Returns true if the socket is open.
137 */
138 private boolean isOpen() {
139 return state < ST_CLOSING;
140 }
141
142 /**
143 * Throws SocketException if the socket is not open.
144 */
145 private void ensureOpen() throws SocketException {
146 int state = this.state;
147 if (state == ST_NEW)
148 throw new SocketException("Socket not created");
149 if (state >= ST_CLOSING)
150 throw new SocketException("Socket closed");
151 }
152
153 /**
154 * Throws SocketException if the socket is not open and connected.
155 */
156 private void ensureOpenAndConnected() throws SocketException {
157 int state = this.state;
158 if (state < ST_CONNECTED)
159 throw new SocketException("Not connected");
160 if (state > ST_CONNECTED)
161 throw new SocketException("Socket closed");
162 }
163
164 /**
165 * Disables the current thread for scheduling purposes until the
166 * socket is ready for I/O or is asynchronously closed, for up to the
167 * specified waiting time.
168 * @throws IOException if an I/O error occurs
169 */
170 private void park(FileDescriptor fd, int event, long nanos) throws IOException {
171 Thread t = Thread.currentThread();
172 if (t.isVirtual()) {
173 Poller.poll(fdVal(fd), event, nanos, this::isOpen);
174 if (t.isInterrupted()) {
175 throw new InterruptedIOException();
176 }
177 } else {
178 long millis;
179 if (nanos == 0) {
180 millis = -1;
181 } else {
182 millis = NANOSECONDS.toMillis(nanos);
183 if (nanos > MILLISECONDS.toNanos(millis)) {
184 // Round up any excess nanos to the nearest millisecond to
185 // avoid parking for less than requested.
186 millis++;
187 }
188 }
189 Net.poll(fd, event, millis);
190 }
191 }
192
193 /**
194 * Disables the current thread for scheduling purposes until the socket is
195 * ready for I/O or is asynchronously closed.
196 * @throws IOException if an I/O error occurs
197 */
198 private void park(FileDescriptor fd, int event) throws IOException {
199 park(fd, event, 0);
200 }
201
202 /**
203 * Ensures that the socket is configured non-blocking invoked on a virtual
204 * thread or the operation has a timeout
205 * @throws IOException if there is an I/O error changing the blocking mode
206 */
207 private void configureNonBlockingIfNeeded(FileDescriptor fd, boolean timed)
208 throws IOException
209 {
210 if (!nonBlocking
211 && (timed || Thread.currentThread().isVirtual())) {
212 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
213 IOUtil.configureBlocking(fd, false);
214 nonBlocking = true;
215 }
216 }
217
218 /**
219 * Marks the beginning of a read operation that might block.
220 * @throws SocketException if the socket is closed or not connected
221 */
222 private FileDescriptor beginRead() throws SocketException {
223 synchronized (stateLock) {
224 ensureOpenAndConnected();
225 readerThread = NativeThread.threadToSignal();
226 return fd;
227 }
228 }
229
230 /**
231 * Marks the end of a read operation that may have blocked.
232 * @throws SocketException is the socket is closed
233 */
234 private void endRead(boolean completed) throws SocketException {
235 synchronized (stateLock) {
236 readerThread = null;
237 int state = this.state;
238 if (state == ST_CLOSING)
239 tryFinishClose();
240 if (!completed && state >= ST_CLOSING)
241 throw new SocketException("Socket closed");
242 }
243 }
244
245 /**
246 * Attempts to read bytes from the socket into the given byte array.
247 */
248 private int tryRead(FileDescriptor fd, byte[] b, int off, int len)
249 throws IOException
250 {
251 ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
252 assert dst.position() == 0;
253 try {
254 int n = nd.read(fd, ((DirectBuffer)dst).address(), len);
255 if (n > 0) {
256 dst.get(b, off, n);
257 }
258 return n;
259 } finally {
260 Util.offerFirstTemporaryDirectBuffer(dst);
261 }
262 }
263
264 /**
265 * Reads bytes from the socket into the given byte array with a timeout.
266 * @throws SocketTimeoutException if the read timeout elapses
267 */
268 private int timedRead(FileDescriptor fd, byte[] b, int off, int len, long nanos)
269 throws IOException
270 {
271 long startNanos = System.nanoTime();
272 int n = tryRead(fd, b, off, len);
273 while (n == IOStatus.UNAVAILABLE && isOpen()) {
274 long remainingNanos = nanos - (System.nanoTime() - startNanos);
275 if (remainingNanos <= 0) {
276 throw new SocketTimeoutException("Read timed out");
277 }
278 park(fd, Net.POLLIN, remainingNanos);
279 n = tryRead(fd, b, off, len);
280 }
281 return n;
282 }
283
284 /**
285 * Reads bytes from the socket into the given byte array.
286 * @return the number of bytes read or -1 at EOF
287 * @throws SocketException if the socket is closed or a socket I/O error occurs
288 * @throws SocketTimeoutException if the read timeout elapses
289 */
290 private int implRead(byte[] b, int off, int len, long remainingNanos) throws IOException {
291 int n = 0;
292 SocketException ex = null;
293 FileDescriptor fd = beginRead();
294 try {
295 if (connectionReset)
296 throw new SocketException("Connection reset");
297 if (isInputClosed)
298 return -1;
299
300 // experimental
301 if (Poller.supportReadOps() && Thread.currentThread().isVirtual()) {
302 n = Poller.read(fdVal(fd), b, off, len, remainingNanos, this::isOpen);
303 if (n != IOStatus.UNAVAILABLE) return n;
304 }
305
306 configureNonBlockingIfNeeded(fd, remainingNanos > 0);
307 if (remainingNanos > 0) {
308 // read with timeout
309 n = timedRead(fd, b, off, len, remainingNanos);
310 } else {
311 // read, no timeout
312 n = tryRead(fd, b, off, len);
313 while (IOStatus.okayToRetry(n) && isOpen()) {
314 park(fd, Net.POLLIN);
315 n = tryRead(fd, b, off, len);
316 }
317 }
318 } catch (InterruptedIOException e) {
319 throw e;
320 } catch (ConnectionResetException e) {
321 connectionReset = true;
322 throw new SocketException("Connection reset");
323 } catch (IOException ioe) {
324 // translate to SocketException to maintain compatibility
325 ex = asSocketException(ioe);
326 } finally {
327 endRead(n > 0);
328 }
329 if (n <= 0 && isInputClosed) {
330 return -1;
331 }
332 if (ex != null) {
333 throw ex;
334 }
335 return n;
336 }
337
338 /**
339 * Reads bytes from the socket into the given byte array.
340 * @return the number of bytes read or -1 at EOF
341 * @throws IndexOutOfBoundsException if the bound checks fail
342 * @throws SocketException if the socket is closed or a socket I/O error occurs
343 * @throws SocketTimeoutException if the read timeout elapses
344 */
345 private int read(byte[] b, int off, int len) throws IOException {
346 Objects.checkFromIndexSize(off, len, b.length);
347 if (len == 0) {
348 return 0;
349 } else {
350 long remainingNanos = 0;
351 int timeout = this.timeout;
352 if (timeout > 0) {
353 remainingNanos = tryLock(readLock, timeout, MILLISECONDS);
354 if (remainingNanos <= 0) {
355 assert !readLock.isHeldByCurrentThread();
356 throw new SocketTimeoutException("Read timed out");
357 }
358 } else {
359 readLock.lock();
360 }
361 try {
362 // emulate legacy behavior to return -1, even if socket is closed
363 if (readEOF)
364 return -1;
365 // read up to MAX_BUFFER_SIZE bytes
366 int size = Math.min(len, MAX_BUFFER_SIZE);
367 int n = implRead(b, off, size, remainingNanos);
368 if (n == -1)
369 readEOF = true;
370 return n;
371 } finally {
372 readLock.unlock();
373 }
374 }
375 }
376
377 /**
378 * Marks the beginning of a write operation that might block.
379 * @throws SocketException if the socket is closed or not connected
380 */
381 private FileDescriptor beginWrite() throws SocketException {
382 synchronized (stateLock) {
383 ensureOpenAndConnected();
384 writerThread = NativeThread.threadToSignal();
385 return fd;
386 }
387 }
388
389 /**
390 * Marks the end of a write operation that may have blocked.
391 * @throws SocketException is the socket is closed
392 */
393 private void endWrite(boolean completed) throws SocketException {
394 synchronized (stateLock) {
395 writerThread = null;
396 int state = this.state;
397 if (state == ST_CLOSING)
398 tryFinishClose();
399 if (!completed && state >= ST_CLOSING)
400 throw new SocketException("Socket closed");
401 }
402 }
403
404 /**
405 * Attempts to write a sequence of bytes to the socket from the given
406 * byte array.
407 */
408 private int tryWrite(FileDescriptor fd, byte[] b, int off, int len)
409 throws IOException
410 {
411 ByteBuffer src = Util.getTemporaryDirectBuffer(len);
412 assert src.position() == 0;
413 try {
414 src.put(b, off, len);
415 return nd.write(fd, ((DirectBuffer)src).address(), len);
416 } finally {
417 Util.offerFirstTemporaryDirectBuffer(src);
418 }
419 }
420
421 /**
422 * Writes a sequence of bytes to the socket from the given byte array.
423 * @return the number of bytes written
424 * @throws SocketException if the socket is closed or a socket I/O error occurs
425 */
426 private int implWrite(byte[] b, int off, int len) throws IOException {
427 int n = 0;
428 SocketException ex = null;
429 FileDescriptor fd = beginWrite();
430 try {
431
432 // experimental
433 if (Poller.supportWriteOps() && Thread.currentThread().isVirtual()) {
434 n = Poller.write(fdVal(fd), b, off, len, this::isOpen);
435 if (n != IOStatus.UNAVAILABLE) return n;
436 }
437
438 configureNonBlockingIfNeeded(fd, false);
439 n = tryWrite(fd, b, off, len);
440 while (IOStatus.okayToRetry(n) && isOpen()) {
441 park(fd, Net.POLLOUT);
442 n = tryWrite(fd, b, off, len);
443 }
444 } catch (InterruptedIOException e) {
445 throw e;
446 } catch (IOException ioe) {
447 // translate to SocketException to maintain compatibility
448 ex = asSocketException(ioe);
449 } finally {
450 endWrite(n > 0);
451 }
452 if (ex != null) {
453 throw ex;
454 }
455 return n;
456 }
457
458 /**
459 * Writes a sequence of bytes to the socket from the given byte array.
460 * @throws SocketException if the socket is closed or a socket I/O error occurs
461 */
462 private void write(byte[] b, int off, int len) throws IOException {
463 Objects.checkFromIndexSize(off, len, b.length);
464 if (len > 0) {
465 writeLock.lock();
466 try {
467 int pos = off;
468 int end = off + len;
469 while (pos < end) {
470 // write up to MAX_BUFFER_SIZE bytes
471 int size = Math.min((end - pos), MAX_BUFFER_SIZE);
472 int n = implWrite(b, pos, size);
473 pos += n;
474 }
475 } finally {
476 writeLock.unlock();
477 }
478 }
479 }
480
481 /**
482 * Creates the socket.
483 * @param stream {@code true} for a streams socket
484 */
485 @Override
486 protected void create(boolean stream) throws IOException {
487 if (!stream) {
488 throw new IOException("Datagram socket creation not supported");
489 }
490 synchronized (stateLock) {
491 if (state != ST_NEW)
492 throw new IOException("Already created");
493 FileDescriptor fd;
494 if (server) {
495 fd = Net.serverSocket();
496 } else {
497 fd = Net.socket();
498 }
499 Runnable closer = closerFor(fd);
500 this.fd = fd;
501 this.cleaner = CleanerFactory.cleaner().register(this, closer);
502 this.state = ST_UNCONNECTED;
503 }
504 }
505
506 /**
507 * Marks the beginning of a connect operation that might block.
508 * @throws SocketException if the socket is closed or already connected
509 */
510 private FileDescriptor beginConnect(InetAddress address, int port)
511 throws IOException
512 {
513 synchronized (stateLock) {
514 int state = this.state;
515 if (state != ST_UNCONNECTED) {
516 if (state == ST_NEW)
517 throw new SocketException("Not created");
518 if (state == ST_CONNECTING)
519 throw new SocketException("Connection in progress");
520 if (state == ST_CONNECTED)
521 throw new SocketException("Already connected");
522 if (state >= ST_CLOSING)
523 throw new SocketException("Socket closed");
524 assert false;
525 }
526 this.state = ST_CONNECTING;
527
528 // save the remote address/port
529 this.address = address;
530 this.port = port;
531
532 readerThread = NativeThread.threadToSignal();
533 return fd;
534 }
535 }
536
537 /**
538 * Marks the end of a connect operation that may have blocked.
539 * @throws SocketException is the socket is closed
540 */
541 private void endConnect(FileDescriptor fd, boolean completed) throws IOException {
542 synchronized (stateLock) {
543 readerThread = null;
544 int state = this.state;
545 if (state == ST_CLOSING)
546 tryFinishClose();
547 if (completed && state == ST_CONNECTING) {
548 this.state = ST_CONNECTED;
549 localport = Net.localAddress(fd).getPort();
550 } else if (!completed && state >= ST_CLOSING) {
551 throw new SocketException("Socket closed");
552 }
553 }
554 }
555
556 /**
557 * Waits for a connection attempt to finish with a timeout
558 * @throws SocketTimeoutException if the connect timeout elapses
559 */
560 private boolean timedFinishConnect(FileDescriptor fd, long nanos) throws IOException {
561 long startNanos = System.nanoTime();
562 boolean polled = Net.pollConnectNow(fd);
563 while (!polled && isOpen()) {
564 long remainingNanos = nanos - (System.nanoTime() - startNanos);
565 if (remainingNanos <= 0) {
566 throw new SocketTimeoutException("Connect timed out");
567 }
568 park(fd, Net.POLLOUT, remainingNanos);
569 polled = Net.pollConnectNow(fd);
570 }
571 return polled && isOpen();
572 }
573
574 /**
575 * Attempts to establish a connection to the given socket address with a
576 * timeout. Closes the socket if connection cannot be established.
577 * @throws IOException if the address is not a resolved InetSocketAddress or
578 * the connection cannot be established
579 */
580 @Override
581 protected void connect(SocketAddress remote, int millis) throws IOException {
582 // SocketImpl connect only specifies IOException
583 if (!(remote instanceof InetSocketAddress))
584 throw new IOException("Unsupported address type");
585 InetSocketAddress isa = (InetSocketAddress) remote;
586 if (isa.isUnresolved()) {
587 throw new UnknownHostException(
588 formatMsg(filterNonSocketInfo(isa.getHostName())));
589 }
590
591 InetAddress address = isa.getAddress();
592 if (address.isAnyLocalAddress())
593 address = InetAddress.getLocalHost();
594 int port = isa.getPort();
595
596 ReentrantLock connectLock = readLock;
597 try {
598 connectLock.lock();
599 try {
600 boolean connected = false;
601 FileDescriptor fd = beginConnect(address, port);
602 try {
603 configureNonBlockingIfNeeded(fd, millis > 0);
604 int n = Net.connect(fd, address, port);
605 if (n > 0) {
606 // connection established
607 connected = true;
608 } else {
609 assert IOStatus.okayToRetry(n);
610 if (millis > 0) {
611 // finish connect with timeout
612 long nanos = MILLISECONDS.toNanos(millis);
613 connected = timedFinishConnect(fd, nanos);
614 } else {
615 // finish connect, no timeout
616 boolean polled = false;
617 while (!polled && isOpen()) {
618 park(fd, Net.POLLOUT);
619 polled = Net.pollConnectNow(fd);
620 }
621 connected = polled && isOpen();
622 }
623 }
624 } finally {
625 endConnect(fd, connected);
626 }
627 } finally {
628 connectLock.unlock();
629 }
630 } catch (IOException ioe) {
631 close();
632 if (ioe instanceof SocketTimeoutException) {
633 throw ioe;
634 } else if (ioe instanceof InterruptedIOException) {
635 assert Thread.currentThread().isVirtual();
636 throw new SocketException("Closed by interrupt");
637 } else {
638 throw Exceptions.ioException(ioe, isa);
639 }
640 }
641 }
642
643 @Override
644 protected void connect(String host, int port) throws IOException {
645 connect(new InetSocketAddress(host, port), 0);
646 }
647
648 @Override
649 protected void connect(InetAddress address, int port) throws IOException {
650 connect(new InetSocketAddress(address, port), 0);
651 }
652
653 @Override
654 protected void bind(InetAddress host, int port) throws IOException {
655 synchronized (stateLock) {
656 ensureOpen();
657 if (localport != 0)
658 throw new SocketException("Already bound");
659 Net.bind(fd, host, port);
660 // set the address field to the given host address to
661 // maintain long standing behavior. When binding to 0.0.0.0
662 // then the actual local address will be ::0 when IPv6 is enabled.
663 address = host;
664 localport = Net.localAddress(fd).getPort();
665 }
666 }
667
668 @Override
669 protected void listen(int backlog) throws IOException {
670 synchronized (stateLock) {
671 ensureOpen();
672 if (localport == 0)
673 throw new SocketException("Not bound");
674 Net.listen(fd, backlog < 1 ? 50 : backlog);
675 }
676 }
677
678 /**
679 * Marks the beginning of an accept operation that might block.
680 * @throws SocketException if the socket is closed
681 */
682 private FileDescriptor beginAccept() throws SocketException {
683 synchronized (stateLock) {
684 ensureOpen();
685 if (localport == 0)
686 throw new SocketException("Not bound");
687 readerThread = NativeThread.threadToSignal();
688 return fd;
689 }
690 }
691
692 /**
693 * Marks the end of an accept operation that may have blocked.
694 * @throws SocketException is the socket is closed
695 */
696 private void endAccept(boolean completed) throws SocketException {
697 synchronized (stateLock) {
698 int state = this.state;
699 readerThread = null;
700 if (state == ST_CLOSING)
701 tryFinishClose();
702 if (!completed && state >= ST_CLOSING)
703 throw new SocketException("Socket closed");
704 }
705 }
706
707 /**
708 * Accepts a new connection with a timeout.
709 * @throws SocketTimeoutException if the accept timeout elapses
710 */
711 private int timedAccept(FileDescriptor fd,
712 FileDescriptor newfd,
713 InetSocketAddress[] isaa,
714 long nanos)
715 throws IOException
716 {
717 long startNanos = System.nanoTime();
718 int n = Net.accept(fd, newfd, isaa);
719 while (n == IOStatus.UNAVAILABLE && isOpen()) {
720 long remainingNanos = nanos - (System.nanoTime() - startNanos);
721 if (remainingNanos <= 0) {
722 throw new SocketTimeoutException("Accept timed out");
723 }
724 park(fd, Net.POLLIN, remainingNanos);
725 n = Net.accept(fd, newfd, isaa);
726 }
727 return n;
728 }
729
730 /**
731 * Accepts a new connection so that the given SocketImpl is connected to
732 * the peer. The SocketImpl must be a newly created NioSocketImpl.
733 */
734 @Override
735 protected void accept(SocketImpl si) throws IOException {
736 NioSocketImpl nsi = (NioSocketImpl) si;
737 if (nsi.state != ST_NEW)
738 throw new SocketException("Not a newly created SocketImpl");
739
740 FileDescriptor newfd = new FileDescriptor();
741 InetSocketAddress[] isaa = new InetSocketAddress[1];
742
743 // acquire the lock, adjusting the timeout for cases where several
744 // threads are accepting connections and there is a timeout set
745 ReentrantLock acceptLock = readLock;
746 int timeout = this.timeout;
747 long remainingNanos = 0;
748 if (timeout > 0) {
749 remainingNanos = tryLock(acceptLock, timeout, MILLISECONDS);
750 if (remainingNanos <= 0) {
751 assert !acceptLock.isHeldByCurrentThread();
752 throw new SocketTimeoutException("Accept timed out");
753 }
754 } else {
755 acceptLock.lock();
756 }
757
758 // accept a connection
759 try {
760 int n = 0;
761 FileDescriptor fd = beginAccept();
762 try {
763 configureNonBlockingIfNeeded(fd, remainingNanos > 0);
764 if (remainingNanos > 0) {
765 // accept with timeout
766 n = timedAccept(fd, newfd, isaa, remainingNanos);
767 } else {
768 // accept, no timeout
769 n = Net.accept(fd, newfd, isaa);
770 while (IOStatus.okayToRetry(n) && isOpen()) {
771 park(fd, Net.POLLIN);
772 n = Net.accept(fd, newfd, isaa);
773 }
774 }
775 } finally {
776 endAccept(n > 0);
777 assert IOStatus.check(n);
778 }
779 } finally {
780 acceptLock.unlock();
781 }
782
783 // get local address and configure accepted socket to blocking mode
784 InetSocketAddress localAddress;
785 try {
786 localAddress = Net.localAddress(newfd);
787 IOUtil.configureBlocking(newfd, true);
788 } catch (IOException ioe) {
789 nd.close(newfd);
790 throw ioe;
791 }
792
793 // set the fields
794 Runnable closer = closerFor(newfd);
795 synchronized (nsi.stateLock) {
796 nsi.fd = newfd;
797 nsi.cleaner = CleanerFactory.cleaner().register(nsi, closer);
798 nsi.localport = localAddress.getPort();
799 nsi.address = isaa[0].getAddress();
800 nsi.port = isaa[0].getPort();
801 nsi.state = ST_CONNECTED;
802 }
803 }
804
805 @Override
806 protected InputStream getInputStream() {
807 return new InputStream() {
808 @Override
809 public int read() throws IOException {
810 byte[] a = new byte[1];
811 int n = read(a, 0, 1);
812 return (n > 0) ? (a[0] & 0xff) : -1;
813 }
814 @Override
815 public int read(byte[] b, int off, int len) throws IOException {
816 return NioSocketImpl.this.read(b, off, len);
817 }
818 @Override
819 public int available() throws IOException {
820 return NioSocketImpl.this.available();
821 }
822 @Override
823 public void close() throws IOException {
824 NioSocketImpl.this.close();
825 }
826 };
827 }
828
829 @Override
830 protected OutputStream getOutputStream() {
831 return new OutputStream() {
832 @Override
833 public void write(int b) throws IOException {
834 byte[] a = new byte[]{(byte) b};
835 write(a, 0, 1);
836 }
837 @Override
838 public void write(byte[] b, int off, int len) throws IOException {
839 NioSocketImpl.this.write(b, off, len);
840 }
841 @Override
842 public void close() throws IOException {
843 NioSocketImpl.this.close();
844 }
845 };
846 }
847
848 @Override
849 protected int available() throws IOException {
850 synchronized (stateLock) {
851 ensureOpenAndConnected();
852 if (isInputClosed) {
853 return 0;
854 } else {
855 return Net.available(fd);
856 }
857 }
858 }
859
860 /**
861 * Closes the socket if there are no I/O operations in progress.
862 */
863 private boolean tryClose() throws IOException {
864 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
865 if (readerThread == null && writerThread == null) {
866 try {
867 cleaner.clean();
868 } catch (UncheckedIOException ioe) {
869 throw ioe.getCause();
870 } finally {
871 state = ST_CLOSED;
872 }
873 return true;
874 } else {
875 return false;
876 }
877 }
878
879 /**
880 * Invokes tryClose to attempt to close the socket.
881 *
882 * This method is used for deferred closing by I/O operations.
883 */
884 private void tryFinishClose() {
885 try {
886 tryClose();
887 } catch (IOException ignore) { }
888 }
889
890 /**
891 * Closes the socket. If there are I/O operations in progress then the
892 * socket is pre-closed and the threads are signalled. The socket will be
893 * closed when the last I/O operation aborts.
894 */
895 @Override
896 protected void close() throws IOException {
897 synchronized (stateLock) {
898 int state = this.state;
899 if (state >= ST_CLOSING)
900 return;
901 if (state == ST_NEW) {
902 // stillborn
903 this.state = ST_CLOSED;
904 return;
905 }
906 boolean connected = (state == ST_CONNECTED);
907 this.state = ST_CLOSING;
908
909 // shutdown output when linger interval not set to 0
910 if (connected) {
911 try {
912 var SO_LINGER = StandardSocketOptions.SO_LINGER;
913 if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
914 Net.shutdown(fd, Net.SHUT_WR);
915 }
916 } catch (IOException ignore) { }
917 }
918
919 // attempt to close the socket. If there are I/O operations in progress
920 // then the socket is pre-closed and the thread(s) signalled. The
921 // last thread will close the file descriptor.
922 if (!tryClose()) {
923 nd.preClose(fd, readerThread, writerThread);
924 }
925 }
926 }
927
928 // the socket options supported by client and server sockets
929 private static volatile Set<SocketOption<?>> clientSocketOptions;
930 private static volatile Set<SocketOption<?>> serverSocketOptions;
931
932 @Override
933 protected Set<SocketOption<?>> supportedOptions() {
934 Set<SocketOption<?>> options = (server) ? serverSocketOptions : clientSocketOptions;
935 if (options == null) {
936 options = new HashSet<>();
937 options.add(StandardSocketOptions.SO_RCVBUF);
938 options.add(StandardSocketOptions.SO_REUSEADDR);
939 if (server) {
940 // IP_TOS added for server socket to maintain compatibility
941 options.add(StandardSocketOptions.IP_TOS);
942 options.addAll(ExtendedSocketOptions.serverSocketOptions());
943 } else {
944 options.add(StandardSocketOptions.IP_TOS);
945 options.add(StandardSocketOptions.SO_KEEPALIVE);
946 options.add(StandardSocketOptions.SO_SNDBUF);
947 options.add(StandardSocketOptions.SO_LINGER);
948 options.add(StandardSocketOptions.TCP_NODELAY);
949 options.addAll(ExtendedSocketOptions.clientSocketOptions());
950 }
951 if (Net.isReusePortAvailable())
952 options.add(StandardSocketOptions.SO_REUSEPORT);
953 options = Collections.unmodifiableSet(options);
954 if (server) {
955 serverSocketOptions = options;
956 } else {
957 clientSocketOptions = options;
958 }
959 }
960 return options;
961 }
962
963 @Override
964 protected <T> void setOption(SocketOption<T> opt, T value) throws IOException {
965 if (!supportedOptions().contains(opt))
966 throw new UnsupportedOperationException("'" + opt + "' not supported");
967 if (!opt.type().isInstance(value))
968 throw new IllegalArgumentException("Invalid value '" + value + "'");
969 synchronized (stateLock) {
970 ensureOpen();
971 if (opt == StandardSocketOptions.IP_TOS) {
972 // maps to IPV6_TCLASS and/or IP_TOS
973 Net.setIpSocketOption(fd, family(), opt, value);
974 } else if (opt == StandardSocketOptions.SO_REUSEADDR) {
975 boolean b = (boolean) value;
976 if (Net.useExclusiveBind()) {
977 isReuseAddress = b;
978 } else {
979 Net.setSocketOption(fd, opt, b);
980 }
981 } else {
982 // option does not need special handling
983 Net.setSocketOption(fd, opt, value);
984 }
985 }
986 }
987
988 @SuppressWarnings("unchecked")
989 protected <T> T getOption(SocketOption<T> opt) throws IOException {
990 if (!supportedOptions().contains(opt))
991 throw new UnsupportedOperationException("'" + opt + "' not supported");
992 synchronized (stateLock) {
993 ensureOpen();
994 if (opt == StandardSocketOptions.IP_TOS) {
995 return (T) Net.getSocketOption(fd, family(), opt);
996 } else if (opt == StandardSocketOptions.SO_REUSEADDR) {
997 if (Net.useExclusiveBind()) {
998 return (T) Boolean.valueOf(isReuseAddress);
999 } else {
1000 return (T) Net.getSocketOption(fd, opt);
1001 }
1002 } else {
1003 // option does not need special handling
1004 return (T) Net.getSocketOption(fd, opt);
1005 }
1006 }
1007 }
1008
1009 private boolean booleanValue(Object value, String desc) throws SocketException {
1010 if (!(value instanceof Boolean))
1011 throw new SocketException("Bad value for " + desc);
1012 return (boolean) value;
1013 }
1014
1015 private int intValue(Object value, String desc) throws SocketException {
1016 if (!(value instanceof Integer))
1017 throw new SocketException("Bad value for " + desc);
1018 return (int) value;
1019 }
1020
1021 @Override
1022 public void setOption(int opt, Object value) throws SocketException {
1023 synchronized (stateLock) {
1024 ensureOpen();
1025 try {
1026 switch (opt) {
1027 case SO_LINGER: {
1028 // the value is "false" to disable, or linger interval to enable
1029 int i;
1030 if (value instanceof Boolean && ((boolean) value) == false) {
1031 i = -1;
1032 } else {
1033 i = intValue(value, "SO_LINGER");
1034 }
1035 Net.setSocketOption(fd, StandardSocketOptions.SO_LINGER, i);
1036 break;
1037 }
1038 case SO_TIMEOUT: {
1039 int i = intValue(value, "SO_TIMEOUT");
1040 if (i < 0)
1041 throw new IllegalArgumentException("timeout < 0");
1042 timeout = i;
1043 break;
1044 }
1045 case IP_TOS: {
1046 int i = intValue(value, "IP_TOS");
1047 Net.setIpSocketOption(fd, family(), StandardSocketOptions.IP_TOS, i);
1048 break;
1049 }
1050 case TCP_NODELAY: {
1051 boolean b = booleanValue(value, "TCP_NODELAY");
1052 Net.setSocketOption(fd, StandardSocketOptions.TCP_NODELAY, b);
1053 break;
1054 }
1055 case SO_SNDBUF: {
1056 int i = intValue(value, "SO_SNDBUF");
1057 if (i <= 0)
1058 throw new SocketException("SO_SNDBUF <= 0");
1059 Net.setSocketOption(fd, StandardSocketOptions.SO_SNDBUF, i);
1060 break;
1061 }
1062 case SO_RCVBUF: {
1063 int i = intValue(value, "SO_RCVBUF");
1064 if (i <= 0)
1065 throw new SocketException("SO_RCVBUF <= 0");
1066 Net.setSocketOption(fd, StandardSocketOptions.SO_RCVBUF, i);
1067 break;
1068 }
1069 case SO_KEEPALIVE: {
1070 boolean b = booleanValue(value, "SO_KEEPALIVE");
1071 Net.setSocketOption(fd, StandardSocketOptions.SO_KEEPALIVE, b);
1072 break;
1073 }
1074 case SO_OOBINLINE: {
1075 boolean b = booleanValue(value, "SO_OOBINLINE");
1076 Net.setSocketOption(fd, ExtendedSocketOption.SO_OOBINLINE, b);
1077 break;
1078 }
1079 case SO_REUSEADDR: {
1080 boolean b = booleanValue(value, "SO_REUSEADDR");
1081 if (Net.useExclusiveBind()) {
1082 isReuseAddress = b;
1083 } else {
1084 Net.setSocketOption(fd, StandardSocketOptions.SO_REUSEADDR, b);
1085 }
1086 break;
1087 }
1088 case SO_REUSEPORT: {
1089 if (!Net.isReusePortAvailable())
1090 throw new SocketException("SO_REUSEPORT not supported");
1091 boolean b = booleanValue(value, "SO_REUSEPORT");
1092 Net.setSocketOption(fd, StandardSocketOptions.SO_REUSEPORT, b);
1093 break;
1094 }
1095 default:
1096 throw new SocketException("Unknown option " + opt);
1097 }
1098 } catch (SocketException e) {
1099 throw e;
1100 } catch (IllegalArgumentException | IOException e) {
1101 throw asSocketException(e);
1102 }
1103 }
1104 }
1105
1106 @Override
1107 public Object getOption(int opt) throws SocketException {
1108 synchronized (stateLock) {
1109 ensureOpen();
1110 try {
1111 switch (opt) {
1112 case SO_TIMEOUT:
1113 return timeout;
1114 case TCP_NODELAY:
1115 return Net.getSocketOption(fd, StandardSocketOptions.TCP_NODELAY);
1116 case SO_OOBINLINE:
1117 return Net.getSocketOption(fd, ExtendedSocketOption.SO_OOBINLINE);
1118 case SO_LINGER: {
1119 // return "false" when disabled, linger interval when enabled
1120 int i = (int) Net.getSocketOption(fd, StandardSocketOptions.SO_LINGER);
1121 if (i == -1) {
1122 return Boolean.FALSE;
1123 } else {
1124 return i;
1125 }
1126 }
1127 case SO_REUSEADDR:
1128 if (Net.useExclusiveBind()) {
1129 return isReuseAddress;
1130 } else {
1131 return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEADDR);
1132 }
1133 case SO_BINDADDR:
1134 return Net.localAddress(fd).getAddress();
1135 case SO_SNDBUF:
1136 return Net.getSocketOption(fd, StandardSocketOptions.SO_SNDBUF);
1137 case SO_RCVBUF:
1138 return Net.getSocketOption(fd, StandardSocketOptions.SO_RCVBUF);
1139 case IP_TOS:
1140 return Net.getSocketOption(fd, family(), StandardSocketOptions.IP_TOS);
1141 case SO_KEEPALIVE:
1142 return Net.getSocketOption(fd, StandardSocketOptions.SO_KEEPALIVE);
1143 case SO_REUSEPORT:
1144 if (!Net.isReusePortAvailable())
1145 throw new SocketException("SO_REUSEPORT not supported");
1146 return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEPORT);
1147 default:
1148 throw new SocketException("Unknown option " + opt);
1149 }
1150 } catch (SocketException e) {
1151 throw e;
1152 } catch (IllegalArgumentException | IOException e) {
1153 throw asSocketException(e);
1154 }
1155 }
1156 }
1157
1158 @Override
1159 protected void shutdownInput() throws IOException {
1160 synchronized (stateLock) {
1161 ensureOpenAndConnected();
1162 if (!isInputClosed) {
1163 Net.shutdown(fd, Net.SHUT_RD);
1164 if (readerThread != null && readerThread.isVirtual()) {
1165 Poller.stopPoll(readerThread);
1166 }
1167 isInputClosed = true;
1168 }
1169 }
1170 }
1171
1172 @Override
1173 protected void shutdownOutput() throws IOException {
1174 synchronized (stateLock) {
1175 ensureOpenAndConnected();
1176 if (!isOutputClosed) {
1177 Net.shutdown(fd, Net.SHUT_WR);
1178 if (writerThread != null && writerThread.isVirtual()) {
1179 Poller.stopPoll(writerThread);
1180 }
1181 isOutputClosed = true;
1182 }
1183 }
1184 }
1185
1186 @Override
1187 protected boolean supportsUrgentData() {
1188 return true;
1189 }
1190
1191 @Override
1192 protected void sendUrgentData(int data) throws IOException {
1193 writeLock.lock();
1194 try {
1195 int n = 0;
1196 FileDescriptor fd = beginWrite();
1197 try {
1198 configureNonBlockingIfNeeded(fd, false);
1199 do {
1200 n = Net.sendOOB(fd, (byte) data);
1201 } while (n == IOStatus.INTERRUPTED && isOpen());
1202 if (n == IOStatus.UNAVAILABLE) {
1203 throw new SocketException("No buffer space available");
1204 }
1205 } finally {
1206 endWrite(n > 0);
1207 }
1208 } finally {
1209 writeLock.unlock();
1210 }
1211 }
1212
1213 /**
1214 * Returns an action to close the given file descriptor.
1215 */
1216 private static Runnable closerFor(FileDescriptor fd) {
1217 return () -> {
1218 try {
1219 nd.close(fd);
1220 } catch (IOException ioe) {
1221 throw new UncheckedIOException(ioe);
1222 }
1223 };
1224 }
1225
1226 /**
1227 * Attempts to acquire the given lock within the given waiting time.
1228 * @return the remaining time in nanoseconds when the lock is acquired, zero
1229 * or less if the lock was not acquired before the timeout expired
1230 */
1231 private static long tryLock(ReentrantLock lock, long timeout, TimeUnit unit) {
1232 assert timeout > 0;
1233 boolean interrupted = false;
1234 long nanos = unit.toNanos(timeout);
1235 long remainingNanos = nanos;
1236 long startNanos = System.nanoTime();
1237 boolean acquired = false;
1238 while (!acquired && (remainingNanos > 0)) {
1239 try {
1240 acquired = lock.tryLock(remainingNanos, NANOSECONDS);
1241 } catch (InterruptedException e) {
1242 interrupted = true;
1243 }
1244 remainingNanos = nanos - (System.nanoTime() - startNanos);
1245 }
1246 if (acquired && remainingNanos <= 0L)
1247 lock.unlock(); // release lock if timeout has expired
1248 if (interrupted)
1249 Thread.currentThread().interrupt();
1250 return remainingNanos;
1251 }
1252
1253 /**
1254 * Creates a SocketException from the given exception.
1255 */
1256 private static SocketException asSocketException(Exception e) {
1257 if (e instanceof SocketException se) {
1258 return se;
1259 } else {
1260 var se = new SocketException(e.getMessage());
1261 se.setStackTrace(e.getStackTrace());
1262 return se;
1263 }
1264 }
1265
1266 /**
1267 * Returns the socket protocol family.
1268 */
1269 private static ProtocolFamily family() {
1270 if (Net.isIPv6Available()) {
1271 return StandardProtocolFamily.INET6;
1272 } else {
1273 return StandardProtocolFamily.INET;
1274 }
1275 }
1276
1277 /**
1278 * Return the file descriptor value.
1279 */
1280 private static int fdVal(FileDescriptor fd) {
1281 return JIOFDA.get(fd);
1282 }
1283
1284 private static final JavaIOFileDescriptorAccess JIOFDA = SharedSecrets.getJavaIOFileDescriptorAccess();
1285 }
--- EOF ---