1 /* 2 * Copyright (c) 2019, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.io.InputStream; 31 import java.io.InterruptedIOException; 32 import java.io.OutputStream; 33 import java.io.UncheckedIOException; 34 import java.lang.ref.Cleaner.Cleanable; 35 import java.net.InetAddress; 36 import java.net.InetSocketAddress; 37 import java.net.ProtocolFamily; 38 import java.net.SocketAddress; 39 import java.net.SocketException; 40 import java.net.SocketImpl; 41 import java.net.SocketOption; 42 import java.net.SocketTimeoutException; 43 import java.net.StandardProtocolFamily; 44 import java.net.StandardSocketOptions; 45 import java.net.UnknownHostException; 46 import java.nio.ByteBuffer; 47 import java.util.Collections; 48 import java.util.HashSet; 49 import java.util.Objects; 50 import java.util.Set; 51 import java.util.concurrent.TimeUnit; 52 import java.util.concurrent.locks.ReentrantLock; 53 54 import jdk.internal.access.JavaIOFileDescriptorAccess; 55 import jdk.internal.access.SharedSecrets; 56 import jdk.internal.ref.CleanerFactory; 57 import sun.net.ConnectionResetException; 58 import sun.net.NetHooks; 59 import sun.net.PlatformSocketImpl; 60 import sun.net.ext.ExtendedSocketOptions; 61 import jdk.internal.util.Exceptions; 62 63 import static java.util.concurrent.TimeUnit.MILLISECONDS; 64 import static java.util.concurrent.TimeUnit.NANOSECONDS; 65 import static jdk.internal.util.Exceptions.filterNonSocketInfo; 66 import static jdk.internal.util.Exceptions.formatMsg; 67 68 /** 69 * NIO based SocketImpl. 70 * 71 * The underlying socket used by this SocketImpl is initially configured blocking. 72 * If a connect, accept or read is attempted with a timeout, or a virtual 73 * thread invokes a blocking operation, then the socket is changed to non-blocking 74 * When in non-blocking mode, operations that don't complete immediately will 75 * poll the socket (or park when invoked on a virtual thread) and preserve 76 * the semantics of blocking operations. 77 */ 78 79 public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl { 80 private static final NativeDispatcher nd = new SocketDispatcher(); 81 82 // The maximum number of bytes to read/write per syscall to avoid needing 83 // a huge buffer from the temporary buffer cache 84 private static final int MAX_BUFFER_SIZE = 128 * 1024; 85 86 // true if this is a SocketImpl for a ServerSocket 87 private final boolean server; 88 89 // Lock held when reading (also used when accepting or connecting) 90 private final ReentrantLock readLock = new ReentrantLock(); 91 92 // Lock held when writing 93 private final ReentrantLock writeLock = new ReentrantLock(); 94 95 // The stateLock for read/changing state 96 private final Object stateLock = new Object(); 97 private static final int ST_NEW = 0; 98 private static final int ST_UNCONNECTED = 1; 99 private static final int ST_CONNECTING = 2; 100 private static final int ST_CONNECTED = 3; 101 private static final int ST_CLOSING = 4; 102 private static final int ST_CLOSED = 5; 103 private volatile int state; // need stateLock to change 104 105 private Cleanable cleaner; 106 107 // set to true when the socket is in non-blocking mode 108 private volatile boolean nonBlocking; 109 110 // used by connect/read/write/accept, protected by stateLock 111 private Thread readerThread; 112 private Thread writerThread; 113 114 // used when SO_REUSEADDR is emulated, protected by stateLock 115 private boolean isReuseAddress; 116 117 // read or accept timeout in millis 118 private volatile int timeout; 119 120 // flags to indicate if the connection is shutdown for input and output 121 private volatile boolean isInputClosed; 122 private volatile boolean isOutputClosed; 123 124 // used by read to emulate legacy behavior, protected by readLock 125 private boolean readEOF; 126 private boolean connectionReset; 127 128 /** 129 * Creates an instance of this SocketImpl. 130 * @param server true if this is a SocketImpl for a ServerSocket 131 */ 132 public NioSocketImpl(boolean server) { 133 this.server = server; 134 } 135 136 /** 137 * Returns true if the socket is open. 138 */ 139 private boolean isOpen() { 140 return state < ST_CLOSING; 141 } 142 143 /** 144 * Throws SocketException if the socket is not open. 145 */ 146 private void ensureOpen() throws SocketException { 147 int state = this.state; 148 if (state == ST_NEW) 149 throw new SocketException("Socket not created"); 150 if (state >= ST_CLOSING) 151 throw new SocketException("Socket closed"); 152 } 153 154 /** 155 * Throws SocketException if the socket is not open and connected. 156 */ 157 private void ensureOpenAndConnected() throws SocketException { 158 int state = this.state; 159 if (state < ST_CONNECTED) 160 throw new SocketException("Not connected"); 161 if (state > ST_CONNECTED) 162 throw new SocketException("Socket closed"); 163 } 164 165 /** 166 * Disables the current thread for scheduling purposes until the 167 * socket is ready for I/O or is asynchronously closed, for up to the 168 * specified waiting time. 169 * @throws IOException if an I/O error occurs 170 */ 171 private void park(FileDescriptor fd, int event, long nanos) throws IOException { 172 Thread t = Thread.currentThread(); 173 if (t.isVirtual()) { 174 Poller.poll(fdVal(fd), event, nanos, this::isOpen); 175 if (t.isInterrupted()) { 176 throw new InterruptedIOException(); 177 } 178 } else { 179 long millis; 180 if (nanos == 0) { 181 millis = -1; 182 } else { 183 millis = NANOSECONDS.toMillis(nanos); 184 if (nanos > MILLISECONDS.toNanos(millis)) { 185 // Round up any excess nanos to the nearest millisecond to 186 // avoid parking for less than requested. 187 millis++; 188 } 189 } 190 Net.poll(fd, event, millis); 191 } 192 } 193 194 /** 195 * Disables the current thread for scheduling purposes until the socket is 196 * ready for I/O or is asynchronously closed. 197 * @throws IOException if an I/O error occurs 198 */ 199 private void park(FileDescriptor fd, int event) throws IOException { 200 park(fd, event, 0); 201 } 202 203 /** 204 * Ensures that the socket is configured non-blocking invoked on a virtual 205 * thread or the operation has a timeout 206 * @throws IOException if there is an I/O error changing the blocking mode 207 */ 208 private void configureNonBlockingIfNeeded(FileDescriptor fd, boolean timed) 209 throws IOException 210 { 211 if (!nonBlocking 212 && (timed || Thread.currentThread().isVirtual())) { 213 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); 214 IOUtil.configureBlocking(fd, false); 215 nonBlocking = true; 216 } 217 } 218 219 /** 220 * Marks the beginning of a read operation that might block. 221 * @throws SocketException if the socket is closed or not connected 222 */ 223 private FileDescriptor beginRead() throws SocketException { 224 synchronized (stateLock) { 225 ensureOpenAndConnected(); 226 readerThread = NativeThread.threadToSignal(); 227 return fd; 228 } 229 } 230 231 /** 232 * Marks the end of a read operation that may have blocked. 233 * @throws SocketException is the socket is closed 234 */ 235 private void endRead(boolean completed) throws SocketException { 236 synchronized (stateLock) { 237 readerThread = null; 238 int state = this.state; 239 if (state == ST_CLOSING) 240 tryFinishClose(); 241 if (!completed && state >= ST_CLOSING) 242 throw new SocketException("Socket closed"); 243 } 244 } 245 246 /** 247 * Attempts to read bytes from the socket into the given byte array. 248 */ 249 private int tryRead(FileDescriptor fd, byte[] b, int off, int len) 250 throws IOException 251 { 252 ByteBuffer dst = Util.getTemporaryDirectBuffer(len); 253 assert dst.position() == 0; 254 try { 255 int n = nd.read(fd, ((DirectBuffer)dst).address(), len); 256 if (n > 0) { 257 dst.get(b, off, n); 258 } 259 return n; 260 } finally { 261 Util.offerFirstTemporaryDirectBuffer(dst); 262 } 263 } 264 265 /** 266 * Reads bytes from the socket into the given byte array with a timeout. 267 * @throws SocketTimeoutException if the read timeout elapses 268 */ 269 private int timedRead(FileDescriptor fd, byte[] b, int off, int len, long nanos) 270 throws IOException 271 { 272 long startNanos = System.nanoTime(); 273 int n = tryRead(fd, b, off, len); 274 while (n == IOStatus.UNAVAILABLE && isOpen()) { 275 long remainingNanos = nanos - (System.nanoTime() - startNanos); 276 if (remainingNanos <= 0) { 277 throw new SocketTimeoutException("Read timed out"); 278 } 279 park(fd, Net.POLLIN, remainingNanos); 280 n = tryRead(fd, b, off, len); 281 } 282 return n; 283 } 284 285 /** 286 * Reads bytes from the socket into the given byte array. 287 * @return the number of bytes read or -1 at EOF 288 * @throws SocketException if the socket is closed or a socket I/O error occurs 289 * @throws SocketTimeoutException if the read timeout elapses 290 */ 291 private int implRead(byte[] b, int off, int len, long remainingNanos) throws IOException { 292 int n = 0; 293 FileDescriptor fd = beginRead(); 294 try { 295 if (connectionReset) 296 throw new SocketException("Connection reset"); 297 if (isInputClosed) 298 return -1; 299 300 // experimental 301 if (Poller.supportReadOps() && Thread.currentThread().isVirtual()) { 302 n = Poller.read(fdVal(fd), b, off, len, remainingNanos, this::isOpen); 303 if (n != IOStatus.UNAVAILABLE) return n; 304 } 305 306 configureNonBlockingIfNeeded(fd, remainingNanos > 0); 307 if (remainingNanos > 0) { 308 // read with timeout 309 n = timedRead(fd, b, off, len, remainingNanos); 310 } else { 311 // read, no timeout 312 n = tryRead(fd, b, off, len); 313 while (IOStatus.okayToRetry(n) && isOpen()) { 314 park(fd, Net.POLLIN); 315 n = tryRead(fd, b, off, len); 316 } 317 } 318 return n; 319 } catch (InterruptedIOException e) { 320 throw e; 321 } catch (ConnectionResetException e) { 322 connectionReset = true; 323 throw new SocketException("Connection reset"); 324 } catch (IOException ioe) { 325 // throw SocketException to maintain compatibility 326 throw asSocketException(ioe); 327 } finally { 328 endRead(n > 0); 329 } 330 } 331 332 /** 333 * Reads bytes from the socket into the given byte array. 334 * @return the number of bytes read or -1 at EOF 335 * @throws IndexOutOfBoundsException if the bound checks fail 336 * @throws SocketException if the socket is closed or a socket I/O error occurs 337 * @throws SocketTimeoutException if the read timeout elapses 338 */ 339 private int read(byte[] b, int off, int len) throws IOException { 340 Objects.checkFromIndexSize(off, len, b.length); 341 if (len == 0) { 342 return 0; 343 } else { 344 long remainingNanos = 0; 345 int timeout = this.timeout; 346 if (timeout > 0) { 347 remainingNanos = tryLock(readLock, timeout, MILLISECONDS); 348 if (remainingNanos <= 0) { 349 assert !readLock.isHeldByCurrentThread(); 350 throw new SocketTimeoutException("Read timed out"); 351 } 352 } else { 353 readLock.lock(); 354 } 355 try { 356 // emulate legacy behavior to return -1, even if socket is closed 357 if (readEOF) 358 return -1; 359 // read up to MAX_BUFFER_SIZE bytes 360 int size = Math.min(len, MAX_BUFFER_SIZE); 361 int n = implRead(b, off, size, remainingNanos); 362 if (n == -1) 363 readEOF = true; 364 return n; 365 } finally { 366 readLock.unlock(); 367 } 368 } 369 } 370 371 /** 372 * Marks the beginning of a write operation that might block. 373 * @throws SocketException if the socket is closed or not connected 374 */ 375 private FileDescriptor beginWrite() throws SocketException { 376 synchronized (stateLock) { 377 ensureOpenAndConnected(); 378 writerThread = NativeThread.threadToSignal(); 379 return fd; 380 } 381 } 382 383 /** 384 * Marks the end of a write operation that may have blocked. 385 * @throws SocketException is the socket is closed 386 */ 387 private void endWrite(boolean completed) throws SocketException { 388 synchronized (stateLock) { 389 writerThread = null; 390 int state = this.state; 391 if (state == ST_CLOSING) 392 tryFinishClose(); 393 if (!completed && state >= ST_CLOSING) 394 throw new SocketException("Socket closed"); 395 } 396 } 397 398 /** 399 * Attempts to write a sequence of bytes to the socket from the given 400 * byte array. 401 */ 402 private int tryWrite(FileDescriptor fd, byte[] b, int off, int len) 403 throws IOException 404 { 405 ByteBuffer src = Util.getTemporaryDirectBuffer(len); 406 assert src.position() == 0; 407 try { 408 src.put(b, off, len); 409 return nd.write(fd, ((DirectBuffer)src).address(), len); 410 } finally { 411 Util.offerFirstTemporaryDirectBuffer(src); 412 } 413 } 414 415 /** 416 * Writes a sequence of bytes to the socket from the given byte array. 417 * @return the number of bytes written 418 * @throws SocketException if the socket is closed or a socket I/O error occurs 419 */ 420 private int implWrite(byte[] b, int off, int len) throws IOException { 421 int n = 0; 422 FileDescriptor fd = beginWrite(); 423 try { 424 425 // experimental 426 if (Poller.supportWriteOps() && Thread.currentThread().isVirtual()) { 427 n = Poller.write(fdVal(fd), b, off, len, this::isOpen); 428 if (n != IOStatus.UNAVAILABLE) return n; 429 } 430 431 configureNonBlockingIfNeeded(fd, false); 432 n = tryWrite(fd, b, off, len); 433 while (IOStatus.okayToRetry(n) && isOpen()) { 434 park(fd, Net.POLLOUT); 435 n = tryWrite(fd, b, off, len); 436 } 437 return n; 438 } catch (InterruptedIOException e) { 439 throw e; 440 } catch (IOException ioe) { 441 // throw SocketException to maintain compatibility 442 throw asSocketException(ioe); 443 } finally { 444 endWrite(n > 0); 445 } 446 } 447 448 /** 449 * Writes a sequence of bytes to the socket from the given byte array. 450 * @throws SocketException if the socket is closed or a socket I/O error occurs 451 */ 452 private void write(byte[] b, int off, int len) throws IOException { 453 Objects.checkFromIndexSize(off, len, b.length); 454 if (len > 0) { 455 writeLock.lock(); 456 try { 457 int pos = off; 458 int end = off + len; 459 while (pos < end) { 460 // write up to MAX_BUFFER_SIZE bytes 461 int size = Math.min((end - pos), MAX_BUFFER_SIZE); 462 int n = implWrite(b, pos, size); 463 pos += n; 464 } 465 } finally { 466 writeLock.unlock(); 467 } 468 } 469 } 470 471 /** 472 * Creates the socket. 473 * @param stream {@code true} for a streams socket 474 */ 475 @Override 476 protected void create(boolean stream) throws IOException { 477 if (!stream) { 478 throw new IOException("Datagram socket creation not supported"); 479 } 480 synchronized (stateLock) { 481 if (state != ST_NEW) 482 throw new IOException("Already created"); 483 FileDescriptor fd; 484 if (server) { 485 fd = Net.serverSocket(); 486 } else { 487 fd = Net.socket(); 488 } 489 Runnable closer = closerFor(fd); 490 this.fd = fd; 491 this.cleaner = CleanerFactory.cleaner().register(this, closer); 492 this.state = ST_UNCONNECTED; 493 } 494 } 495 496 /** 497 * Marks the beginning of a connect operation that might block. 498 * @throws SocketException if the socket is closed or already connected 499 */ 500 private FileDescriptor beginConnect(InetAddress address, int port) 501 throws IOException 502 { 503 synchronized (stateLock) { 504 int state = this.state; 505 if (state != ST_UNCONNECTED) { 506 if (state == ST_NEW) 507 throw new SocketException("Not created"); 508 if (state == ST_CONNECTING) 509 throw new SocketException("Connection in progress"); 510 if (state == ST_CONNECTED) 511 throw new SocketException("Already connected"); 512 if (state >= ST_CLOSING) 513 throw new SocketException("Socket closed"); 514 assert false; 515 } 516 this.state = ST_CONNECTING; 517 518 // invoke beforeTcpConnect hook if not already bound 519 if (localport == 0) { 520 NetHooks.beforeTcpConnect(fd, address, port); 521 } 522 523 // save the remote address/port 524 this.address = address; 525 this.port = port; 526 527 readerThread = NativeThread.threadToSignal(); 528 return fd; 529 } 530 } 531 532 /** 533 * Marks the end of a connect operation that may have blocked. 534 * @throws SocketException is the socket is closed 535 */ 536 private void endConnect(FileDescriptor fd, boolean completed) throws IOException { 537 synchronized (stateLock) { 538 readerThread = null; 539 int state = this.state; 540 if (state == ST_CLOSING) 541 tryFinishClose(); 542 if (completed && state == ST_CONNECTING) { 543 this.state = ST_CONNECTED; 544 localport = Net.localAddress(fd).getPort(); 545 } else if (!completed && state >= ST_CLOSING) { 546 throw new SocketException("Socket closed"); 547 } 548 } 549 } 550 551 /** 552 * Waits for a connection attempt to finish with a timeout 553 * @throws SocketTimeoutException if the connect timeout elapses 554 */ 555 private boolean timedFinishConnect(FileDescriptor fd, long nanos) throws IOException { 556 long startNanos = System.nanoTime(); 557 boolean polled = Net.pollConnectNow(fd); 558 while (!polled && isOpen()) { 559 long remainingNanos = nanos - (System.nanoTime() - startNanos); 560 if (remainingNanos <= 0) { 561 throw new SocketTimeoutException("Connect timed out"); 562 } 563 park(fd, Net.POLLOUT, remainingNanos); 564 polled = Net.pollConnectNow(fd); 565 } 566 return polled && isOpen(); 567 } 568 569 /** 570 * Attempts to establish a connection to the given socket address with a 571 * timeout. Closes the socket if connection cannot be established. 572 * @throws IOException if the address is not a resolved InetSocketAddress or 573 * the connection cannot be established 574 */ 575 @Override 576 protected void connect(SocketAddress remote, int millis) throws IOException { 577 // SocketImpl connect only specifies IOException 578 if (!(remote instanceof InetSocketAddress)) 579 throw new IOException("Unsupported address type"); 580 InetSocketAddress isa = (InetSocketAddress) remote; 581 if (isa.isUnresolved()) { 582 throw new UnknownHostException( 583 formatMsg(filterNonSocketInfo(isa.getHostName()))); 584 } 585 586 InetAddress address = isa.getAddress(); 587 if (address.isAnyLocalAddress()) 588 address = InetAddress.getLocalHost(); 589 int port = isa.getPort(); 590 591 ReentrantLock connectLock = readLock; 592 try { 593 connectLock.lock(); 594 try { 595 boolean connected = false; 596 FileDescriptor fd = beginConnect(address, port); 597 try { 598 configureNonBlockingIfNeeded(fd, millis > 0); 599 int n = Net.connect(fd, address, port); 600 if (n > 0) { 601 // connection established 602 connected = true; 603 } else { 604 assert IOStatus.okayToRetry(n); 605 if (millis > 0) { 606 // finish connect with timeout 607 long nanos = MILLISECONDS.toNanos(millis); 608 connected = timedFinishConnect(fd, nanos); 609 } else { 610 // finish connect, no timeout 611 boolean polled = false; 612 while (!polled && isOpen()) { 613 park(fd, Net.POLLOUT); 614 polled = Net.pollConnectNow(fd); 615 } 616 connected = polled && isOpen(); 617 } 618 } 619 } finally { 620 endConnect(fd, connected); 621 } 622 } finally { 623 connectLock.unlock(); 624 } 625 } catch (IOException ioe) { 626 close(); 627 if (ioe instanceof SocketTimeoutException) { 628 throw ioe; 629 } else if (ioe instanceof InterruptedIOException) { 630 assert Thread.currentThread().isVirtual(); 631 throw new SocketException("Closed by interrupt"); 632 } else { 633 throw Exceptions.ioException(ioe, isa); 634 } 635 } 636 } 637 638 @Override 639 protected void connect(String host, int port) throws IOException { 640 connect(new InetSocketAddress(host, port), 0); 641 } 642 643 @Override 644 protected void connect(InetAddress address, int port) throws IOException { 645 connect(new InetSocketAddress(address, port), 0); 646 } 647 648 @Override 649 protected void bind(InetAddress host, int port) throws IOException { 650 synchronized (stateLock) { 651 ensureOpen(); 652 if (localport != 0) 653 throw new SocketException("Already bound"); 654 NetHooks.beforeTcpBind(fd, host, port); 655 Net.bind(fd, host, port); 656 // set the address field to the given host address to 657 // maintain long standing behavior. When binding to 0.0.0.0 658 // then the actual local address will be ::0 when IPv6 is enabled. 659 address = host; 660 localport = Net.localAddress(fd).getPort(); 661 } 662 } 663 664 @Override 665 protected void listen(int backlog) throws IOException { 666 synchronized (stateLock) { 667 ensureOpen(); 668 if (localport == 0) 669 throw new SocketException("Not bound"); 670 Net.listen(fd, backlog < 1 ? 50 : backlog); 671 } 672 } 673 674 /** 675 * Marks the beginning of an accept operation that might block. 676 * @throws SocketException if the socket is closed 677 */ 678 private FileDescriptor beginAccept() throws SocketException { 679 synchronized (stateLock) { 680 ensureOpen(); 681 if (localport == 0) 682 throw new SocketException("Not bound"); 683 readerThread = NativeThread.threadToSignal(); 684 return fd; 685 } 686 } 687 688 /** 689 * Marks the end of an accept operation that may have blocked. 690 * @throws SocketException is the socket is closed 691 */ 692 private void endAccept(boolean completed) throws SocketException { 693 synchronized (stateLock) { 694 int state = this.state; 695 readerThread = null; 696 if (state == ST_CLOSING) 697 tryFinishClose(); 698 if (!completed && state >= ST_CLOSING) 699 throw new SocketException("Socket closed"); 700 } 701 } 702 703 /** 704 * Accepts a new connection with a timeout. 705 * @throws SocketTimeoutException if the accept timeout elapses 706 */ 707 private int timedAccept(FileDescriptor fd, 708 FileDescriptor newfd, 709 InetSocketAddress[] isaa, 710 long nanos) 711 throws IOException 712 { 713 long startNanos = System.nanoTime(); 714 int n = Net.accept(fd, newfd, isaa); 715 while (n == IOStatus.UNAVAILABLE && isOpen()) { 716 long remainingNanos = nanos - (System.nanoTime() - startNanos); 717 if (remainingNanos <= 0) { 718 throw new SocketTimeoutException("Accept timed out"); 719 } 720 park(fd, Net.POLLIN, remainingNanos); 721 n = Net.accept(fd, newfd, isaa); 722 } 723 return n; 724 } 725 726 /** 727 * Accepts a new connection so that the given SocketImpl is connected to 728 * the peer. The SocketImpl must be a newly created NioSocketImpl. 729 */ 730 @Override 731 protected void accept(SocketImpl si) throws IOException { 732 NioSocketImpl nsi = (NioSocketImpl) si; 733 if (nsi.state != ST_NEW) 734 throw new SocketException("Not a newly created SocketImpl"); 735 736 FileDescriptor newfd = new FileDescriptor(); 737 InetSocketAddress[] isaa = new InetSocketAddress[1]; 738 739 // acquire the lock, adjusting the timeout for cases where several 740 // threads are accepting connections and there is a timeout set 741 ReentrantLock acceptLock = readLock; 742 int timeout = this.timeout; 743 long remainingNanos = 0; 744 if (timeout > 0) { 745 remainingNanos = tryLock(acceptLock, timeout, MILLISECONDS); 746 if (remainingNanos <= 0) { 747 assert !acceptLock.isHeldByCurrentThread(); 748 throw new SocketTimeoutException("Accept timed out"); 749 } 750 } else { 751 acceptLock.lock(); 752 } 753 754 // accept a connection 755 try { 756 int n = 0; 757 FileDescriptor fd = beginAccept(); 758 try { 759 configureNonBlockingIfNeeded(fd, remainingNanos > 0); 760 if (remainingNanos > 0) { 761 // accept with timeout 762 n = timedAccept(fd, newfd, isaa, remainingNanos); 763 } else { 764 // accept, no timeout 765 n = Net.accept(fd, newfd, isaa); 766 while (IOStatus.okayToRetry(n) && isOpen()) { 767 park(fd, Net.POLLIN); 768 n = Net.accept(fd, newfd, isaa); 769 } 770 } 771 } finally { 772 endAccept(n > 0); 773 assert IOStatus.check(n); 774 } 775 } finally { 776 acceptLock.unlock(); 777 } 778 779 // get local address and configure accepted socket to blocking mode 780 InetSocketAddress localAddress; 781 try { 782 localAddress = Net.localAddress(newfd); 783 IOUtil.configureBlocking(newfd, true); 784 } catch (IOException ioe) { 785 nd.close(newfd); 786 throw ioe; 787 } 788 789 // set the fields 790 Runnable closer = closerFor(newfd); 791 synchronized (nsi.stateLock) { 792 nsi.fd = newfd; 793 nsi.cleaner = CleanerFactory.cleaner().register(nsi, closer); 794 nsi.localport = localAddress.getPort(); 795 nsi.address = isaa[0].getAddress(); 796 nsi.port = isaa[0].getPort(); 797 nsi.state = ST_CONNECTED; 798 } 799 } 800 801 @Override 802 protected InputStream getInputStream() { 803 return new InputStream() { 804 @Override 805 public int read() throws IOException { 806 byte[] a = new byte[1]; 807 int n = read(a, 0, 1); 808 return (n > 0) ? (a[0] & 0xff) : -1; 809 } 810 @Override 811 public int read(byte[] b, int off, int len) throws IOException { 812 return NioSocketImpl.this.read(b, off, len); 813 } 814 @Override 815 public int available() throws IOException { 816 return NioSocketImpl.this.available(); 817 } 818 @Override 819 public void close() throws IOException { 820 NioSocketImpl.this.close(); 821 } 822 }; 823 } 824 825 @Override 826 protected OutputStream getOutputStream() { 827 return new OutputStream() { 828 @Override 829 public void write(int b) throws IOException { 830 byte[] a = new byte[]{(byte) b}; 831 write(a, 0, 1); 832 } 833 @Override 834 public void write(byte[] b, int off, int len) throws IOException { 835 NioSocketImpl.this.write(b, off, len); 836 } 837 @Override 838 public void close() throws IOException { 839 NioSocketImpl.this.close(); 840 } 841 }; 842 } 843 844 @Override 845 protected int available() throws IOException { 846 synchronized (stateLock) { 847 ensureOpenAndConnected(); 848 if (isInputClosed) { 849 return 0; 850 } else { 851 return Net.available(fd); 852 } 853 } 854 } 855 856 /** 857 * Closes the socket if there are no I/O operations in progress. 858 */ 859 private boolean tryClose() throws IOException { 860 assert Thread.holdsLock(stateLock) && state == ST_CLOSING; 861 if (readerThread == null && writerThread == null) { 862 try { 863 cleaner.clean(); 864 } catch (UncheckedIOException ioe) { 865 throw ioe.getCause(); 866 } finally { 867 state = ST_CLOSED; 868 } 869 return true; 870 } else { 871 return false; 872 } 873 } 874 875 /** 876 * Invokes tryClose to attempt to close the socket. 877 * 878 * This method is used for deferred closing by I/O operations. 879 */ 880 private void tryFinishClose() { 881 try { 882 tryClose(); 883 } catch (IOException ignore) { } 884 } 885 886 /** 887 * Closes the socket. If there are I/O operations in progress then the 888 * socket is pre-closed and the threads are signalled. The socket will be 889 * closed when the last I/O operation aborts. 890 */ 891 @Override 892 protected void close() throws IOException { 893 synchronized (stateLock) { 894 int state = this.state; 895 if (state >= ST_CLOSING) 896 return; 897 if (state == ST_NEW) { 898 // stillborn 899 this.state = ST_CLOSED; 900 return; 901 } 902 boolean connected = (state == ST_CONNECTED); 903 this.state = ST_CLOSING; 904 905 // shutdown output when linger interval not set to 0 906 if (connected) { 907 try { 908 var SO_LINGER = StandardSocketOptions.SO_LINGER; 909 if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) { 910 Net.shutdown(fd, Net.SHUT_WR); 911 } 912 } catch (IOException ignore) { } 913 } 914 915 // attempt to close the socket. If there are I/O operations in progress 916 // then the socket is pre-closed and the thread(s) signalled. The 917 // last thread will close the file descriptor. 918 if (!tryClose()) { 919 nd.preClose(fd, readerThread, writerThread); 920 } 921 } 922 } 923 924 // the socket options supported by client and server sockets 925 private static volatile Set<SocketOption<?>> clientSocketOptions; 926 private static volatile Set<SocketOption<?>> serverSocketOptions; 927 928 @Override 929 protected Set<SocketOption<?>> supportedOptions() { 930 Set<SocketOption<?>> options = (server) ? serverSocketOptions : clientSocketOptions; 931 if (options == null) { 932 options = new HashSet<>(); 933 options.add(StandardSocketOptions.SO_RCVBUF); 934 options.add(StandardSocketOptions.SO_REUSEADDR); 935 if (server) { 936 // IP_TOS added for server socket to maintain compatibility 937 options.add(StandardSocketOptions.IP_TOS); 938 options.addAll(ExtendedSocketOptions.serverSocketOptions()); 939 } else { 940 options.add(StandardSocketOptions.IP_TOS); 941 options.add(StandardSocketOptions.SO_KEEPALIVE); 942 options.add(StandardSocketOptions.SO_SNDBUF); 943 options.add(StandardSocketOptions.SO_LINGER); 944 options.add(StandardSocketOptions.TCP_NODELAY); 945 options.addAll(ExtendedSocketOptions.clientSocketOptions()); 946 } 947 if (Net.isReusePortAvailable()) 948 options.add(StandardSocketOptions.SO_REUSEPORT); 949 options = Collections.unmodifiableSet(options); 950 if (server) { 951 serverSocketOptions = options; 952 } else { 953 clientSocketOptions = options; 954 } 955 } 956 return options; 957 } 958 959 @Override 960 protected <T> void setOption(SocketOption<T> opt, T value) throws IOException { 961 if (!supportedOptions().contains(opt)) 962 throw new UnsupportedOperationException("'" + opt + "' not supported"); 963 if (!opt.type().isInstance(value)) 964 throw new IllegalArgumentException("Invalid value '" + value + "'"); 965 synchronized (stateLock) { 966 ensureOpen(); 967 if (opt == StandardSocketOptions.IP_TOS) { 968 // maps to IPV6_TCLASS and/or IP_TOS 969 Net.setIpSocketOption(fd, family(), opt, value); 970 } else if (opt == StandardSocketOptions.SO_REUSEADDR) { 971 boolean b = (boolean) value; 972 if (Net.useExclusiveBind()) { 973 isReuseAddress = b; 974 } else { 975 Net.setSocketOption(fd, opt, b); 976 } 977 } else { 978 // option does not need special handling 979 Net.setSocketOption(fd, opt, value); 980 } 981 } 982 } 983 984 @SuppressWarnings("unchecked") 985 protected <T> T getOption(SocketOption<T> opt) throws IOException { 986 if (!supportedOptions().contains(opt)) 987 throw new UnsupportedOperationException("'" + opt + "' not supported"); 988 synchronized (stateLock) { 989 ensureOpen(); 990 if (opt == StandardSocketOptions.IP_TOS) { 991 return (T) Net.getSocketOption(fd, family(), opt); 992 } else if (opt == StandardSocketOptions.SO_REUSEADDR) { 993 if (Net.useExclusiveBind()) { 994 return (T) Boolean.valueOf(isReuseAddress); 995 } else { 996 return (T) Net.getSocketOption(fd, opt); 997 } 998 } else { 999 // option does not need special handling 1000 return (T) Net.getSocketOption(fd, opt); 1001 } 1002 } 1003 } 1004 1005 private boolean booleanValue(Object value, String desc) throws SocketException { 1006 if (!(value instanceof Boolean)) 1007 throw new SocketException("Bad value for " + desc); 1008 return (boolean) value; 1009 } 1010 1011 private int intValue(Object value, String desc) throws SocketException { 1012 if (!(value instanceof Integer)) 1013 throw new SocketException("Bad value for " + desc); 1014 return (int) value; 1015 } 1016 1017 @Override 1018 public void setOption(int opt, Object value) throws SocketException { 1019 synchronized (stateLock) { 1020 ensureOpen(); 1021 try { 1022 switch (opt) { 1023 case SO_LINGER: { 1024 // the value is "false" to disable, or linger interval to enable 1025 int i; 1026 if (value instanceof Boolean && ((boolean) value) == false) { 1027 i = -1; 1028 } else { 1029 i = intValue(value, "SO_LINGER"); 1030 } 1031 Net.setSocketOption(fd, StandardSocketOptions.SO_LINGER, i); 1032 break; 1033 } 1034 case SO_TIMEOUT: { 1035 int i = intValue(value, "SO_TIMEOUT"); 1036 if (i < 0) 1037 throw new IllegalArgumentException("timeout < 0"); 1038 timeout = i; 1039 break; 1040 } 1041 case IP_TOS: { 1042 int i = intValue(value, "IP_TOS"); 1043 Net.setIpSocketOption(fd, family(), StandardSocketOptions.IP_TOS, i); 1044 break; 1045 } 1046 case TCP_NODELAY: { 1047 boolean b = booleanValue(value, "TCP_NODELAY"); 1048 Net.setSocketOption(fd, StandardSocketOptions.TCP_NODELAY, b); 1049 break; 1050 } 1051 case SO_SNDBUF: { 1052 int i = intValue(value, "SO_SNDBUF"); 1053 if (i <= 0) 1054 throw new SocketException("SO_SNDBUF <= 0"); 1055 Net.setSocketOption(fd, StandardSocketOptions.SO_SNDBUF, i); 1056 break; 1057 } 1058 case SO_RCVBUF: { 1059 int i = intValue(value, "SO_RCVBUF"); 1060 if (i <= 0) 1061 throw new SocketException("SO_RCVBUF <= 0"); 1062 Net.setSocketOption(fd, StandardSocketOptions.SO_RCVBUF, i); 1063 break; 1064 } 1065 case SO_KEEPALIVE: { 1066 boolean b = booleanValue(value, "SO_KEEPALIVE"); 1067 Net.setSocketOption(fd, StandardSocketOptions.SO_KEEPALIVE, b); 1068 break; 1069 } 1070 case SO_OOBINLINE: { 1071 boolean b = booleanValue(value, "SO_OOBINLINE"); 1072 Net.setSocketOption(fd, ExtendedSocketOption.SO_OOBINLINE, b); 1073 break; 1074 } 1075 case SO_REUSEADDR: { 1076 boolean b = booleanValue(value, "SO_REUSEADDR"); 1077 if (Net.useExclusiveBind()) { 1078 isReuseAddress = b; 1079 } else { 1080 Net.setSocketOption(fd, StandardSocketOptions.SO_REUSEADDR, b); 1081 } 1082 break; 1083 } 1084 case SO_REUSEPORT: { 1085 if (!Net.isReusePortAvailable()) 1086 throw new SocketException("SO_REUSEPORT not supported"); 1087 boolean b = booleanValue(value, "SO_REUSEPORT"); 1088 Net.setSocketOption(fd, StandardSocketOptions.SO_REUSEPORT, b); 1089 break; 1090 } 1091 default: 1092 throw new SocketException("Unknown option " + opt); 1093 } 1094 } catch (SocketException e) { 1095 throw e; 1096 } catch (IllegalArgumentException | IOException e) { 1097 throw asSocketException(e); 1098 } 1099 } 1100 } 1101 1102 @Override 1103 public Object getOption(int opt) throws SocketException { 1104 synchronized (stateLock) { 1105 ensureOpen(); 1106 try { 1107 switch (opt) { 1108 case SO_TIMEOUT: 1109 return timeout; 1110 case TCP_NODELAY: 1111 return Net.getSocketOption(fd, StandardSocketOptions.TCP_NODELAY); 1112 case SO_OOBINLINE: 1113 return Net.getSocketOption(fd, ExtendedSocketOption.SO_OOBINLINE); 1114 case SO_LINGER: { 1115 // return "false" when disabled, linger interval when enabled 1116 int i = (int) Net.getSocketOption(fd, StandardSocketOptions.SO_LINGER); 1117 if (i == -1) { 1118 return Boolean.FALSE; 1119 } else { 1120 return i; 1121 } 1122 } 1123 case SO_REUSEADDR: 1124 if (Net.useExclusiveBind()) { 1125 return isReuseAddress; 1126 } else { 1127 return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEADDR); 1128 } 1129 case SO_BINDADDR: 1130 return Net.localAddress(fd).getAddress(); 1131 case SO_SNDBUF: 1132 return Net.getSocketOption(fd, StandardSocketOptions.SO_SNDBUF); 1133 case SO_RCVBUF: 1134 return Net.getSocketOption(fd, StandardSocketOptions.SO_RCVBUF); 1135 case IP_TOS: 1136 return Net.getSocketOption(fd, family(), StandardSocketOptions.IP_TOS); 1137 case SO_KEEPALIVE: 1138 return Net.getSocketOption(fd, StandardSocketOptions.SO_KEEPALIVE); 1139 case SO_REUSEPORT: 1140 if (!Net.isReusePortAvailable()) 1141 throw new SocketException("SO_REUSEPORT not supported"); 1142 return Net.getSocketOption(fd, StandardSocketOptions.SO_REUSEPORT); 1143 default: 1144 throw new SocketException("Unknown option " + opt); 1145 } 1146 } catch (SocketException e) { 1147 throw e; 1148 } catch (IllegalArgumentException | IOException e) { 1149 throw asSocketException(e); 1150 } 1151 } 1152 } 1153 1154 @Override 1155 protected void shutdownInput() throws IOException { 1156 synchronized (stateLock) { 1157 ensureOpenAndConnected(); 1158 if (!isInputClosed) { 1159 Net.shutdown(fd, Net.SHUT_RD); 1160 if (readerThread != null && readerThread.isVirtual()) { 1161 Poller.stopPoll(readerThread); 1162 } 1163 isInputClosed = true; 1164 } 1165 } 1166 } 1167 1168 @Override 1169 protected void shutdownOutput() throws IOException { 1170 synchronized (stateLock) { 1171 ensureOpenAndConnected(); 1172 if (!isOutputClosed) { 1173 Net.shutdown(fd, Net.SHUT_WR); 1174 if (writerThread != null && writerThread.isVirtual()) { 1175 Poller.stopPoll(writerThread); 1176 } 1177 isOutputClosed = true; 1178 } 1179 } 1180 } 1181 1182 @Override 1183 protected boolean supportsUrgentData() { 1184 return true; 1185 } 1186 1187 @Override 1188 protected void sendUrgentData(int data) throws IOException { 1189 writeLock.lock(); 1190 try { 1191 int n = 0; 1192 FileDescriptor fd = beginWrite(); 1193 try { 1194 configureNonBlockingIfNeeded(fd, false); 1195 do { 1196 n = Net.sendOOB(fd, (byte) data); 1197 } while (n == IOStatus.INTERRUPTED && isOpen()); 1198 if (n == IOStatus.UNAVAILABLE) { 1199 throw new SocketException("No buffer space available"); 1200 } 1201 } finally { 1202 endWrite(n > 0); 1203 } 1204 } finally { 1205 writeLock.unlock(); 1206 } 1207 } 1208 1209 /** 1210 * Returns an action to close the given file descriptor. 1211 */ 1212 private static Runnable closerFor(FileDescriptor fd) { 1213 return () -> { 1214 try { 1215 nd.close(fd); 1216 } catch (IOException ioe) { 1217 throw new UncheckedIOException(ioe); 1218 } 1219 }; 1220 } 1221 1222 /** 1223 * Attempts to acquire the given lock within the given waiting time. 1224 * @return the remaining time in nanoseconds when the lock is acquired, zero 1225 * or less if the lock was not acquired before the timeout expired 1226 */ 1227 private static long tryLock(ReentrantLock lock, long timeout, TimeUnit unit) { 1228 assert timeout > 0; 1229 boolean interrupted = false; 1230 long nanos = unit.toNanos(timeout); 1231 long remainingNanos = nanos; 1232 long startNanos = System.nanoTime(); 1233 boolean acquired = false; 1234 while (!acquired && (remainingNanos > 0)) { 1235 try { 1236 acquired = lock.tryLock(remainingNanos, NANOSECONDS); 1237 } catch (InterruptedException e) { 1238 interrupted = true; 1239 } 1240 remainingNanos = nanos - (System.nanoTime() - startNanos); 1241 } 1242 if (acquired && remainingNanos <= 0L) 1243 lock.unlock(); // release lock if timeout has expired 1244 if (interrupted) 1245 Thread.currentThread().interrupt(); 1246 return remainingNanos; 1247 } 1248 1249 /** 1250 * Creates a SocketException from the given exception. 1251 */ 1252 private static SocketException asSocketException(Exception e) { 1253 if (e instanceof SocketException se) { 1254 return se; 1255 } else { 1256 var se = new SocketException(e.getMessage()); 1257 se.setStackTrace(e.getStackTrace()); 1258 return se; 1259 } 1260 } 1261 1262 /** 1263 * Returns the socket protocol family. 1264 */ 1265 private static ProtocolFamily family() { 1266 if (Net.isIPv6Available()) { 1267 return StandardProtocolFamily.INET6; 1268 } else { 1269 return StandardProtocolFamily.INET; 1270 } 1271 } 1272 1273 /** 1274 * Return the file descriptor value. 1275 */ 1276 private static int fdVal(FileDescriptor fd) { 1277 return JIOFDA.get(fd); 1278 } 1279 1280 private static final JavaIOFileDescriptorAccess JIOFDA = SharedSecrets.getJavaIOFileDescriptorAccess(); 1281 } --- EOF ---