1 /* 2 * Copyright (c) 2009, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch.sctp; 27 28 import java.net.InetAddress; 29 import java.net.SocketAddress; 30 import java.net.SocketException; 31 import java.net.InetSocketAddress; 32 import java.io.FileDescriptor; 33 import java.io.IOException; 34 import java.util.Collections; 35 import java.util.Set; 36 import java.util.HashSet; 37 import java.nio.ByteBuffer; 38 import java.nio.channels.SelectionKey; 39 import java.nio.channels.ClosedChannelException; 40 import java.nio.channels.ConnectionPendingException; 41 import java.nio.channels.NoConnectionPendingException; 42 import java.nio.channels.AlreadyConnectedException; 43 import java.nio.channels.NotYetBoundException; 44 import java.nio.channels.NotYetConnectedException; 45 import java.nio.channels.spi.SelectorProvider; 46 import com.sun.nio.sctp.AbstractNotificationHandler; 47 import com.sun.nio.sctp.Association; 48 import com.sun.nio.sctp.AssociationChangeNotification; 49 import com.sun.nio.sctp.HandlerResult; 50 import com.sun.nio.sctp.IllegalReceiveException; 51 import com.sun.nio.sctp.InvalidStreamException; 52 import com.sun.nio.sctp.IllegalUnbindException; 53 import com.sun.nio.sctp.MessageInfo; 54 import com.sun.nio.sctp.NotificationHandler; 55 import com.sun.nio.sctp.SctpChannel; 56 import com.sun.nio.sctp.SctpSocketOption; 57 import jdk.internal.access.JavaNioAccess; 58 import jdk.internal.access.SharedSecrets; 59 import sun.net.util.IPAddressUtil; 60 import sun.nio.ch.DirectBuffer; 61 import sun.nio.ch.IOStatus; 62 import sun.nio.ch.IOUtil; 63 import sun.nio.ch.NativeThread; 64 import sun.nio.ch.Net; 65 import sun.nio.ch.SelChImpl; 66 import sun.nio.ch.SelectionKeyImpl; 67 import sun.nio.ch.Util; 68 import static com.sun.nio.sctp.SctpStandardSocketOptions.*; 69 import static sun.nio.ch.sctp.ResultContainer.SEND_FAILED; 70 import static sun.nio.ch.sctp.ResultContainer.ASSOCIATION_CHANGED; 71 import static sun.nio.ch.sctp.ResultContainer.PEER_ADDRESS_CHANGED; 72 import static sun.nio.ch.sctp.ResultContainer.SHUTDOWN; 73 74 /** 75 * An implementation of an SctpChannel 76 */ 77 public class SctpChannelImpl extends SctpChannel 78 implements SelChImpl 79 { 80 81 private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess(); 82 83 private final FileDescriptor fd; 84 85 private final int fdVal; 86 87 /* IDs of native threads doing send and receive, for signalling */ 88 private volatile NativeThread receiverThread; 89 private volatile NativeThread senderThread; 90 91 /* Lock held by current receiving or connecting thread */ 92 private final Object receiveLock = new Object(); 93 94 /* Lock held by current sending or connecting thread */ 95 private final Object sendLock = new Object(); 96 97 private final ThreadLocal<Boolean> receiveInvoked = 98 new ThreadLocal<>() { 99 @Override protected Boolean initialValue() { 100 return Boolean.FALSE; 101 } 102 }; 103 104 /* Lock held by any thread that modifies the state fields declared below 105 DO NOT invoke a blocking I/O operation while holding this lock! */ 106 private final Object stateLock = new Object(); 107 108 private enum ChannelState { 109 UNINITIALIZED, 110 UNCONNECTED, 111 PENDING, 112 CONNECTED, 113 KILLPENDING, 114 KILLED, 115 } 116 /* -- The following fields are protected by stateLock -- */ 117 private ChannelState state; 118 119 /* Binding; Once bound the port will remain constant. */ 120 int port = -1; 121 private final Set<InetSocketAddress> localAddresses = new HashSet<>(); 122 /* Has the channel been bound to the wildcard address */ 123 private boolean wildcard; /* false */ 124 //private InetSocketAddress remoteAddress = null; 125 126 /* Input/Output open */ 127 private boolean readyToConnect; 128 129 /* Shutdown */ 130 private boolean isShutdown; 131 132 private Association association; 133 134 private Set<SocketAddress> remoteAddresses = Collections.emptySet(); 135 136 /* -- End of fields protected by stateLock -- */ 137 138 /** 139 * Constructor for normal connecting sockets 140 */ 141 public SctpChannelImpl(SelectorProvider provider) throws IOException { 142 //TODO: update provider remove public modifier 143 super(provider); 144 this.fd = SctpNet.socket(true); 145 this.fdVal = IOUtil.fdVal(fd); 146 this.state = ChannelState.UNCONNECTED; 147 } 148 149 /** 150 * Constructor for sockets obtained from server sockets 151 */ 152 public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd) 153 throws IOException { 154 this(provider, fd, null); 155 } 156 157 /** 158 * Constructor for sockets obtained from branching 159 */ 160 public SctpChannelImpl(SelectorProvider provider, 161 FileDescriptor fd, 162 Association association) 163 throws IOException { 164 super(provider); 165 this.fd = fd; 166 this.fdVal = IOUtil.fdVal(fd); 167 this.state = ChannelState.CONNECTED; 168 port = (Net.localAddress(fd)).getPort(); 169 170 if (association != null) { /* branched */ 171 this.association = association; 172 } else { /* obtained from server channel */ 173 /* Receive COMM_UP */ 174 ByteBuffer buf = Util.getTemporaryDirectBuffer(50); 175 try { 176 receive(buf, null, null, true); 177 } finally { 178 Util.releaseTemporaryDirectBuffer(buf); 179 } 180 } 181 } 182 183 /** 184 * Binds the channel's socket to a local address. 185 */ 186 @Override 187 public SctpChannel bind(SocketAddress local) throws IOException { 188 synchronized (receiveLock) { 189 synchronized (sendLock) { 190 synchronized (stateLock) { 191 ensureOpenAndUnconnected(); 192 if (isBound()) 193 SctpNet.throwAlreadyBoundException(); 194 InetSocketAddress isa = (local == null) ? 195 new InetSocketAddress(0) : Net.checkAddress(local); 196 Net.bind(fd, isa.getAddress(), isa.getPort()); 197 InetSocketAddress boundIsa = Net.localAddress(fd); 198 port = boundIsa.getPort(); 199 localAddresses.add(isa); 200 if (isa.getAddress().isAnyLocalAddress()) 201 wildcard = true; 202 } 203 } 204 } 205 return this; 206 } 207 208 @Override 209 public SctpChannel bindAddress(InetAddress address) 210 throws IOException { 211 bindUnbindAddress(address, true); 212 localAddresses.add(new InetSocketAddress(address, port)); 213 return this; 214 } 215 216 @Override 217 public SctpChannel unbindAddress(InetAddress address) 218 throws IOException { 219 bindUnbindAddress(address, false); 220 localAddresses.remove(new InetSocketAddress(address, port)); 221 return this; 222 } 223 224 private void bindUnbindAddress(InetAddress address, boolean add) 225 throws IOException { 226 if (address == null) 227 throw new IllegalArgumentException(); 228 229 synchronized (receiveLock) { 230 synchronized (sendLock) { 231 synchronized (stateLock) { 232 if (!isOpen()) 233 throw new ClosedChannelException(); 234 if (!isBound()) 235 throw new NotYetBoundException(); 236 if (wildcard) 237 throw new IllegalStateException( 238 "Cannot add or remove addresses from a channel that is bound to the wildcard address"); 239 if (address.isAnyLocalAddress()) 240 throw new IllegalArgumentException( 241 "Cannot add or remove the wildcard address"); 242 if (add) { 243 for (InetSocketAddress addr : localAddresses) { 244 if (addr.getAddress().equals(address)) { 245 SctpNet.throwAlreadyBoundException(); 246 } 247 } 248 } else { /*removing */ 249 /* Verify that there is more than one address 250 * and that address is already bound */ 251 if (localAddresses.size() <= 1) 252 throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound"); 253 boolean foundAddress = false; 254 for (InetSocketAddress addr : localAddresses) { 255 if (addr.getAddress().equals(address)) { 256 foundAddress = true; 257 break; 258 } 259 } 260 if (!foundAddress ) 261 throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address"); 262 } 263 264 SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add); 265 266 /* Update our internal Set to reflect the addition/removal */ 267 if (add) 268 localAddresses.add(new InetSocketAddress(address, port)); 269 else { 270 for (InetSocketAddress addr : localAddresses) { 271 if (addr.getAddress().equals(address)) { 272 localAddresses.remove(addr); 273 break; 274 } 275 } 276 } 277 } 278 } 279 } 280 } 281 282 private boolean isBound() { 283 synchronized (stateLock) { 284 return port != -1; 285 } 286 } 287 288 private boolean isConnected() { 289 synchronized (stateLock) { 290 return (state == ChannelState.CONNECTED); 291 } 292 } 293 294 private void ensureOpenAndUnconnected() throws IOException { 295 synchronized (stateLock) { 296 if (!isOpen()) 297 throw new ClosedChannelException(); 298 if (isConnected()) 299 throw new AlreadyConnectedException(); 300 if (state == ChannelState.PENDING) 301 throw new ConnectionPendingException(); 302 } 303 } 304 305 private boolean ensureReceiveOpen() throws ClosedChannelException { 306 synchronized (stateLock) { 307 if (!isOpen()) 308 throw new ClosedChannelException(); 309 if (!isConnected()) 310 throw new NotYetConnectedException(); 311 else 312 return true; 313 } 314 } 315 316 private void ensureSendOpen() throws ClosedChannelException { 317 synchronized (stateLock) { 318 if (!isOpen()) 319 throw new ClosedChannelException(); 320 if (isShutdown) 321 throw new ClosedChannelException(); 322 if (!isConnected()) 323 throw new NotYetConnectedException(); 324 } 325 } 326 327 private void receiverCleanup() throws IOException { 328 synchronized (stateLock) { 329 receiverThread = null; 330 if (state == ChannelState.KILLPENDING) 331 kill(); 332 } 333 } 334 335 private void senderCleanup() throws IOException { 336 synchronized (stateLock) { 337 senderThread = null; 338 if (state == ChannelState.KILLPENDING) 339 kill(); 340 } 341 } 342 343 @Override 344 public Association association() throws ClosedChannelException { 345 synchronized (stateLock) { 346 if (!isOpen()) 347 throw new ClosedChannelException(); 348 if (!isConnected()) 349 return null; 350 351 return association; 352 } 353 } 354 355 @Override 356 public boolean connect(SocketAddress endpoint) throws IOException { 357 synchronized (receiveLock) { 358 synchronized (sendLock) { 359 ensureOpenAndUnconnected(); 360 InetSocketAddress isa = Net.checkAddress(endpoint); 361 synchronized (blockingLock()) { 362 int n = 0; 363 try { 364 try { 365 begin(); 366 synchronized (stateLock) { 367 if (!isOpen()) { 368 return false; 369 } 370 receiverThread = NativeThread.current(); 371 } 372 for (;;) { 373 InetAddress ia = isa.getAddress(); 374 if (ia.isAnyLocalAddress()) 375 ia = InetAddress.getLocalHost(); 376 n = SctpNet.connect(fdVal, ia, isa.getPort()); 377 if ( (n == IOStatus.INTERRUPTED) 378 && isOpen()) 379 continue; 380 break; 381 } 382 } finally { 383 receiverCleanup(); 384 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 385 assert IOStatus.check(n); 386 } 387 } catch (IOException x) { 388 /* If an exception was thrown, close the channel after 389 * invoking end() so as to avoid bogus 390 * AsynchronousCloseExceptions */ 391 close(); 392 throw x; 393 } 394 395 if (n > 0) { 396 synchronized (stateLock) { 397 /* Connection succeeded */ 398 state = ChannelState.CONNECTED; 399 if (!isBound()) { 400 InetSocketAddress boundIsa = 401 Net.localAddress(fd); 402 port = boundIsa.getPort(); 403 } 404 405 /* Receive COMM_UP */ 406 ByteBuffer buf = Util.getTemporaryDirectBuffer(50); 407 try { 408 receive(buf, null, null, true); 409 } finally { 410 Util.releaseTemporaryDirectBuffer(buf); 411 } 412 413 /* cache remote addresses */ 414 try { 415 remoteAddresses = getRemoteAddresses(); 416 } catch (IOException unused) { /* swallow exception */ } 417 418 return true; 419 } 420 } else { 421 synchronized (stateLock) { 422 /* If nonblocking and no exception then connection 423 * pending; disallow another invocation */ 424 if (!isBlocking()) 425 state = ChannelState.PENDING; 426 else 427 assert false; 428 } 429 } 430 } 431 return false; 432 } 433 } 434 } 435 436 @Override 437 public boolean connect(SocketAddress endpoint, 438 int maxOutStreams, 439 int maxInStreams) 440 throws IOException { 441 ensureOpenAndUnconnected(); 442 return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams. 443 create(maxInStreams, maxOutStreams)).connect(endpoint); 444 445 } 446 447 @Override 448 public boolean isConnectionPending() { 449 synchronized (stateLock) { 450 return (state == ChannelState.PENDING); 451 } 452 } 453 454 @Override 455 public boolean finishConnect() throws IOException { 456 synchronized (receiveLock) { 457 synchronized (sendLock) { 458 synchronized (stateLock) { 459 if (!isOpen()) 460 throw new ClosedChannelException(); 461 if (isConnected()) 462 return true; 463 if (state != ChannelState.PENDING) 464 throw new NoConnectionPendingException(); 465 } 466 boolean connected = false; 467 try { 468 try { 469 begin(); 470 synchronized (blockingLock()) { 471 synchronized (stateLock) { 472 if (!isOpen()) { 473 return false; 474 } 475 receiverThread = NativeThread.current(); 476 } 477 if (!isBlocking()) { 478 connected = Net.pollConnect(fd, 0); 479 } else { 480 do { 481 connected = Net.pollConnect(fd, -1); 482 } while (!connected && isOpen()); 483 } 484 } 485 } finally { 486 synchronized (stateLock) { 487 receiverThread = null; 488 if (state == ChannelState.KILLPENDING) { 489 kill(); 490 connected = false; 491 } 492 } 493 end(connected); 494 } 495 } catch (IOException x) { 496 /* If an exception was thrown, close the channel after 497 * invoking end() so as to avoid bogus 498 * AsynchronousCloseExceptions */ 499 close(); 500 throw x; 501 } 502 503 if (connected) { 504 synchronized (stateLock) { 505 state = ChannelState.CONNECTED; 506 if (!isBound()) { 507 InetSocketAddress boundIsa = 508 Net.localAddress(fd); 509 port = boundIsa.getPort(); 510 } 511 512 /* Receive COMM_UP */ 513 ByteBuffer buf = Util.getTemporaryDirectBuffer(50); 514 try { 515 receive(buf, null, null, true); 516 } finally { 517 Util.releaseTemporaryDirectBuffer(buf); 518 } 519 520 /* cache remote addresses */ 521 try { 522 remoteAddresses = getRemoteAddresses(); 523 } catch (IOException unused) { /* swallow exception */ } 524 525 return true; 526 } 527 } 528 } 529 } 530 return false; 531 } 532 533 @Override 534 protected void implConfigureBlocking(boolean block) throws IOException { 535 IOUtil.configureBlocking(fd, block); 536 } 537 538 @Override 539 public void implCloseSelectableChannel() throws IOException { 540 synchronized (stateLock) { 541 if (state != ChannelState.KILLED) 542 SctpNet.preClose(fdVal); 543 544 if (NativeThread.isNativeThread(receiverThread)) 545 receiverThread.signal(); 546 547 if (NativeThread.isNativeThread(senderThread)) 548 senderThread.signal(); 549 550 if (!isRegistered()) 551 kill(); 552 } 553 } 554 555 @Override 556 public FileDescriptor getFD() { 557 return fd; 558 } 559 560 @Override 561 public int getFDVal() { 562 return fdVal; 563 } 564 565 /** 566 * Translates native poll revent ops into a ready operation ops 567 */ 568 private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) { 569 int intOps = sk.nioInterestOps(); 570 int oldOps = sk.nioReadyOps(); 571 int newOps = initialOps; 572 573 if ((ops & Net.POLLNVAL) != 0) { 574 /* This should only happen if this channel is pre-closed while a 575 * selection operation is in progress 576 * ## Throw an error if this channel has not been pre-closed */ 577 return false; 578 } 579 580 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 581 newOps = intOps; 582 sk.nioReadyOps(newOps); 583 /* No need to poll again in checkConnect, 584 * the error will be detected there */ 585 readyToConnect = true; 586 return (newOps & ~oldOps) != 0; 587 } 588 589 if (((ops & Net.POLLIN) != 0) && 590 ((intOps & SelectionKey.OP_READ) != 0) && 591 isConnected()) 592 newOps |= SelectionKey.OP_READ; 593 594 if (((ops & Net.POLLCONN) != 0) && 595 ((intOps & SelectionKey.OP_CONNECT) != 0) && 596 ((state == ChannelState.UNCONNECTED) || (state == ChannelState.PENDING))) { 597 newOps |= SelectionKey.OP_CONNECT; 598 readyToConnect = true; 599 } 600 601 if (((ops & Net.POLLOUT) != 0) && 602 ((intOps & SelectionKey.OP_WRITE) != 0) && 603 isConnected()) 604 newOps |= SelectionKey.OP_WRITE; 605 606 sk.nioReadyOps(newOps); 607 return (newOps & ~oldOps) != 0; 608 } 609 610 @Override 611 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) { 612 return translateReadyOps(ops, sk.nioReadyOps(), sk); 613 } 614 615 @Override 616 @SuppressWarnings("all") 617 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) { 618 return translateReadyOps(ops, 0, sk); 619 } 620 621 @Override 622 public int translateInterestOps(int ops) { 623 int newOps = 0; 624 if ((ops & SelectionKey.OP_READ) != 0) 625 newOps |= Net.POLLIN; 626 if ((ops & SelectionKey.OP_WRITE) != 0) 627 newOps |= Net.POLLOUT; 628 if ((ops & SelectionKey.OP_CONNECT) != 0) 629 newOps |= Net.POLLCONN; 630 return newOps; 631 } 632 633 @Override 634 public void kill() throws IOException { 635 synchronized (stateLock) { 636 if (state == ChannelState.KILLED) 637 return; 638 if (state == ChannelState.UNINITIALIZED) { 639 state = ChannelState.KILLED; 640 SctpNet.close(fdVal); 641 return; 642 } 643 assert !isOpen() && !isRegistered(); 644 645 /* Postpone the kill if there is a waiting reader 646 * or writer thread. */ 647 if (receiverThread == null && senderThread == null) { 648 state = ChannelState.KILLED; 649 SctpNet.close(fdVal); 650 } else { 651 state = ChannelState.KILLPENDING; 652 } 653 } 654 } 655 656 @Override 657 public <T> SctpChannel setOption(SctpSocketOption<T> name, T value) 658 throws IOException { 659 if (name == null) 660 throw new NullPointerException(); 661 if (!supportedOptions().contains(name)) 662 throw new UnsupportedOperationException("'" + name + "' not supported"); 663 664 synchronized (stateLock) { 665 if (!isOpen()) 666 throw new ClosedChannelException(); 667 668 SctpNet.setSocketOption(fdVal, name, value, 0 /*oneToOne*/); 669 } 670 return this; 671 } 672 673 @Override 674 @SuppressWarnings("unchecked") 675 public <T> T getOption(SctpSocketOption<T> name) throws IOException { 676 if (name == null) 677 throw new NullPointerException(); 678 if (!supportedOptions().contains(name)) 679 throw new UnsupportedOperationException("'" + name + "' not supported"); 680 681 synchronized (stateLock) { 682 if (!isOpen()) 683 throw new ClosedChannelException(); 684 685 return (T)SctpNet.getSocketOption(fdVal, name, 0 /*oneToOne*/); 686 } 687 } 688 689 @Override 690 public final Set<SctpSocketOption<?>> supportedOptions() { 691 final class Holder { 692 static final Set<SctpSocketOption<?>> DEFAULT_OPTIONS = Set.of( 693 SCTP_DISABLE_FRAGMENTS, 694 SCTP_EXPLICIT_COMPLETE, 695 SCTP_FRAGMENT_INTERLEAVE, 696 SCTP_INIT_MAXSTREAMS, 697 SCTP_NODELAY, 698 SCTP_PRIMARY_ADDR, 699 SCTP_SET_PEER_PRIMARY_ADDR, 700 SO_SNDBUF, 701 SO_RCVBUF, 702 SO_LINGER); 703 } 704 return Holder.DEFAULT_OPTIONS; 705 } 706 707 @Override 708 public <T> MessageInfo receive(ByteBuffer buffer, 709 T attachment, 710 NotificationHandler<T> handler) 711 throws IOException { 712 return receive(buffer, attachment, handler, false); 713 } 714 715 private <T> MessageInfo receive(ByteBuffer buffer, 716 T attachment, 717 NotificationHandler<T> handler, 718 boolean fromConnect) 719 throws IOException { 720 if (buffer == null) 721 throw new IllegalArgumentException("buffer cannot be null"); 722 723 if (buffer.isReadOnly()) 724 throw new IllegalArgumentException("Read-only buffer"); 725 726 if (receiveInvoked.get()) 727 throw new IllegalReceiveException( 728 "cannot invoke receive from handler"); 729 receiveInvoked.set(Boolean.TRUE); 730 731 try { 732 ResultContainer resultContainer = new ResultContainer(); 733 do { 734 resultContainer.clear(); 735 synchronized (receiveLock) { 736 if (!ensureReceiveOpen()) 737 return null; 738 739 int n = 0; 740 try { 741 begin(); 742 743 synchronized (stateLock) { 744 if(!isOpen()) 745 return null; 746 receiverThread = NativeThread.current(); 747 } 748 749 do { 750 n = receive(fdVal, buffer, resultContainer, fromConnect); 751 } while ((n == IOStatus.INTERRUPTED) && isOpen()); 752 } finally { 753 receiverCleanup(); 754 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 755 assert IOStatus.check(n); 756 } 757 758 if (!resultContainer.isNotification()) { 759 /* message or nothing */ 760 if (resultContainer.hasSomething()) { 761 /* Set the association before returning */ 762 MessageInfoImpl info = 763 resultContainer.getMessageInfo(); 764 synchronized (stateLock) { 765 assert association != null; 766 info.setAssociation(association); 767 } 768 return info; 769 } else 770 /* Non-blocking may return null if nothing available*/ 771 return null; 772 } else { /* notification */ 773 synchronized (stateLock) { 774 handleNotificationInternal( 775 resultContainer); 776 } 777 } 778 779 if (fromConnect) { 780 /* If we reach here, then it was connect that invoked 781 * receive and received the COMM_UP. We have already 782 * handled the COMM_UP with the internal notification 783 * handler. Simply return. */ 784 return null; 785 } 786 } /* receiveLock */ 787 } while (handler == null ? true : 788 (invokeNotificationHandler(resultContainer, handler, attachment) 789 == HandlerResult.CONTINUE)); 790 791 return null; 792 } finally { 793 receiveInvoked.set(Boolean.FALSE); 794 } 795 } 796 797 private int receive(int fd, 798 ByteBuffer dst, 799 ResultContainer resultContainer, 800 boolean peek) 801 throws IOException { 802 int pos = dst.position(); 803 int lim = dst.limit(); 804 assert (pos <= lim); 805 int rem = (pos <= lim ? lim - pos : 0); 806 if (dst instanceof DirectBuffer && rem > 0) 807 return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek); 808 809 /* Substitute a native buffer */ 810 int newSize = Math.max(rem, 1); 811 ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize); 812 try { 813 int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek); 814 bb.flip(); 815 if (n > 0 && rem > 0) 816 dst.put(bb); 817 return n; 818 } finally { 819 Util.releaseTemporaryDirectBuffer(bb); 820 } 821 } 822 823 private int receiveIntoNativeBuffer(int fd, 824 ResultContainer resultContainer, 825 ByteBuffer bb, 826 int rem, 827 int pos, 828 boolean peek) 829 throws IOException 830 { 831 NIO_ACCESS.acquireSession(bb); 832 try { 833 int n = receive0(fd, resultContainer, NIO_ACCESS.getBufferAddress(bb) + pos, rem, peek); 834 835 if (n > 0) 836 bb.position(pos + n); 837 return n; 838 } finally { 839 NIO_ACCESS.releaseSession(bb); 840 } 841 } 842 843 private final InternalNotificationHandler internalNotificationHandler = 844 new InternalNotificationHandler(); 845 846 private void handleNotificationInternal(ResultContainer resultContainer) { 847 invokeNotificationHandler(resultContainer, 848 internalNotificationHandler, null); 849 } 850 851 private final class InternalNotificationHandler 852 extends AbstractNotificationHandler<Object> { 853 @Override 854 public HandlerResult handleNotification(AssociationChangeNotification not, 855 Object unused) { 856 if (not.event().equals(AssociationChangeNotification.AssocChangeEvent.COMM_UP) && 857 association == null) { 858 AssociationChange sac = (AssociationChange) not; 859 association = new AssociationImpl 860 (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams()); 861 } 862 return HandlerResult.CONTINUE; 863 } 864 } 865 866 private <T> HandlerResult invokeNotificationHandler(ResultContainer resultContainer, 867 NotificationHandler<T> handler, 868 T attachment) { 869 SctpNotification notification = resultContainer.notification(); 870 synchronized (stateLock) { 871 notification.setAssociation(association); 872 } 873 874 if (!(handler instanceof AbstractNotificationHandler<T> absHandler)) { 875 return handler.handleNotification(notification, attachment); 876 } 877 878 /* AbstractNotificationHandler */ 879 return switch (resultContainer.type()) { 880 case ASSOCIATION_CHANGED -> absHandler.handleNotification( 881 resultContainer.getAssociationChanged(), attachment); 882 case PEER_ADDRESS_CHANGED -> absHandler.handleNotification( 883 resultContainer.getPeerAddressChanged(), attachment); 884 case SEND_FAILED -> absHandler.handleNotification( 885 resultContainer.getSendFailed(), attachment); 886 case SHUTDOWN -> absHandler.handleNotification( 887 resultContainer.getShutdown(), attachment); 888 /* implementation specific handlers */ 889 default -> absHandler.handleNotification( 890 resultContainer.notification(), attachment); 891 }; 892 } 893 894 private void checkAssociation(Association sendAssociation) { 895 synchronized (stateLock) { 896 if (sendAssociation != null && !sendAssociation.equals(association)) { 897 throw new IllegalArgumentException( 898 "Cannot send to another association"); 899 } 900 } 901 } 902 903 private void checkStreamNumber(int streamNumber) { 904 synchronized (stateLock) { 905 if (association != null) { 906 if (streamNumber < 0 || 907 streamNumber >= association.maxOutboundStreams()) 908 throw new InvalidStreamException(); 909 } 910 } 911 } 912 913 /* TODO: Add support for ttl and isComplete to both 121 12M 914 * SCTP_EOR not yet supported on reference platforms 915 * TTL support limited... 916 */ 917 @Override 918 public int send(ByteBuffer buffer, MessageInfo messageInfo) 919 throws IOException { 920 if (buffer == null) 921 throw new IllegalArgumentException("buffer cannot be null"); 922 923 if (messageInfo == null) 924 throw new IllegalArgumentException("messageInfo cannot be null"); 925 926 checkAssociation(messageInfo.association()); 927 checkStreamNumber(messageInfo.streamNumber()); 928 929 synchronized (sendLock) { 930 ensureSendOpen(); 931 932 int n = 0; 933 try { 934 begin(); 935 936 synchronized (stateLock) { 937 if(!isOpen()) 938 return 0; 939 senderThread = NativeThread.current(); 940 } 941 942 do { 943 n = send(fdVal, buffer, messageInfo); 944 } while ((n == IOStatus.INTERRUPTED) && isOpen()); 945 946 return IOStatus.normalize(n); 947 } finally { 948 senderCleanup(); 949 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 950 assert IOStatus.check(n); 951 } 952 } 953 } 954 955 private int send(int fd, ByteBuffer src, MessageInfo messageInfo) 956 throws IOException { 957 int streamNumber = messageInfo.streamNumber(); 958 SocketAddress target = messageInfo.address(); 959 boolean unordered = messageInfo.isUnordered(); 960 int ppid = messageInfo.payloadProtocolID(); 961 962 if (src instanceof DirectBuffer) 963 return sendFromNativeBuffer(fd, src, target, streamNumber, 964 unordered, ppid); 965 966 /* Substitute a native buffer */ 967 int pos = src.position(); 968 int lim = src.limit(); 969 assert (pos <= lim && streamNumber >= 0); 970 971 int rem = (pos <= lim ? lim - pos : 0); 972 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 973 try { 974 bb.put(src); 975 bb.flip(); 976 /* Do not update src until we see how many bytes were written */ 977 src.position(pos); 978 979 int n = sendFromNativeBuffer(fd, bb, target, streamNumber, 980 unordered, ppid); 981 if (n > 0) { 982 /* now update src */ 983 src.position(pos + n); 984 } 985 return n; 986 } finally { 987 Util.releaseTemporaryDirectBuffer(bb); 988 } 989 } 990 991 private int sendFromNativeBuffer(int fd, 992 ByteBuffer bb, 993 SocketAddress target, 994 int streamNumber, 995 boolean unordered, 996 int ppid) 997 throws IOException { 998 InetAddress addr = null; // no preferred address 999 int port = 0; 1000 if (target != null) { 1001 InetSocketAddress isa = Net.checkAddress(target); 1002 addr = isa.getAddress(); 1003 if (addr.isLinkLocalAddress()) { 1004 addr = IPAddressUtil.toScopedAddress(addr); 1005 } 1006 port = isa.getPort(); 1007 } 1008 1009 int pos = bb.position(); 1010 int lim = bb.limit(); 1011 assert (pos <= lim); 1012 int rem = (pos <= lim ? lim - pos : 0); 1013 1014 NIO_ACCESS.acquireSession(bb); 1015 try { 1016 int written = send0(fd, NIO_ACCESS.getBufferAddress(bb) + pos, rem, addr, 1017 port, -1 /*121*/, streamNumber, unordered, ppid); 1018 if (written > 0) 1019 bb.position(pos + written); 1020 return written; 1021 } finally { 1022 NIO_ACCESS.releaseSession(bb); 1023 } 1024 } 1025 1026 @Override 1027 public SctpChannel shutdown() throws IOException { 1028 synchronized(stateLock) { 1029 if (isShutdown) 1030 return this; 1031 1032 ensureSendOpen(); 1033 SctpNet.shutdown(fdVal, -1); 1034 if (NativeThread.isNativeThread(senderThread)) 1035 senderThread.signal(); 1036 isShutdown = true; 1037 } 1038 return this; 1039 } 1040 1041 @Override 1042 public Set<SocketAddress> getAllLocalAddresses() 1043 throws IOException { 1044 synchronized (stateLock) { 1045 if (!isOpen()) 1046 throw new ClosedChannelException(); 1047 if (!isBound()) 1048 return Collections.emptySet(); 1049 1050 return SctpNet.getLocalAddresses(fdVal); 1051 } 1052 } 1053 1054 @Override 1055 public Set<SocketAddress> getRemoteAddresses() 1056 throws IOException { 1057 synchronized (stateLock) { 1058 if (!isOpen()) 1059 throw new ClosedChannelException(); 1060 if (!isConnected() || isShutdown) 1061 return Collections.emptySet(); 1062 1063 try { 1064 return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/); 1065 } catch (SocketException unused) { 1066 /* an open connected channel should always have remote addresses */ 1067 return remoteAddresses; 1068 } 1069 } 1070 } 1071 1072 /* Native */ 1073 private static native void initIDs(); 1074 1075 static native int receive0(int fd, ResultContainer resultContainer, 1076 long address, int length, boolean peek) throws IOException; 1077 1078 static native int send0(int fd, long address, int length, 1079 InetAddress addr, int port, int assocId, int streamNumber, 1080 boolean unordered, int ppid) throws IOException; 1081 1082 static { 1083 loadSctpLibrary(); 1084 } 1085 1086 @SuppressWarnings("restricted") 1087 private static void loadSctpLibrary() { 1088 IOUtil.load(); /* loads nio & net native libraries */ 1089 System.loadLibrary("sctp"); 1090 initIDs(); 1091 } 1092 }