1 /* 2 * Copyright (c) 2009, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch.sctp; 27 28 import java.net.InetAddress; 29 import java.net.SocketAddress; 30 import java.net.SocketException; 31 import java.net.InetSocketAddress; 32 import java.io.FileDescriptor; 33 import java.io.IOException; 34 import java.util.Collections; 35 import java.util.Map.Entry; 36 import java.util.Iterator; 37 import java.util.Set; 38 import java.util.HashSet; 39 import java.util.HashMap; 40 import java.util.Map; 41 import java.nio.ByteBuffer; 42 import java.nio.channels.SelectionKey; 43 import java.nio.channels.ClosedChannelException; 44 import java.nio.channels.NotYetBoundException; 45 import java.nio.channels.spi.SelectorProvider; 46 import com.sun.nio.sctp.AbstractNotificationHandler; 47 import com.sun.nio.sctp.Association; 48 import com.sun.nio.sctp.AssociationChangeNotification; 49 import com.sun.nio.sctp.HandlerResult; 50 import com.sun.nio.sctp.IllegalReceiveException; 51 import com.sun.nio.sctp.InvalidStreamException; 52 import com.sun.nio.sctp.IllegalUnbindException; 53 import com.sun.nio.sctp.NotificationHandler; 54 import com.sun.nio.sctp.MessageInfo; 55 import com.sun.nio.sctp.SctpChannel; 56 import com.sun.nio.sctp.SctpMultiChannel; 57 import com.sun.nio.sctp.SctpSocketOption; 58 import jdk.internal.access.JavaNioAccess; 59 import jdk.internal.access.SharedSecrets; 60 import sun.net.util.IPAddressUtil; 61 import sun.nio.ch.DirectBuffer; 62 import sun.nio.ch.NativeThread; 63 import sun.nio.ch.IOStatus; 64 import sun.nio.ch.IOUtil; 65 import sun.nio.ch.Net; 66 import sun.nio.ch.SelChImpl; 67 import sun.nio.ch.SelectionKeyImpl; 68 import sun.nio.ch.Util; 69 import static com.sun.nio.sctp.SctpStandardSocketOptions.*; 70 import static sun.nio.ch.sctp.ResultContainer.*; 71 72 /** 73 * An implementation of SctpMultiChannel 74 */ 75 public class SctpMultiChannelImpl extends SctpMultiChannel 76 implements SelChImpl 77 { 78 private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess(); 79 80 private final FileDescriptor fd; 81 82 private final int fdVal; 83 84 /* IDs of native threads doing send and receives, for signalling */ 85 private volatile NativeThread receiverThread; 86 private volatile NativeThread senderThread; 87 88 /* Lock held by current receiving thread */ 89 private final Object receiveLock = new Object(); 90 91 /* Lock held by current sending thread */ 92 private final Object sendLock = new Object(); 93 94 /* Lock held by any thread that modifies the state fields declared below 95 * DO NOT invoke a blocking I/O operation while holding this lock! */ 96 private final Object stateLock = new Object(); 97 98 private enum ChannelState { 99 UNINITIALIZED, 100 KILLPENDING, 101 KILLED, 102 } 103 104 /* -- The following fields are protected by stateLock -- */ 105 private ChannelState state = ChannelState.UNINITIALIZED; 106 107 /* Binding: Once bound the port will remain constant. */ 108 int port = -1; 109 private final Set<InetSocketAddress> localAddresses = new HashSet<>(); 110 /* Has the channel been bound to the wildcard address */ 111 private boolean wildcard; /* false */ 112 113 /* Keeps a map of addresses to association, and vice versa */ 114 private final Map<SocketAddress, Association> addressMap = 115 new HashMap<>(); 116 private final Map<Association, Set<SocketAddress>> associationMap = 117 new HashMap<>(); 118 119 /* -- End of fields protected by stateLock -- */ 120 121 /* If an association has been shutdown mark it for removal after 122 * the user handler has been invoked */ 123 private final ThreadLocal<Association> associationToRemove = new ThreadLocal<>(); 124 125 /* A notification handler cannot invoke receive */ 126 private final ThreadLocal<Boolean> receiveInvoked = 127 new ThreadLocal<>() { 128 @Override protected Boolean initialValue() { 129 return Boolean.FALSE; 130 } 131 }; 132 133 public SctpMultiChannelImpl(SelectorProvider provider) 134 throws IOException { 135 //TODO: update provider, remove public modifier 136 super(provider); 137 this.fd = SctpNet.socket(false /*one-to-many*/); 138 this.fdVal = IOUtil.fdVal(fd); 139 } 140 141 @Override 142 public SctpMultiChannel bind(SocketAddress local, int backlog) 143 throws IOException { 144 synchronized (receiveLock) { 145 synchronized (sendLock) { 146 synchronized (stateLock) { 147 ensureOpen(); 148 if (isBound()) 149 SctpNet.throwAlreadyBoundException(); 150 InetSocketAddress isa = (local == null) ? 151 new InetSocketAddress(0) : Net.checkAddress(local); 152 153 Net.bind(fd, isa.getAddress(), isa.getPort()); 154 155 InetSocketAddress boundIsa = Net.localAddress(fd); 156 port = boundIsa.getPort(); 157 localAddresses.add(isa); 158 if (isa.getAddress().isAnyLocalAddress()) 159 wildcard = true; 160 161 SctpNet.listen(fdVal, backlog < 1 ? 50 : backlog); 162 } 163 } 164 } 165 return this; 166 } 167 168 @Override 169 public SctpMultiChannel bindAddress(InetAddress address) 170 throws IOException { 171 return bindUnbindAddress(address, true); 172 } 173 174 @Override 175 public SctpMultiChannel unbindAddress(InetAddress address) 176 throws IOException { 177 return bindUnbindAddress(address, false); 178 } 179 180 private SctpMultiChannel bindUnbindAddress(InetAddress address, 181 boolean add) 182 throws IOException { 183 if (address == null) 184 throw new IllegalArgumentException(); 185 186 synchronized (receiveLock) { 187 synchronized (sendLock) { 188 synchronized (stateLock) { 189 if (!isOpen()) 190 throw new ClosedChannelException(); 191 if (!isBound()) 192 throw new NotYetBoundException(); 193 if (wildcard) 194 throw new IllegalStateException( 195 "Cannot add or remove addresses from a channel that is bound to the wildcard address"); 196 if (address.isAnyLocalAddress()) 197 throw new IllegalArgumentException( 198 "Cannot add or remove the wildcard address"); 199 if (add) { 200 for (InetSocketAddress addr : localAddresses) { 201 if (addr.getAddress().equals(address)) { 202 SctpNet.throwAlreadyBoundException(); 203 } 204 } 205 } else { /*removing */ 206 /* Verify that there is more than one address 207 * and that address is already bound */ 208 if (localAddresses.size() <= 1) 209 throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound"); 210 boolean foundAddress = false; 211 for (InetSocketAddress addr : localAddresses) { 212 if (addr.getAddress().equals(address)) { 213 foundAddress = true; 214 break; 215 } 216 } 217 if (!foundAddress ) 218 throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address"); 219 } 220 221 SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add); 222 223 /* Update our internal Set to reflect the addition/removal */ 224 if (add) 225 localAddresses.add(new InetSocketAddress(address, port)); 226 else { 227 for (InetSocketAddress addr : localAddresses) { 228 if (addr.getAddress().equals(address)) { 229 localAddresses.remove(addr); 230 break; 231 } 232 } 233 } 234 } 235 } 236 } 237 return this; 238 } 239 240 @Override 241 public Set<Association> associations() 242 throws ClosedChannelException, NotYetBoundException { 243 synchronized (stateLock) { 244 if (!isOpen()) 245 throw new ClosedChannelException(); 246 if (!isBound()) 247 throw new NotYetBoundException(); 248 249 return Collections.unmodifiableSet(associationMap.keySet()); 250 } 251 } 252 253 private boolean isBound() { 254 synchronized (stateLock) { 255 return port != -1; 256 } 257 } 258 259 private void ensureOpen() throws IOException { 260 synchronized (stateLock) { 261 if (!isOpen()) 262 throw new ClosedChannelException(); 263 } 264 } 265 266 private void receiverCleanup() throws IOException { 267 synchronized (stateLock) { 268 receiverThread = null; 269 if (state == ChannelState.KILLPENDING) 270 kill(); 271 } 272 } 273 274 private void senderCleanup() throws IOException { 275 synchronized (stateLock) { 276 senderThread = null; 277 if (state == ChannelState.KILLPENDING) 278 kill(); 279 } 280 } 281 282 @Override 283 protected void implConfigureBlocking(boolean block) throws IOException { 284 IOUtil.configureBlocking(fd, block); 285 } 286 287 @Override 288 public void implCloseSelectableChannel() throws IOException { 289 synchronized (stateLock) { 290 if (state != ChannelState.KILLED) 291 SctpNet.preClose(fdVal); 292 293 if (NativeThread.isNativeThread(receiverThread)) 294 receiverThread.signal(); 295 296 if (NativeThread.isNativeThread(senderThread)) 297 senderThread.signal(); 298 299 if (!isRegistered()) 300 kill(); 301 } 302 } 303 304 @Override 305 public FileDescriptor getFD() { 306 return fd; 307 } 308 309 @Override 310 public int getFDVal() { 311 return fdVal; 312 } 313 314 /** 315 * Translates native poll revent ops into a ready operation ops 316 */ 317 private boolean translateReadyOps(int ops, int initialOps, 318 SelectionKeyImpl sk) { 319 int intOps = sk.nioInterestOps(); 320 int oldOps = sk.nioReadyOps(); 321 int newOps = initialOps; 322 323 if ((ops & Net.POLLNVAL) != 0) { 324 /* This should only happen if this channel is pre-closed while a 325 * selection operation is in progress 326 * ## Throw an error if this channel has not been pre-closed */ 327 return false; 328 } 329 330 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 331 newOps = intOps; 332 sk.nioReadyOps(newOps); 333 return (newOps & ~oldOps) != 0; 334 } 335 336 if (((ops & Net.POLLIN) != 0) && 337 ((intOps & SelectionKey.OP_READ) != 0)) 338 newOps |= SelectionKey.OP_READ; 339 340 if (((ops & Net.POLLOUT) != 0) && 341 ((intOps & SelectionKey.OP_WRITE) != 0)) 342 newOps |= SelectionKey.OP_WRITE; 343 344 sk.nioReadyOps(newOps); 345 return (newOps & ~oldOps) != 0; 346 } 347 348 @Override 349 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) { 350 return translateReadyOps(ops, sk.nioReadyOps(), sk); 351 } 352 353 @Override 354 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) { 355 return translateReadyOps(ops, 0, sk); 356 } 357 358 @Override 359 public int translateInterestOps(int ops) { 360 int newOps = 0; 361 if ((ops & SelectionKey.OP_READ) != 0) 362 newOps |= Net.POLLIN; 363 if ((ops & SelectionKey.OP_WRITE) != 0) 364 newOps |= Net.POLLOUT; 365 return newOps; 366 } 367 368 @Override 369 public void kill() throws IOException { 370 synchronized (stateLock) { 371 if (state == ChannelState.KILLED) 372 return; 373 if (state == ChannelState.UNINITIALIZED) { 374 state = ChannelState.KILLED; 375 SctpNet.close(fdVal); 376 return; 377 } 378 assert !isOpen() && !isRegistered(); 379 380 /* Postpone the kill if there is a thread sending or receiving. */ 381 if (receiverThread == null && senderThread == null) { 382 state = ChannelState.KILLED; 383 SctpNet.close(fdVal); 384 } else { 385 state = ChannelState.KILLPENDING; 386 } 387 } 388 } 389 390 @Override 391 public <T> SctpMultiChannel setOption(SctpSocketOption<T> name, 392 T value, 393 Association association) 394 throws IOException { 395 if (name == null) 396 throw new NullPointerException(); 397 if (!(supportedOptions().contains(name))) 398 throw new UnsupportedOperationException("'" + name + "' not supported"); 399 400 synchronized (stateLock) { 401 if (association != null && (name.equals(SCTP_PRIMARY_ADDR) || 402 name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) { 403 checkAssociation(association); 404 } 405 if (!isOpen()) 406 throw new ClosedChannelException(); 407 408 int assocId = association == null ? 0 : association.associationID(); 409 SctpNet.setSocketOption(fdVal, name, value, assocId); 410 } 411 return this; 412 } 413 414 @Override 415 @SuppressWarnings("unchecked") 416 public <T> T getOption(SctpSocketOption<T> name, Association association) 417 throws IOException { 418 if (name == null) 419 throw new NullPointerException(); 420 if (!supportedOptions().contains(name)) 421 throw new UnsupportedOperationException("'" + name + "' not supported"); 422 423 synchronized (stateLock) { 424 if (association != null && (name.equals(SCTP_PRIMARY_ADDR) || 425 name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) { 426 checkAssociation(association); 427 } 428 if (!isOpen()) 429 throw new ClosedChannelException(); 430 431 int assocId = association == null ? 0 : association.associationID(); 432 return (T)SctpNet.getSocketOption(fdVal, name, assocId); 433 } 434 } 435 436 @Override 437 public final Set<SctpSocketOption<?>> supportedOptions() { 438 final class Holder { 439 static final Set<SctpSocketOption<?>> DEFAULT_OPTIONS = Set.of( 440 SCTP_DISABLE_FRAGMENTS, 441 SCTP_EXPLICIT_COMPLETE, 442 SCTP_FRAGMENT_INTERLEAVE, 443 SCTP_INIT_MAXSTREAMS, 444 SCTP_NODELAY, 445 SCTP_PRIMARY_ADDR, 446 SCTP_SET_PEER_PRIMARY_ADDR, 447 SO_SNDBUF, 448 SO_RCVBUF, 449 SO_LINGER); 450 451 } 452 return Holder.DEFAULT_OPTIONS; 453 } 454 455 @Override 456 public <T> MessageInfo receive(ByteBuffer buffer, 457 T attachment, 458 NotificationHandler<T> handler) 459 throws IOException { 460 if (buffer == null) 461 throw new IllegalArgumentException("buffer cannot be null"); 462 463 if (buffer.isReadOnly()) 464 throw new IllegalArgumentException("Read-only buffer"); 465 466 if (receiveInvoked.get()) 467 throw new IllegalReceiveException( 468 "cannot invoke receive from handler"); 469 receiveInvoked.set(Boolean.TRUE); 470 471 try { 472 ResultContainer resultContainer = new ResultContainer(); 473 do { 474 resultContainer.clear(); 475 synchronized (receiveLock) { 476 ensureOpen(); 477 if (!isBound()) 478 throw new NotYetBoundException(); 479 480 int n = 0; 481 try { 482 begin(); 483 484 synchronized (stateLock) { 485 if(!isOpen()) 486 return null; 487 receiverThread = NativeThread.current(); 488 } 489 490 do { 491 n = receive(fdVal, buffer, resultContainer); 492 } while ((n == IOStatus.INTERRUPTED) && isOpen()); 493 494 } finally { 495 receiverCleanup(); 496 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 497 assert IOStatus.check(n); 498 } 499 500 if (!resultContainer.isNotification()) { 501 /* message or nothing */ 502 if (resultContainer.hasSomething()) { 503 /* Set the association before returning */ 504 MessageInfoImpl info = 505 resultContainer.getMessageInfo(); 506 info.setAssociation(lookupAssociation(info. 507 associationID())); 508 509 assert info.association() != null; 510 return info; 511 } else { 512 /* Non-blocking may return null if nothing available*/ 513 return null; 514 } 515 } else { /* notification */ 516 synchronized (stateLock) { 517 handleNotificationInternal( 518 resultContainer); 519 } 520 } 521 } /* receiveLock */ 522 } while (handler == null ? true : 523 (invokeNotificationHandler(resultContainer, handler, attachment) 524 == HandlerResult.CONTINUE)); 525 } finally { 526 receiveInvoked.set(Boolean.FALSE); 527 } 528 529 return null; 530 } 531 532 private int receive(int fd, 533 ByteBuffer dst, 534 ResultContainer resultContainer) 535 throws IOException { 536 int pos = dst.position(); 537 int lim = dst.limit(); 538 assert (pos <= lim); 539 int rem = (pos <= lim ? lim - pos : 0); 540 if (dst instanceof DirectBuffer && rem > 0) 541 return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos); 542 543 /* Substitute a native buffer. */ 544 int newSize = Math.max(rem, 1); 545 ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize); 546 try { 547 int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0); 548 bb.flip(); 549 if (n > 0 && rem > 0) 550 dst.put(bb); 551 return n; 552 } finally { 553 Util.releaseTemporaryDirectBuffer(bb); 554 } 555 } 556 557 private int receiveIntoNativeBuffer(int fd, 558 ResultContainer resultContainer, 559 ByteBuffer bb, 560 int rem, 561 int pos) 562 throws IOException { 563 NIO_ACCESS.acquireSession(bb); 564 try { 565 int n = receive0(fd, resultContainer, NIO_ACCESS.getBufferAddress(bb) + pos, rem); 566 if (n > 0) 567 bb.position(pos + n); 568 return n; 569 } finally { 570 NIO_ACCESS.releaseSession(bb); 571 } 572 } 573 574 private final InternalNotificationHandler internalNotificationHandler = 575 new InternalNotificationHandler(); 576 577 private void handleNotificationInternal(ResultContainer resultContainer) 578 { 579 invokeNotificationHandler(resultContainer, 580 internalNotificationHandler, null); 581 } 582 583 private final class InternalNotificationHandler 584 extends AbstractNotificationHandler<Object> 585 { 586 @Override 587 public HandlerResult handleNotification(AssociationChangeNotification not, 588 Object unused) { 589 AssociationChange sac = (AssociationChange) not; 590 591 /* Update map to reflect change in association */ 592 switch (not.event()) { 593 case COMM_UP -> { 594 Association newAssociation = new AssociationImpl 595 (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams()); 596 addAssociation(newAssociation); 597 } 598 case SHUTDOWN, COMM_LOST -> 599 //case RESTART: ??? 600 /* mark association for removal after user handler invoked*/ 601 associationToRemove.set(lookupAssociation(sac.assocId())); 602 } 603 return HandlerResult.CONTINUE; 604 } 605 } 606 607 private <T> HandlerResult invokeNotificationHandler(ResultContainer resultContainer, 608 NotificationHandler<T> handler, 609 T attachment) { 610 HandlerResult result; 611 SctpNotification notification = resultContainer.notification(); 612 notification.setAssociation(lookupAssociation(notification.assocId())); 613 614 if (!(handler instanceof AbstractNotificationHandler<T> absHandler)) { 615 result = handler.handleNotification(notification, attachment); 616 } else { /* AbstractNotificationHandler */ 617 result = switch (resultContainer.type()) { 618 case ASSOCIATION_CHANGED -> absHandler.handleNotification( 619 resultContainer.getAssociationChanged(), attachment); 620 case PEER_ADDRESS_CHANGED -> absHandler.handleNotification( 621 resultContainer.getPeerAddressChanged(), attachment); 622 case SEND_FAILED -> absHandler.handleNotification( 623 resultContainer.getSendFailed(), attachment); 624 case SHUTDOWN -> absHandler.handleNotification( 625 resultContainer.getShutdown(), attachment); 626 /* implementation specific handlers */ 627 default -> absHandler.handleNotification( 628 resultContainer.notification(), attachment); 629 }; 630 } 631 632 if (!(handler instanceof InternalNotificationHandler)) { 633 /* Only remove associations after user handler 634 * has finished with them */ 635 Association assoc = associationToRemove.get(); 636 if (assoc != null) { 637 removeAssociation(assoc); 638 associationToRemove.set(null); 639 } 640 641 } 642 643 return result; 644 } 645 646 private Association lookupAssociation(int assocId) { 647 /* Lookup the association in our internal map */ 648 synchronized (stateLock) { 649 Set<Association> assocs = associationMap.keySet(); 650 for (Association a : assocs) { 651 if (a.associationID() == assocId) { 652 return a; 653 } 654 } 655 } 656 return null; 657 } 658 659 private void addAssociation(Association association) { 660 synchronized (stateLock) { 661 int assocId = association.associationID(); 662 Set<SocketAddress> addresses = null; 663 664 try { 665 addresses = SctpNet.getRemoteAddresses(fdVal, assocId); 666 } catch (IOException unused) { 667 /* OK, determining connected addresses may not be possible 668 * shutdown, connection lost, etc */ 669 } 670 671 associationMap.put(association, addresses); 672 if (addresses != null) { 673 for (SocketAddress addr : addresses) 674 addressMap.put(addr, association); 675 } 676 } 677 } 678 679 private void removeAssociation(Association association) { 680 synchronized (stateLock) { 681 int assocId = association.associationID(); 682 Set<SocketAddress> addresses = null; 683 684 try { 685 addresses = SctpNet.getRemoteAddresses(fdVal, assocId); 686 } catch (IOException unused) { 687 /* OK, determining connected addresses may not be possible 688 * shutdown, connection lost, etc */ 689 } 690 691 Set<Association> assocs = associationMap.keySet(); 692 for (Association a : assocs) { 693 if (a.associationID() == assocId) { 694 associationMap.remove(a); 695 break; 696 } 697 } 698 if (addresses != null) { 699 for (SocketAddress addr : addresses) 700 addressMap.remove(addr); 701 } else { 702 /* We cannot determine the connected addresses */ 703 Set<java.util.Map.Entry<SocketAddress, Association>> addrAssocs = 704 addressMap.entrySet(); 705 Iterator<Entry<SocketAddress, Association>> iterator = addrAssocs.iterator(); 706 while (iterator.hasNext()) { 707 Entry<SocketAddress, Association> entry = iterator.next(); 708 if (entry.getValue().equals(association)) { 709 iterator.remove(); 710 } 711 } 712 } 713 } 714 } 715 716 /** 717 * Checks if the given association is controlled by this channel. 718 * 719 * @throws IllegalArgumentException If the given association is not controlled by this channel 720 */ 721 private void checkAssociation(Association messageAssoc) { 722 synchronized (stateLock) { 723 for (Association association : associationMap.keySet()) { 724 if (messageAssoc.equals(association)) { 725 return; 726 } 727 } 728 } 729 throw new IllegalArgumentException( 730 "Given Association is not controlled by this channel"); 731 } 732 733 private void checkStreamNumber(Association assoc, int streamNumber) { 734 synchronized (stateLock) { 735 if (streamNumber < 0 || streamNumber >= assoc.maxOutboundStreams()) 736 throw new InvalidStreamException(); 737 } 738 } 739 740 /* TODO: Add support for ttl and isComplete to both 121 12M 741 * SCTP_EOR not yet supported on reference platforms 742 * TTL support limited... 743 */ 744 @Override 745 public int send(ByteBuffer buffer, MessageInfo messageInfo) 746 throws IOException { 747 if (buffer == null) 748 throw new IllegalArgumentException("buffer cannot be null"); 749 750 if (messageInfo == null) 751 throw new IllegalArgumentException("messageInfo cannot be null"); 752 753 synchronized (sendLock) { 754 ensureOpen(); 755 756 if (!isBound()) 757 bind(null, 0); 758 759 int n = 0; 760 try { 761 int assocId = -1; 762 SocketAddress address = null; 763 begin(); 764 765 synchronized (stateLock) { 766 if(!isOpen()) 767 return 0; 768 senderThread = NativeThread.current(); 769 770 /* Determine what address or association to send to */ 771 Association assoc = messageInfo.association(); 772 InetSocketAddress addr = (InetSocketAddress)messageInfo.address(); 773 if (assoc != null) { 774 checkAssociation(assoc); 775 checkStreamNumber(assoc, messageInfo.streamNumber()); 776 assocId = assoc.associationID(); 777 /* have we also got a preferred address */ 778 if (addr != null) { 779 if (!assoc.equals(addressMap.get(addr))) 780 throw new IllegalArgumentException("given preferred address is not part of this association"); 781 address = addr; 782 } 783 } else if (addr != null) { 784 address = addr; 785 Association association = addressMap.get(addr); 786 if (association != null) { 787 checkStreamNumber(association, messageInfo.streamNumber()); 788 assocId = association.associationID(); 789 790 } 791 } else { 792 throw new AssertionError( 793 "Both association and address cannot be null"); 794 } 795 } 796 797 do { 798 n = send(fdVal, buffer, assocId, address, messageInfo); 799 } while ((n == IOStatus.INTERRUPTED) && isOpen()); 800 801 return IOStatus.normalize(n); 802 } finally { 803 senderCleanup(); 804 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 805 assert IOStatus.check(n); 806 } 807 } 808 } 809 810 private int send(int fd, 811 ByteBuffer src, 812 int assocId, 813 SocketAddress target, 814 MessageInfo messageInfo) 815 throws IOException { 816 int streamNumber = messageInfo.streamNumber(); 817 boolean unordered = messageInfo.isUnordered(); 818 int ppid = messageInfo.payloadProtocolID(); 819 820 if (src instanceof DirectBuffer) 821 return sendFromNativeBuffer(fd, src, target, assocId, 822 streamNumber, unordered, ppid); 823 824 /* Substitute a native buffer */ 825 int pos = src.position(); 826 int lim = src.limit(); 827 assert (pos <= lim && streamNumber >= 0); 828 829 int rem = (pos <= lim ? lim - pos : 0); 830 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 831 try { 832 bb.put(src); 833 bb.flip(); 834 /* Do not update src until we see how many bytes were written */ 835 src.position(pos); 836 837 int n = sendFromNativeBuffer(fd, bb, target, assocId, 838 streamNumber, unordered, ppid); 839 if (n > 0) { 840 /* now update src */ 841 src.position(pos + n); 842 } 843 return n; 844 } finally { 845 Util.releaseTemporaryDirectBuffer(bb); 846 } 847 } 848 849 private int sendFromNativeBuffer(int fd, 850 ByteBuffer bb, 851 SocketAddress target, 852 int assocId, 853 int streamNumber, 854 boolean unordered, 855 int ppid) 856 throws IOException { 857 InetAddress addr = null; // no preferred address 858 int port = 0; 859 if (target != null) { 860 InetSocketAddress isa = Net.checkAddress(target); 861 addr = isa.getAddress(); 862 if (addr.isLinkLocalAddress()) { 863 addr = IPAddressUtil.toScopedAddress(addr); 864 } 865 port = isa.getPort(); 866 } 867 int pos = bb.position(); 868 int lim = bb.limit(); 869 assert (pos <= lim); 870 int rem = (pos <= lim ? lim - pos : 0); 871 872 NIO_ACCESS.acquireSession(bb); 873 try { 874 int written = send0(fd, NIO_ACCESS.getBufferAddress(bb) + pos, rem, addr, 875 port, assocId, streamNumber, unordered, ppid); 876 if (written > 0) 877 bb.position(pos + written); 878 return written; 879 } finally { 880 NIO_ACCESS.releaseSession(bb); 881 } 882 } 883 884 @Override 885 public SctpMultiChannel shutdown(Association association) 886 throws IOException { 887 synchronized (stateLock) { 888 checkAssociation(association); 889 if (!isOpen()) 890 throw new ClosedChannelException(); 891 892 SctpNet.shutdown(fdVal, association.associationID()); 893 } 894 return this; 895 } 896 897 @Override 898 public Set<SocketAddress> getAllLocalAddresses() 899 throws IOException { 900 synchronized (stateLock) { 901 if (!isOpen()) 902 throw new ClosedChannelException(); 903 if (!isBound()) 904 return Collections.emptySet(); 905 906 return SctpNet.getLocalAddresses(fdVal); 907 } 908 } 909 910 @Override 911 public Set<SocketAddress> getRemoteAddresses(Association association) 912 throws IOException { 913 synchronized (stateLock) { 914 checkAssociation(association); 915 if (!isOpen()) 916 throw new ClosedChannelException(); 917 918 try { 919 return SctpNet.getRemoteAddresses(fdVal, association.associationID()); 920 } catch (SocketException se) { 921 /* a valid association should always have remote addresses */ 922 return associationMap.getOrDefault(association, Collections.emptySet()); 923 } 924 } 925 } 926 927 @Override 928 public SctpChannel branch(Association association) 929 throws IOException { 930 synchronized (stateLock) { 931 checkAssociation(association); 932 if (!isOpen()) 933 throw new ClosedChannelException(); 934 935 FileDescriptor bFd = SctpNet.branch(fdVal, 936 association.associationID()); 937 /* successfully branched, we can now remove it from assoc list */ 938 removeAssociation(association); 939 940 return new SctpChannelImpl(provider(), bFd, association); 941 } 942 } 943 944 /* Use common native implementation shared between 945 * one-to-one and one-to-many */ 946 private static int receive0(int fd, 947 ResultContainer resultContainer, 948 long address, 949 int length) 950 throws IOException{ 951 return SctpChannelImpl.receive0(fd, resultContainer, address, 952 length, false /*peek */); 953 } 954 955 private static int send0(int fd, 956 long address, 957 int length, 958 InetAddress addr, 959 int port, 960 int assocId, 961 int streamNumber, 962 boolean unordered, 963 int ppid) 964 throws IOException { 965 return SctpChannelImpl.send0(fd, address, length, addr, port, assocId, 966 streamNumber, unordered, ppid); 967 } 968 }