1 /*
  2  * Copyright (c) 2009, 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch.sctp;
 27 
 28 import java.net.InetAddress;
 29 import java.net.SocketAddress;
 30 import java.net.SocketException;
 31 import java.net.InetSocketAddress;
 32 import java.io.FileDescriptor;
 33 import java.io.IOException;
 34 import java.util.Collections;
 35 import java.util.Map.Entry;
 36 import java.util.Iterator;
 37 import java.util.Set;
 38 import java.util.HashSet;
 39 import java.util.HashMap;
 40 import java.util.Map;
 41 import java.nio.ByteBuffer;
 42 import java.nio.channels.SelectionKey;
 43 import java.nio.channels.ClosedChannelException;
 44 import java.nio.channels.NotYetBoundException;
 45 import java.nio.channels.spi.SelectorProvider;
 46 import com.sun.nio.sctp.AbstractNotificationHandler;
 47 import com.sun.nio.sctp.Association;
 48 import com.sun.nio.sctp.AssociationChangeNotification;
 49 import com.sun.nio.sctp.HandlerResult;
 50 import com.sun.nio.sctp.IllegalReceiveException;
 51 import com.sun.nio.sctp.InvalidStreamException;
 52 import com.sun.nio.sctp.IllegalUnbindException;
 53 import com.sun.nio.sctp.NotificationHandler;
 54 import com.sun.nio.sctp.MessageInfo;
 55 import com.sun.nio.sctp.SctpChannel;
 56 import com.sun.nio.sctp.SctpMultiChannel;
 57 import com.sun.nio.sctp.SctpSocketOption;
 58 import jdk.internal.access.JavaNioAccess;
 59 import jdk.internal.access.SharedSecrets;
 60 import sun.net.util.IPAddressUtil;
 61 import sun.nio.ch.DirectBuffer;
 62 import sun.nio.ch.NativeThread;
 63 import sun.nio.ch.IOStatus;
 64 import sun.nio.ch.IOUtil;
 65 import sun.nio.ch.Net;
 66 import sun.nio.ch.SelChImpl;
 67 import sun.nio.ch.SelectionKeyImpl;
 68 import sun.nio.ch.Util;
 69 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
 70 import static sun.nio.ch.sctp.ResultContainer.*;
 71 
 72 /**
 73  * An implementation of SctpMultiChannel
 74  */
 75 public class SctpMultiChannelImpl extends SctpMultiChannel
 76     implements SelChImpl
 77 {
 78     private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess();
 79 
 80     private final FileDescriptor fd;
 81 
 82     private final int fdVal;
 83 
 84     /* IDs of native threads doing send and receives, for signalling */
 85     private volatile long receiverThread;
 86     private volatile long senderThread;
 87 
 88     /* Lock held by current receiving thread */
 89     private final Object receiveLock = new Object();
 90 
 91     /* Lock held by current sending thread */
 92     private final Object sendLock = new Object();
 93 
 94     /* Lock held by any thread that modifies the state fields declared below
 95      * DO NOT invoke a blocking I/O operation while holding this lock! */
 96     private final Object stateLock = new Object();
 97 
 98     private enum ChannelState {
 99         UNINITIALIZED,
100         KILLPENDING,
101         KILLED,
102     }
103 
104     /* -- The following fields are protected by stateLock -- */
105     private ChannelState state = ChannelState.UNINITIALIZED;
106 
107     /* Binding: Once bound the port will remain constant. */
108     int port = -1;
109     private final Set<InetSocketAddress> localAddresses = new HashSet<>();
110     /* Has the channel been bound to the wildcard address */
111     private boolean wildcard; /* false */
112 
113     /* Keeps a map of addresses to association, and vice versa */
114     private final Map<SocketAddress, Association> addressMap =
115                          new HashMap<>();
116     private final Map<Association, Set<SocketAddress>> associationMap =
117                          new HashMap<>();
118 
119     /* -- End of fields protected by stateLock -- */
120 
121     /* If an association has been shutdown mark it for removal after
122      * the user handler has been invoked */
123     private final ThreadLocal<Association> associationToRemove = new ThreadLocal<>();
124 
125     /* A notification handler cannot invoke receive */
126     private final ThreadLocal<Boolean> receiveInvoked =
127         new ThreadLocal<>() {
128              @Override protected Boolean initialValue() {
129                  return Boolean.FALSE;
130             }
131     };
132 
133     public SctpMultiChannelImpl(SelectorProvider provider)
134             throws IOException {
135         //TODO: update provider, remove public modifier
136         super(provider);
137         this.fd = SctpNet.socket(false /*one-to-many*/);
138         this.fdVal = IOUtil.fdVal(fd);
139     }
140 
141     @Override
142     public SctpMultiChannel bind(SocketAddress local, int backlog)
143             throws IOException {
144         synchronized (receiveLock) {
145             synchronized (sendLock) {
146                 synchronized (stateLock) {
147                     ensureOpen();
148                     if (isBound())
149                         SctpNet.throwAlreadyBoundException();
150                     InetSocketAddress isa = (local == null) ?
151                         new InetSocketAddress(0) : Net.checkAddress(local);
152 
153                     Net.bind(fd, isa.getAddress(), isa.getPort());
154 
155                     InetSocketAddress boundIsa = Net.localAddress(fd);
156                     port = boundIsa.getPort();
157                     localAddresses.add(isa);
158                     if (isa.getAddress().isAnyLocalAddress())
159                         wildcard = true;
160 
161                     SctpNet.listen(fdVal, backlog < 1 ? 50 : backlog);
162                 }
163             }
164         }
165         return this;
166     }
167 
168     @Override
169     public SctpMultiChannel bindAddress(InetAddress address)
170             throws IOException {
171         return bindUnbindAddress(address, true);
172     }
173 
174     @Override
175     public SctpMultiChannel unbindAddress(InetAddress address)
176             throws IOException {
177         return bindUnbindAddress(address, false);
178     }
179 
180     private SctpMultiChannel bindUnbindAddress(InetAddress address,
181                                                boolean add)
182             throws IOException {
183         if (address == null)
184             throw new IllegalArgumentException();
185 
186         synchronized (receiveLock) {
187             synchronized (sendLock) {
188                 synchronized (stateLock) {
189                     if (!isOpen())
190                         throw new ClosedChannelException();
191                     if (!isBound())
192                         throw new NotYetBoundException();
193                     if (wildcard)
194                         throw new IllegalStateException(
195                                 "Cannot add or remove addresses from a channel that is bound to the wildcard address");
196                     if (address.isAnyLocalAddress())
197                         throw new IllegalArgumentException(
198                                 "Cannot add or remove the wildcard address");
199                     if (add) {
200                         for (InetSocketAddress addr : localAddresses) {
201                             if (addr.getAddress().equals(address)) {
202                                 SctpNet.throwAlreadyBoundException();
203                             }
204                         }
205                     } else { /*removing */
206                         /* Verify that there is more than one address
207                          * and that address is already bound */
208                         if (localAddresses.size() <= 1)
209                             throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
210                         boolean foundAddress = false;
211                         for (InetSocketAddress addr : localAddresses) {
212                             if (addr.getAddress().equals(address)) {
213                                 foundAddress = true;
214                                 break;
215                             }
216                         }
217                         if (!foundAddress )
218                             throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
219                     }
220 
221                     SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);
222 
223                     /* Update our internal Set to reflect the addition/removal */
224                     if (add)
225                         localAddresses.add(new InetSocketAddress(address, port));
226                     else {
227                         for (InetSocketAddress addr : localAddresses) {
228                             if (addr.getAddress().equals(address)) {
229                                 localAddresses.remove(addr);
230                                 break;
231                             }
232                         }
233                     }
234                 }
235             }
236         }
237         return this;
238     }
239 
240     @Override
241     public Set<Association> associations()
242             throws ClosedChannelException, NotYetBoundException {
243         synchronized (stateLock) {
244             if (!isOpen())
245                 throw new ClosedChannelException();
246             if (!isBound())
247                 throw new NotYetBoundException();
248 
249             return Collections.unmodifiableSet(associationMap.keySet());
250         }
251     }
252 
253     private boolean isBound() {
254         synchronized (stateLock) {
255             return port != -1;
256         }
257     }
258 
259     private void ensureOpen() throws IOException {
260         synchronized (stateLock) {
261             if (!isOpen())
262                 throw new ClosedChannelException();
263         }
264     }
265 
266     private void receiverCleanup() throws IOException {
267         synchronized (stateLock) {
268             receiverThread = 0;
269             if (state == ChannelState.KILLPENDING)
270                 kill();
271         }
272     }
273 
274     private void senderCleanup() throws IOException {
275         synchronized (stateLock) {
276             senderThread = 0;
277             if (state == ChannelState.KILLPENDING)
278                 kill();
279         }
280     }
281 
282     @Override
283     protected void implConfigureBlocking(boolean block) throws IOException {
284         IOUtil.configureBlocking(fd, block);
285     }
286 
287     @Override
288     public void implCloseSelectableChannel() throws IOException {
289         synchronized (stateLock) {
290             if (state != ChannelState.KILLED)
291                 SctpNet.preClose(fdVal);
292 
293             if (receiverThread != 0)
294                 NativeThread.signal(receiverThread);
295 
296             if (senderThread != 0)
297                 NativeThread.signal(senderThread);
298 
299             if (!isRegistered())
300                 kill();
301         }
302     }
303 
304     @Override
305     public FileDescriptor getFD() {
306         return fd;
307     }
308 
309     @Override
310     public int getFDVal() {
311         return fdVal;
312     }
313 
314     /**
315      * Translates native poll revent ops into a ready operation ops
316      */
317     private boolean translateReadyOps(int ops, int initialOps,
318                                       SelectionKeyImpl sk) {
319         int intOps = sk.nioInterestOps();
320         int oldOps = sk.nioReadyOps();
321         int newOps = initialOps;
322 
323         if ((ops & Net.POLLNVAL) != 0) {
324             /* This should only happen if this channel is pre-closed while a
325              * selection operation is in progress
326              * ## Throw an error if this channel has not been pre-closed */
327             return false;
328         }
329 
330         if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
331             newOps = intOps;
332             sk.nioReadyOps(newOps);
333             return (newOps & ~oldOps) != 0;
334         }
335 
336         if (((ops & Net.POLLIN) != 0) &&
337             ((intOps & SelectionKey.OP_READ) != 0))
338             newOps |= SelectionKey.OP_READ;
339 
340         if (((ops & Net.POLLOUT) != 0) &&
341             ((intOps & SelectionKey.OP_WRITE) != 0))
342             newOps |= SelectionKey.OP_WRITE;
343 
344         sk.nioReadyOps(newOps);
345         return (newOps & ~oldOps) != 0;
346     }
347 
348     @Override
349     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
350         return translateReadyOps(ops, sk.nioReadyOps(), sk);
351     }
352 
353     @Override
354     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
355         return translateReadyOps(ops, 0, sk);
356     }
357 
358     @Override
359     public int translateInterestOps(int ops) {
360         int newOps = 0;
361         if ((ops & SelectionKey.OP_READ) != 0)
362             newOps |= Net.POLLIN;
363         if ((ops & SelectionKey.OP_WRITE) != 0)
364             newOps |= Net.POLLOUT;
365         return newOps;
366     }
367 
368     @Override
369     public void kill() throws IOException {
370         synchronized (stateLock) {
371             if (state == ChannelState.KILLED)
372                 return;
373             if (state == ChannelState.UNINITIALIZED) {
374                 state = ChannelState.KILLED;
375                 SctpNet.close(fdVal);
376                 return;
377             }
378             assert !isOpen() && !isRegistered();
379 
380             /* Postpone the kill if there is a thread sending or receiving. */
381             if (receiverThread == 0 && senderThread == 0) {
382                 state = ChannelState.KILLED;
383                 SctpNet.close(fdVal);
384             } else {
385                 state = ChannelState.KILLPENDING;
386             }
387         }
388     }
389 
390     @Override
391     public <T> SctpMultiChannel setOption(SctpSocketOption<T> name,
392                                           T value,
393                                           Association association)
394             throws IOException {
395         if (name == null)
396             throw new NullPointerException();
397         if (!(supportedOptions().contains(name)))
398             throw new UnsupportedOperationException("'" + name + "' not supported");
399 
400         synchronized (stateLock) {
401             if (association != null && (name.equals(SCTP_PRIMARY_ADDR) ||
402                     name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) {
403                 checkAssociation(association);
404             }
405             if (!isOpen())
406                 throw new ClosedChannelException();
407 
408             int assocId = association == null ? 0 : association.associationID();
409             SctpNet.setSocketOption(fdVal, name, value, assocId);
410         }
411         return this;
412     }
413 
414     @Override
415     @SuppressWarnings("unchecked")
416     public <T> T getOption(SctpSocketOption<T> name, Association association)
417             throws IOException {
418         if (name == null)
419             throw new NullPointerException();
420         if (!supportedOptions().contains(name))
421             throw new UnsupportedOperationException("'" + name + "' not supported");
422 
423         synchronized (stateLock) {
424             if (association != null && (name.equals(SCTP_PRIMARY_ADDR) ||
425                     name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) {
426                 checkAssociation(association);
427             }
428             if (!isOpen())
429                 throw new ClosedChannelException();
430 
431             int assocId = association == null ? 0 : association.associationID();
432             return (T)SctpNet.getSocketOption(fdVal, name, assocId);
433         }
434     }
435 
436     @Override
437     public final Set<SctpSocketOption<?>> supportedOptions() {
438         final class Holder {
439             static final Set<SctpSocketOption<?>> DEFAULT_OPTIONS = Set.of(
440                     SCTP_DISABLE_FRAGMENTS,
441                     SCTP_EXPLICIT_COMPLETE,
442                     SCTP_FRAGMENT_INTERLEAVE,
443                     SCTP_INIT_MAXSTREAMS,
444                     SCTP_NODELAY,
445                     SCTP_PRIMARY_ADDR,
446                     SCTP_SET_PEER_PRIMARY_ADDR,
447                     SO_SNDBUF,
448                     SO_RCVBUF,
449                     SO_LINGER);
450 
451         }
452         return Holder.DEFAULT_OPTIONS;
453     }
454 
455     @Override
456     public <T> MessageInfo receive(ByteBuffer buffer,
457                                    T attachment,
458                                    NotificationHandler<T> handler)
459             throws IOException {
460         if (buffer == null)
461             throw new IllegalArgumentException("buffer cannot be null");
462 
463         if (buffer.isReadOnly())
464             throw new IllegalArgumentException("Read-only buffer");
465 
466         if (receiveInvoked.get())
467             throw new IllegalReceiveException(
468                     "cannot invoke receive from handler");
469         receiveInvoked.set(Boolean.TRUE);
470 
471         try {
472             ResultContainer resultContainer = new ResultContainer();
473             do {
474                 resultContainer.clear();
475                 synchronized (receiveLock) {
476                     ensureOpen();
477                     if (!isBound())
478                         throw new NotYetBoundException();
479 
480                     int n = 0;
481                     try {
482                         begin();
483 
484                         synchronized (stateLock) {
485                             if(!isOpen())
486                                 return null;
487                             receiverThread = NativeThread.current();
488                         }
489 
490                         do {
491                             n = receive(fdVal, buffer, resultContainer);
492                         } while ((n == IOStatus.INTERRUPTED) && isOpen());
493 
494                     } finally {
495                         receiverCleanup();
496                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
497                         assert IOStatus.check(n);
498                     }
499 
500                     if (!resultContainer.isNotification()) {
501                         /* message or nothing */
502                         if (resultContainer.hasSomething()) {
503                             /* Set the association before returning */
504                             MessageInfoImpl info =
505                                     resultContainer.getMessageInfo();
506                             info.setAssociation(lookupAssociation(info.
507                                     associationID()));
508 
509                             assert info.association() != null;
510                             return info;
511                         } else  {
512                           /* Non-blocking may return null if nothing available*/
513                             return null;
514                         }
515                     } else { /* notification */
516                         synchronized (stateLock) {
517                             handleNotificationInternal(
518                                     resultContainer);
519                         }
520                     }
521                 } /* receiveLock */
522             } while (handler == null ? true :
523                 (invokeNotificationHandler(resultContainer, handler, attachment)
524                  == HandlerResult.CONTINUE));
525         } finally {
526             receiveInvoked.set(Boolean.FALSE);
527         }
528 
529         return null;
530     }
531 
532     private int receive(int fd,
533                         ByteBuffer dst,
534                         ResultContainer resultContainer)
535             throws IOException {
536         int pos = dst.position();
537         int lim = dst.limit();
538         assert (pos <= lim);
539         int rem = (pos <= lim ? lim - pos : 0);
540         if (dst instanceof DirectBuffer && rem > 0)
541             return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos);
542 
543         /* Substitute a native buffer. */
544         int newSize = Math.max(rem, 1);
545         ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
546         try {
547             int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0);
548             bb.flip();
549             if (n > 0 && rem > 0)
550                 dst.put(bb);
551             return n;
552         } finally {
553             Util.releaseTemporaryDirectBuffer(bb);
554         }
555     }
556 
557     private int receiveIntoNativeBuffer(int fd,
558                                         ResultContainer resultContainer,
559                                         ByteBuffer bb,
560                                         int rem,
561                                         int pos)
562             throws IOException {
563         NIO_ACCESS.acquireSession(bb);
564         try {
565             int n = receive0(fd, resultContainer, NIO_ACCESS.getBufferAddress(bb) + pos, rem);
566             if (n > 0)
567                 bb.position(pos + n);
568             return n;
569         } finally {
570             NIO_ACCESS.releaseSession(bb);
571         }
572     }
573 
574     private final InternalNotificationHandler internalNotificationHandler =
575             new InternalNotificationHandler();
576 
577     private void handleNotificationInternal(ResultContainer resultContainer)
578     {
579         invokeNotificationHandler(resultContainer,
580                 internalNotificationHandler, null);
581     }
582 
583     private final class InternalNotificationHandler
584             extends AbstractNotificationHandler<Object>
585     {
586         @Override
587         public HandlerResult handleNotification(AssociationChangeNotification not,
588                                                 Object unused) {
589             AssociationChange sac = (AssociationChange) not;
590 
591             /* Update map to reflect change in association */
592             switch (not.event()) {
593                 case COMM_UP -> {
594                     Association newAssociation = new AssociationImpl
595                             (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());
596                     addAssociation(newAssociation);
597                 }
598                 case SHUTDOWN, COMM_LOST ->
599                     //case RESTART: ???
600                     /* mark association for removal after user handler invoked*/
601                         associationToRemove.set(lookupAssociation(sac.assocId()));
602             }
603             return HandlerResult.CONTINUE;
604         }
605     }
606 
607     private <T> HandlerResult invokeNotificationHandler(ResultContainer resultContainer,
608                                                         NotificationHandler<T> handler,
609                                                         T attachment) {
610         HandlerResult result;
611         SctpNotification notification = resultContainer.notification();
612         notification.setAssociation(lookupAssociation(notification.assocId()));
613 
614         if (!(handler instanceof AbstractNotificationHandler<T> absHandler)) {
615             result = handler.handleNotification(notification, attachment);
616         } else { /* AbstractNotificationHandler */
617             result = switch (resultContainer.type()) {
618                 case ASSOCIATION_CHANGED  -> absHandler.handleNotification(
619                         resultContainer.getAssociationChanged(), attachment);
620                 case PEER_ADDRESS_CHANGED -> absHandler.handleNotification(
621                         resultContainer.getPeerAddressChanged(), attachment);
622                 case SEND_FAILED          -> absHandler.handleNotification(
623                         resultContainer.getSendFailed(), attachment);
624                 case SHUTDOWN             -> absHandler.handleNotification(
625                         resultContainer.getShutdown(), attachment);
626                 /* implementation specific handlers */
627                 default                   -> absHandler.handleNotification(
628                         resultContainer.notification(), attachment);
629             };
630         }
631 
632         if (!(handler instanceof InternalNotificationHandler)) {
633             /* Only remove associations after user handler
634              * has finished with them */
635             Association assoc = associationToRemove.get();
636             if (assoc != null) {
637                 removeAssociation(assoc);
638                 associationToRemove.set(null);
639             }
640 
641         }
642 
643         return result;
644     }
645 
646     private Association lookupAssociation(int assocId) {
647         /* Lookup the association in our internal map */
648         synchronized (stateLock) {
649             Set<Association> assocs = associationMap.keySet();
650             for (Association a : assocs) {
651                 if (a.associationID() == assocId) {
652                     return a;
653                 }
654             }
655         }
656         return null;
657     }
658 
659     private void addAssociation(Association association) {
660         synchronized (stateLock) {
661             int assocId = association.associationID();
662             Set<SocketAddress> addresses = null;
663 
664             try {
665                 addresses = SctpNet.getRemoteAddresses(fdVal, assocId);
666             } catch (IOException unused) {
667                 /* OK, determining connected addresses may not be possible
668                  * shutdown, connection lost, etc */
669             }
670 
671             associationMap.put(association, addresses);
672             if (addresses != null) {
673                 for (SocketAddress addr : addresses)
674                     addressMap.put(addr, association);
675             }
676         }
677     }
678 
679     private void removeAssociation(Association association) {
680         synchronized (stateLock) {
681             int assocId = association.associationID();
682             Set<SocketAddress> addresses = null;
683 
684              try {
685                 addresses = SctpNet.getRemoteAddresses(fdVal, assocId);
686             } catch (IOException unused) {
687                 /* OK, determining connected addresses may not be possible
688                  * shutdown, connection lost, etc */
689             }
690 
691             Set<Association> assocs = associationMap.keySet();
692             for (Association a : assocs) {
693                 if (a.associationID() == assocId) {
694                     associationMap.remove(a);
695                     break;
696                 }
697             }
698             if (addresses != null) {
699                 for (SocketAddress addr : addresses)
700                     addressMap.remove(addr);
701             } else {
702                 /* We cannot determine the connected addresses */
703                 Set<java.util.Map.Entry<SocketAddress, Association>> addrAssocs =
704                         addressMap.entrySet();
705                 Iterator<Entry<SocketAddress, Association>> iterator = addrAssocs.iterator();
706                 while (iterator.hasNext()) {
707                     Entry<SocketAddress, Association> entry = iterator.next();
708                     if (entry.getValue().equals(association)) {
709                         iterator.remove();
710                     }
711                 }
712             }
713         }
714     }
715 
716     /**
717      * Checks if the given association is controlled by this channel.
718      *
719      * @throws IllegalArgumentException If the given association is not controlled by this channel
720      */
721     private void checkAssociation(Association messageAssoc) {
722         synchronized (stateLock) {
723             for (Association association : associationMap.keySet()) {
724                 if (messageAssoc.equals(association)) {
725                     return;
726                 }
727             }
728         }
729         throw new IllegalArgumentException(
730               "Given Association is not controlled by this channel");
731     }
732 
733     private void checkStreamNumber(Association assoc, int streamNumber) {
734         synchronized (stateLock) {
735             if (streamNumber < 0 || streamNumber >= assoc.maxOutboundStreams())
736                 throw new InvalidStreamException();
737         }
738     }
739 
740     /* TODO: Add support for ttl and isComplete to both 121 12M
741      *       SCTP_EOR not yet supported on reference platforms
742      *       TTL support limited...
743      */
744     @Override
745     public int send(ByteBuffer buffer, MessageInfo messageInfo)
746             throws IOException {
747         if (buffer == null)
748             throw new IllegalArgumentException("buffer cannot be null");
749 
750         if (messageInfo == null)
751             throw new IllegalArgumentException("messageInfo cannot be null");
752 
753         synchronized (sendLock) {
754             ensureOpen();
755 
756             if (!isBound())
757                 bind(null, 0);
758 
759             int n = 0;
760             try {
761                 int assocId = -1;
762                 SocketAddress address = null;
763                 begin();
764 
765                 synchronized (stateLock) {
766                     if(!isOpen())
767                         return 0;
768                     senderThread = NativeThread.current();
769 
770                     /* Determine what address or association to send to */
771                     Association assoc = messageInfo.association();
772                     InetSocketAddress addr = (InetSocketAddress)messageInfo.address();
773                     if (assoc != null) {
774                         checkAssociation(assoc);
775                         checkStreamNumber(assoc, messageInfo.streamNumber());
776                         assocId = assoc.associationID();
777                         /* have we also got a preferred address */
778                         if (addr != null) {
779                             if (!assoc.equals(addressMap.get(addr)))
780                                 throw new IllegalArgumentException("given preferred address is not part of this association");
781                             address = addr;
782                         }
783                     } else if (addr != null) {
784                         address = addr;
785                         Association association = addressMap.get(addr);
786                         if (association != null) {
787                             checkStreamNumber(association, messageInfo.streamNumber());
788                             assocId = association.associationID();
789 
790                         }
791                     } else {
792                         throw new AssertionError(
793                             "Both association and address cannot be null");
794                     }
795                 }
796 
797                 do {
798                     n = send(fdVal, buffer, assocId, address, messageInfo);
799                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
800 
801                 return IOStatus.normalize(n);
802             } finally {
803                 senderCleanup();
804                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
805                 assert IOStatus.check(n);
806             }
807         }
808     }
809 
810     private int send(int fd,
811                      ByteBuffer src,
812                      int assocId,
813                      SocketAddress target,
814                      MessageInfo messageInfo)
815             throws IOException {
816         int streamNumber = messageInfo.streamNumber();
817         boolean unordered = messageInfo.isUnordered();
818         int ppid = messageInfo.payloadProtocolID();
819 
820         if (src instanceof DirectBuffer)
821             return sendFromNativeBuffer(fd, src, target, assocId,
822                     streamNumber, unordered, ppid);
823 
824         /* Substitute a native buffer */
825         int pos = src.position();
826         int lim = src.limit();
827         assert (pos <= lim && streamNumber >= 0);
828 
829         int rem = (pos <= lim ? lim - pos : 0);
830         ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
831         try {
832             bb.put(src);
833             bb.flip();
834             /* Do not update src until we see how many bytes were written */
835             src.position(pos);
836 
837             int n = sendFromNativeBuffer(fd, bb, target, assocId,
838                     streamNumber, unordered, ppid);
839             if (n > 0) {
840                 /* now update src */
841                 src.position(pos + n);
842             }
843             return n;
844         } finally {
845             Util.releaseTemporaryDirectBuffer(bb);
846         }
847     }
848 
849     private int sendFromNativeBuffer(int fd,
850                                      ByteBuffer bb,
851                                      SocketAddress target,
852                                      int assocId,
853                                      int streamNumber,
854                                      boolean unordered,
855                                      int ppid)
856             throws IOException {
857         InetAddress addr = null;     // no preferred address
858         int port = 0;
859         if (target != null) {
860             InetSocketAddress isa = Net.checkAddress(target);
861             addr = isa.getAddress();
862             if (addr.isLinkLocalAddress()) {
863                 addr = IPAddressUtil.toScopedAddress(addr);
864             }
865             port = isa.getPort();
866         }
867         int pos = bb.position();
868         int lim = bb.limit();
869         assert (pos <= lim);
870         int rem = (pos <= lim ? lim - pos : 0);
871 
872         NIO_ACCESS.acquireSession(bb);
873         try {
874             int written = send0(fd, NIO_ACCESS.getBufferAddress(bb) + pos, rem, addr,
875                     port, assocId, streamNumber, unordered, ppid);
876             if (written > 0)
877                 bb.position(pos + written);
878             return written;
879         } finally {
880             NIO_ACCESS.releaseSession(bb);
881         }
882     }
883 
884     @Override
885     public SctpMultiChannel shutdown(Association association)
886             throws IOException {
887         synchronized (stateLock) {
888             checkAssociation(association);
889             if (!isOpen())
890                 throw new ClosedChannelException();
891 
892             SctpNet.shutdown(fdVal, association.associationID());
893         }
894         return this;
895     }
896 
897     @Override
898     public Set<SocketAddress> getAllLocalAddresses()
899             throws IOException {
900         synchronized (stateLock) {
901             if (!isOpen())
902                 throw new ClosedChannelException();
903             if (!isBound())
904                 return Collections.emptySet();
905 
906             return SctpNet.getLocalAddresses(fdVal);
907         }
908     }
909 
910     @Override
911     public Set<SocketAddress> getRemoteAddresses(Association association)
912             throws IOException {
913         synchronized (stateLock) {
914             checkAssociation(association);
915             if (!isOpen())
916                 throw new ClosedChannelException();
917 
918             try {
919                 return SctpNet.getRemoteAddresses(fdVal, association.associationID());
920             } catch (SocketException se) {
921                 /* a valid association should always have remote addresses */
922                 return associationMap.getOrDefault(association, Collections.emptySet());
923             }
924         }
925     }
926 
927     @Override
928     public SctpChannel branch(Association association)
929             throws IOException {
930         synchronized (stateLock) {
931             checkAssociation(association);
932             if (!isOpen())
933                 throw new ClosedChannelException();
934 
935             FileDescriptor bFd = SctpNet.branch(fdVal,
936                                                 association.associationID());
937             /* successfully branched, we can now remove it from assoc list */
938             removeAssociation(association);
939 
940             return new SctpChannelImpl(provider(), bFd, association);
941         }
942     }
943 
944     /* Use common native implementation shared between
945      * one-to-one and one-to-many */
946     private static int receive0(int fd,
947                                 ResultContainer resultContainer,
948                                 long address,
949                                 int length)
950             throws IOException{
951         return SctpChannelImpl.receive0(fd, resultContainer, address,
952                 length, false /*peek */);
953     }
954 
955     private static int send0(int fd,
956                              long address,
957                              int length,
958                              InetAddress addr,
959                              int port,
960                              int assocId,
961                              int streamNumber,
962                              boolean unordered,
963                              int ppid)
964             throws IOException {
965         return SctpChannelImpl.send0(fd, address, length, addr, port, assocId,
966                 streamNumber, unordered, ppid);
967     }
968 }