1 /*
2 * Copyright (c) 2009, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch.sctp;
27
28 import java.net.InetAddress;
29 import java.net.SocketAddress;
30 import java.net.SocketException;
31 import java.net.InetSocketAddress;
32 import java.io.FileDescriptor;
33 import java.io.IOException;
34 import java.util.Collections;
35 import java.util.Set;
36 import java.util.HashSet;
37 import java.nio.ByteBuffer;
38 import java.nio.channels.SelectionKey;
39 import java.nio.channels.ClosedChannelException;
40 import java.nio.channels.ConnectionPendingException;
41 import java.nio.channels.NoConnectionPendingException;
42 import java.nio.channels.AlreadyConnectedException;
43 import java.nio.channels.NotYetBoundException;
44 import java.nio.channels.NotYetConnectedException;
45 import java.nio.channels.spi.SelectorProvider;
46 import com.sun.nio.sctp.AbstractNotificationHandler;
47 import com.sun.nio.sctp.Association;
48 import com.sun.nio.sctp.AssociationChangeNotification;
49 import com.sun.nio.sctp.HandlerResult;
50 import com.sun.nio.sctp.IllegalReceiveException;
51 import com.sun.nio.sctp.InvalidStreamException;
52 import com.sun.nio.sctp.IllegalUnbindException;
53 import com.sun.nio.sctp.MessageInfo;
54 import com.sun.nio.sctp.NotificationHandler;
55 import com.sun.nio.sctp.SctpChannel;
56 import com.sun.nio.sctp.SctpSocketOption;
57 import jdk.internal.access.JavaNioAccess;
58 import jdk.internal.access.SharedSecrets;
59 import sun.net.util.IPAddressUtil;
60 import sun.nio.ch.DirectBuffer;
61 import sun.nio.ch.IOStatus;
62 import sun.nio.ch.IOUtil;
63 import sun.nio.ch.NativeThread;
64 import sun.nio.ch.Net;
65 import sun.nio.ch.SelChImpl;
66 import sun.nio.ch.SelectionKeyImpl;
67 import sun.nio.ch.Util;
68 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
69 import static sun.nio.ch.sctp.ResultContainer.SEND_FAILED;
70 import static sun.nio.ch.sctp.ResultContainer.ASSOCIATION_CHANGED;
71 import static sun.nio.ch.sctp.ResultContainer.PEER_ADDRESS_CHANGED;
72 import static sun.nio.ch.sctp.ResultContainer.SHUTDOWN;
73
74 /**
75 * An implementation of an SctpChannel
76 */
77 public class SctpChannelImpl extends SctpChannel
78 implements SelChImpl
79 {
80
81 private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess();
82
83 private final FileDescriptor fd;
84
85 private final int fdVal;
86
87 /* IDs of native threads doing send and receive, for signalling */
88 private volatile long receiverThread;
89 private volatile long senderThread;
90
91 /* Lock held by current receiving or connecting thread */
92 private final Object receiveLock = new Object();
93
94 /* Lock held by current sending or connecting thread */
95 private final Object sendLock = new Object();
96
97 private final ThreadLocal<Boolean> receiveInvoked =
98 new ThreadLocal<>() {
99 @Override protected Boolean initialValue() {
100 return Boolean.FALSE;
101 }
102 };
103
104 /* Lock held by any thread that modifies the state fields declared below
105 DO NOT invoke a blocking I/O operation while holding this lock! */
106 private final Object stateLock = new Object();
107
/* Lifecycle states of the channel, stored in the 'state' field. */
private enum ChannelState {
    /* Only tested for in kill(); no code in this file assigns it. */
    UNINITIALIZED,
    /* Initial state of a channel created by the connecting constructor. */
    UNCONNECTED,
    /* A non-blocking connect was started and has not completed yet. */
    PENDING,
    /* The connection is established. */
    CONNECTED,
    /* kill() was requested while a receiver or sender thread was still
       recorded; the cleanup methods complete the kill. */
    KILLPENDING,
    /* The socket has been closed via SctpNet.close. */
    KILLED,
}
/* -- The following fields are protected by stateLock -- */

/* Current lifecycle state of the channel. */
private ChannelState state;

/* Binding; Once bound the port will remain constant.
   -1 means the channel is not bound (see isBound()). */
int port = -1;

/* Local addresses recorded by bind() and bindUnbindAddress(). */
private final Set<InetSocketAddress> localAddresses = new HashSet<>();

/* Has the channel been bound to the wildcard address */
private boolean wildcard; /* false */

/* Set by translateReadyOps() when a poll result reports connect
   readiness or an error/hang-up. It is never read in this file.
   NOTE(review): translateReadyOps writes it without holding stateLock. */
private boolean readyToConnect;

/* Set once shutdown() has been performed on this channel. */
private boolean isShutdown;

/* The association of this channel; created by the internal notification
   handler on COMM_UP, or supplied to the branching constructor. */
private Association association;

/* Remote addresses cached right after the connection is established;
   getRemoteAddresses() falls back to them on a SocketException. */
private Set<SocketAddress> remoteAddresses = Collections.emptySet();

/* -- End of fields protected by stateLock -- */
137
138 /**
139 * Constructor for normal connecting sockets
140 */
141 public SctpChannelImpl(SelectorProvider provider) throws IOException {
142 //TODO: update provider remove public modifier
143 super(provider);
144 this.fd = SctpNet.socket(true);
145 this.fdVal = IOUtil.fdVal(fd);
146 this.state = ChannelState.UNCONNECTED;
147 }
148
/**
 * Constructor for sockets obtained from server sockets. Delegates to the
 * three-argument constructor with no association, which then receives
 * the COMM_UP notification to establish one.
 *
 * @param provider the selector provider passed to the superclass
 * @param fd       the already connected socket's file descriptor
 * @throws IOException if an I/O error occurs while receiving COMM_UP
 */
public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd)
        throws IOException {
    this(provider, fd, (Association) null);
}
156
157 /**
158 * Constructor for sockets obtained from branching
159 */
160 public SctpChannelImpl(SelectorProvider provider,
161 FileDescriptor fd,
162 Association association)
163 throws IOException {
164 super(provider);
165 this.fd = fd;
166 this.fdVal = IOUtil.fdVal(fd);
167 this.state = ChannelState.CONNECTED;
168 port = (Net.localAddress(fd)).getPort();
169
170 if (association != null) { /* branched */
171 this.association = association;
172 } else { /* obtained from server channel */
173 /* Receive COMM_UP */
174 ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
175 try {
176 receive(buf, null, null, true);
177 } finally {
178 Util.releaseTemporaryDirectBuffer(buf);
179 }
180 }
181 }
182
183 /**
184 * Binds the channel's socket to a local address.
185 */
186 @Override
187 public SctpChannel bind(SocketAddress local) throws IOException {
188 synchronized (receiveLock) {
189 synchronized (sendLock) {
190 synchronized (stateLock) {
191 ensureOpenAndUnconnected();
192 if (isBound())
193 SctpNet.throwAlreadyBoundException();
194 InetSocketAddress isa = (local == null) ?
195 new InetSocketAddress(0) : Net.checkAddress(local);
196 Net.bind(fd, isa.getAddress(), isa.getPort());
197 InetSocketAddress boundIsa = Net.localAddress(fd);
198 port = boundIsa.getPort();
199 localAddresses.add(isa);
200 if (isa.getAddress().isAnyLocalAddress())
201 wildcard = true;
202 }
203 }
204 }
205 return this;
206 }
207
208 @Override
209 public SctpChannel bindAddress(InetAddress address)
210 throws IOException {
211 bindUnbindAddress(address, true);
212 localAddresses.add(new InetSocketAddress(address, port));
213 return this;
214 }
215
216 @Override
217 public SctpChannel unbindAddress(InetAddress address)
218 throws IOException {
219 bindUnbindAddress(address, false);
220 localAddresses.remove(new InetSocketAddress(address, port));
221 return this;
222 }
223
224 private void bindUnbindAddress(InetAddress address, boolean add)
225 throws IOException {
226 if (address == null)
227 throw new IllegalArgumentException();
228
229 synchronized (receiveLock) {
230 synchronized (sendLock) {
231 synchronized (stateLock) {
232 if (!isOpen())
233 throw new ClosedChannelException();
234 if (!isBound())
235 throw new NotYetBoundException();
236 if (wildcard)
237 throw new IllegalStateException(
238 "Cannot add or remove addresses from a channel that is bound to the wildcard address");
239 if (address.isAnyLocalAddress())
240 throw new IllegalArgumentException(
241 "Cannot add or remove the wildcard address");
242 if (add) {
243 for (InetSocketAddress addr : localAddresses) {
244 if (addr.getAddress().equals(address)) {
245 SctpNet.throwAlreadyBoundException();
246 }
247 }
248 } else { /*removing */
249 /* Verify that there is more than one address
250 * and that address is already bound */
251 if (localAddresses.size() <= 1)
252 throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
253 boolean foundAddress = false;
254 for (InetSocketAddress addr : localAddresses) {
255 if (addr.getAddress().equals(address)) {
256 foundAddress = true;
257 break;
258 }
259 }
260 if (!foundAddress )
261 throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
262 }
263
264 SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);
265
266 /* Update our internal Set to reflect the addition/removal */
267 if (add)
268 localAddresses.add(new InetSocketAddress(address, port));
269 else {
270 for (InetSocketAddress addr : localAddresses) {
271 if (addr.getAddress().equals(address)) {
272 localAddresses.remove(addr);
273 break;
274 }
275 }
276 }
277 }
278 }
279 }
280 }
281
282 private boolean isBound() {
283 synchronized (stateLock) {
284 return port != -1;
285 }
286 }
287
288 private boolean isConnected() {
289 synchronized (stateLock) {
290 return (state == ChannelState.CONNECTED);
291 }
292 }
293
294 private void ensureOpenAndUnconnected() throws IOException {
295 synchronized (stateLock) {
296 if (!isOpen())
297 throw new ClosedChannelException();
298 if (isConnected())
299 throw new AlreadyConnectedException();
300 if (state == ChannelState.PENDING)
301 throw new ConnectionPendingException();
302 }
303 }
304
305 private boolean ensureReceiveOpen() throws ClosedChannelException {
306 synchronized (stateLock) {
307 if (!isOpen())
308 throw new ClosedChannelException();
309 if (!isConnected())
310 throw new NotYetConnectedException();
311 else
312 return true;
313 }
314 }
315
316 private void ensureSendOpen() throws ClosedChannelException {
317 synchronized (stateLock) {
318 if (!isOpen())
319 throw new ClosedChannelException();
320 if (isShutdown)
321 throw new ClosedChannelException();
322 if (!isConnected())
323 throw new NotYetConnectedException();
324 }
325 }
326
327 private void receiverCleanup() throws IOException {
328 synchronized (stateLock) {
329 receiverThread = 0;
330 if (state == ChannelState.KILLPENDING)
331 kill();
332 }
333 }
334
335 private void senderCleanup() throws IOException {
336 synchronized (stateLock) {
337 senderThread = 0;
338 if (state == ChannelState.KILLPENDING)
339 kill();
340 }
341 }
342
343 @Override
344 public Association association() throws ClosedChannelException {
345 synchronized (stateLock) {
346 if (!isOpen())
347 throw new ClosedChannelException();
348 if (!isConnected())
349 return null;
350
351 return association;
352 }
353 }
354
/**
 * Connects this channel's socket to the given endpoint.
 *
 * On success the channel moves to CONNECTED, records its local port if it
 * was not bound, consumes the COMM_UP notification and caches the remote
 * addresses. In non-blocking mode, when the native connect does not
 * complete immediately, the channel moves to PENDING instead.
 *
 * @param endpoint the remote address to connect to
 * @return {@code true} if the connection was established; {@code false}
 *         if it is pending or the channel was closed before the attempt
 * @throws IOException if an I/O error occurs; the channel is closed
 *         before the exception is rethrown
 */
@Override
public boolean connect(SocketAddress endpoint) throws IOException {
    synchronized (receiveLock) {
        synchronized (sendLock) {
            ensureOpenAndUnconnected();
            InetSocketAddress isa = Net.checkAddress(endpoint);
            synchronized (blockingLock()) {
                /* Result of the native connect; also consulted by the
                 * finally block to decide how to call end() */
                int n = 0;
                try {
                    try {
                        begin();
                        synchronized (stateLock) {
                            if (!isOpen()) {
                                return false;
                            }
                            /* Record this thread so that close() can signal it */
                            receiverThread = NativeThread.current();
                        }
                        for (;;) {
                            InetAddress ia = isa.getAddress();
                            /* The wildcard address is replaced by the local host */
                            if (ia.isAnyLocalAddress())
                                ia = InetAddress.getLocalHost();
                            n = SctpNet.connect(fdVal, ia, isa.getPort());
                            /* Retry when interrupted, as long as the channel is open */
                            if ( (n == IOStatus.INTERRUPTED)
                                  && isOpen())
                                continue;
                            break;
                        }
                    } finally {
                        receiverCleanup();
                        end((n > 0) || (n == IOStatus.UNAVAILABLE));
                        assert IOStatus.check(n);
                    }
                } catch (IOException x) {
                    /* If an exception was thrown, close the channel after
                     * invoking end() so as to avoid bogus
                     * AsynchronousCloseExceptions */
                    close();
                    throw x;
                }

                if (n > 0) {
                    /* NOTE(review): receive() below runs while stateLock is
                     * held, although the comment on stateLock's declaration
                     * forbids blocking I/O under that lock - confirm that
                     * this is intended. */
                    synchronized (stateLock) {
                        /* Connection succeeded */
                        state = ChannelState.CONNECTED;
                        if (!isBound()) {
                            InetSocketAddress boundIsa =
                                Net.localAddress(fd);
                            port = boundIsa.getPort();
                        }

                        /* Receive COMM_UP */
                        ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
                        try {
                            receive(buf, null, null, true);
                        } finally {
                            Util.releaseTemporaryDirectBuffer(buf);
                        }

                        /* cache remote addresses */
                        try {
                            remoteAddresses = getRemoteAddresses();
                        } catch (IOException unused) { /* swallow exception */ }

                        return true;
                    }
                } else {
                    synchronized (stateLock) {
                        /* If nonblocking and no exception then connection
                         * pending; disallow another invocation */
                        if (!isBlocking())
                            state = ChannelState.PENDING;
                        else
                            assert false;
                    }
                }
            }
            return false;
        }
    }
}
435
436 @Override
437 public boolean connect(SocketAddress endpoint,
438 int maxOutStreams,
439 int maxInStreams)
440 throws IOException {
441 ensureOpenAndUnconnected();
442 return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams.
443 create(maxInStreams, maxOutStreams)).connect(endpoint);
444
445 }
446
447 @Override
448 public boolean isConnectionPending() {
449 synchronized (stateLock) {
450 return (state == ChannelState.PENDING);
451 }
452 }
453
/**
 * Finishes the process of connecting a channel whose connect is pending.
 *
 * On success the channel moves to CONNECTED, records its local port if it
 * was not bound, consumes the COMM_UP notification and caches the remote
 * addresses.
 *
 * @return {@code true} if the channel is (now) connected; {@code false}
 *         if the connection has not completed yet or the channel was
 *         closed during the call
 * @throws ClosedChannelException       if the channel is closed on entry
 * @throws NoConnectionPendingException if the channel is neither connected
 *         nor pending
 * @throws IOException if an I/O error occurs; the channel is closed
 *         before the exception is rethrown
 */
@Override
public boolean finishConnect() throws IOException {
    synchronized (receiveLock) {
        synchronized (sendLock) {
            synchronized (stateLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                if (isConnected())
                    return true;
                if (state != ChannelState.PENDING)
                    throw new NoConnectionPendingException();
            }
            boolean connected = false;
            try {
                try {
                    begin();
                    synchronized (blockingLock()) {
                        synchronized (stateLock) {
                            if (!isOpen()) {
                                return false;
                            }
                            /* Record this thread so that close() can signal it */
                            receiverThread = NativeThread.current();
                        }
                        if (!isBlocking()) {
                            /* Non-blocking: a single poll (second argument 0,
                             * presumably the timeout) */
                            connected = Net.pollConnect(fd, 0);
                        } else {
                            /* Blocking: keep polling until connected or the
                             * channel is closed */
                            do {
                                connected = Net.pollConnect(fd, -1);
                            } while (!connected && isOpen());
                        }
                    }
                } finally {
                    synchronized (stateLock) {
                        receiverThread = 0;
                        /* A kill postponed because of this thread is completed
                         * here; the connection then counts as not established */
                        if (state == ChannelState.KILLPENDING) {
                            kill();
                            connected = false;
                        }
                    }
                    end(connected);
                }
            } catch (IOException x) {
                /* If an exception was thrown, close the channel after
                 * invoking end() so as to avoid bogus
                 * AsynchronousCloseExceptions */
                close();
                throw x;
            }

            if (connected) {
                /* NOTE(review): receive() below runs while stateLock is held,
                 * although the comment on stateLock's declaration forbids
                 * blocking I/O under that lock - confirm that this is
                 * intended. */
                synchronized (stateLock) {
                    state = ChannelState.CONNECTED;
                    if (!isBound()) {
                        InetSocketAddress boundIsa =
                            Net.localAddress(fd);
                        port = boundIsa.getPort();
                    }

                    /* Receive COMM_UP */
                    ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
                    try {
                        receive(buf, null, null, true);
                    } finally {
                        Util.releaseTemporaryDirectBuffer(buf);
                    }

                    /* cache remote addresses */
                    try {
                        remoteAddresses = getRemoteAddresses();
                    } catch (IOException unused) { /* swallow exception */ }

                    return true;
                }
            }
        }
    }
    return false;
}
532
533 @Override
534 protected void implConfigureBlocking(boolean block) throws IOException {
535 IOUtil.configureBlocking(fd, block);
536 }
537
538 @Override
539 public void implCloseSelectableChannel() throws IOException {
540 synchronized (stateLock) {
541 if (state != ChannelState.KILLED)
542 SctpNet.preClose(fdVal);
543
544 if (receiverThread != 0)
545 NativeThread.signal(receiverThread);
546
547 if (senderThread != 0)
548 NativeThread.signal(senderThread);
549
550 if (!isRegistered())
551 kill();
552 }
553 }
554
555 @Override
556 public FileDescriptor getFD() {
557 return fd;
558 }
559
560 @Override
561 public int getFDVal() {
562 return fdVal;
563 }
564
565 /**
566 * Translates native poll revent ops into a ready operation ops
567 */
568 private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
569 int intOps = sk.nioInterestOps();
570 int oldOps = sk.nioReadyOps();
571 int newOps = initialOps;
572
573 if ((ops & Net.POLLNVAL) != 0) {
574 /* This should only happen if this channel is pre-closed while a
575 * selection operation is in progress
576 * ## Throw an error if this channel has not been pre-closed */
577 return false;
578 }
579
580 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
581 newOps = intOps;
582 sk.nioReadyOps(newOps);
583 /* No need to poll again in checkConnect,
584 * the error will be detected there */
585 readyToConnect = true;
586 return (newOps & ~oldOps) != 0;
587 }
588
589 if (((ops & Net.POLLIN) != 0) &&
590 ((intOps & SelectionKey.OP_READ) != 0) &&
591 isConnected())
592 newOps |= SelectionKey.OP_READ;
593
594 if (((ops & Net.POLLCONN) != 0) &&
595 ((intOps & SelectionKey.OP_CONNECT) != 0) &&
596 ((state == ChannelState.UNCONNECTED) || (state == ChannelState.PENDING))) {
597 newOps |= SelectionKey.OP_CONNECT;
598 readyToConnect = true;
599 }
600
601 if (((ops & Net.POLLOUT) != 0) &&
602 ((intOps & SelectionKey.OP_WRITE) != 0) &&
603 isConnected())
604 newOps |= SelectionKey.OP_WRITE;
605
606 sk.nioReadyOps(newOps);
607 return (newOps & ~oldOps) != 0;
608 }
609
610 @Override
611 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
612 return translateReadyOps(ops, sk.nioReadyOps(), sk);
613 }
614
615 @Override
616 @SuppressWarnings("all")
617 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
618 return translateReadyOps(ops, 0, sk);
619 }
620
621 @Override
622 public int translateInterestOps(int ops) {
623 int newOps = 0;
624 if ((ops & SelectionKey.OP_READ) != 0)
625 newOps |= Net.POLLIN;
626 if ((ops & SelectionKey.OP_WRITE) != 0)
627 newOps |= Net.POLLOUT;
628 if ((ops & SelectionKey.OP_CONNECT) != 0)
629 newOps |= Net.POLLCONN;
630 return newOps;
631 }
632
633 @Override
634 public void kill() throws IOException {
635 synchronized (stateLock) {
636 if (state == ChannelState.KILLED)
637 return;
638 if (state == ChannelState.UNINITIALIZED) {
639 state = ChannelState.KILLED;
640 SctpNet.close(fdVal);
641 return;
642 }
643 assert !isOpen() && !isRegistered();
644
645 /* Postpone the kill if there is a waiting reader
646 * or writer thread. */
647 if (receiverThread == 0 && senderThread == 0) {
648 state = ChannelState.KILLED;
649 SctpNet.close(fdVal);
650 } else {
651 state = ChannelState.KILLPENDING;
652 }
653 }
654 }
655
656 @Override
657 public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
658 throws IOException {
659 if (name == null)
660 throw new NullPointerException();
661 if (!supportedOptions().contains(name))
662 throw new UnsupportedOperationException("'" + name + "' not supported");
663
664 synchronized (stateLock) {
665 if (!isOpen())
666 throw new ClosedChannelException();
667
668 SctpNet.setSocketOption(fdVal, name, value, 0 /*oneToOne*/);
669 }
670 return this;
671 }
672
673 @Override
674 @SuppressWarnings("unchecked")
675 public <T> T getOption(SctpSocketOption<T> name) throws IOException {
676 if (name == null)
677 throw new NullPointerException();
678 if (!supportedOptions().contains(name))
679 throw new UnsupportedOperationException("'" + name + "' not supported");
680
681 synchronized (stateLock) {
682 if (!isOpen())
683 throw new ClosedChannelException();
684
685 return (T)SctpNet.getSocketOption(fdVal, name, 0 /*oneToOne*/);
686 }
687 }
688
689 @Override
690 public final Set<SctpSocketOption<?>> supportedOptions() {
691 final class Holder {
692 static final Set<SctpSocketOption<?>> DEFAULT_OPTIONS = Set.of(
693 SCTP_DISABLE_FRAGMENTS,
694 SCTP_EXPLICIT_COMPLETE,
695 SCTP_FRAGMENT_INTERLEAVE,
696 SCTP_INIT_MAXSTREAMS,
697 SCTP_NODELAY,
698 SCTP_PRIMARY_ADDR,
699 SCTP_SET_PEER_PRIMARY_ADDR,
700 SO_SNDBUF,
701 SO_RCVBUF,
702 SO_LINGER);
703 }
704 return Holder.DEFAULT_OPTIONS;
705 }
706
707 @Override
708 public <T> MessageInfo receive(ByteBuffer buffer,
709 T attachment,
710 NotificationHandler<T> handler)
711 throws IOException {
712 return receive(buffer, attachment, handler, false);
713 }
714
/**
 * Receives a message or notification. Notifications are first processed
 * by the internal handler and then offered to the caller's handler; the
 * loop continues while there is no handler or the handler returns
 * CONTINUE.
 *
 * @param buffer      the destination buffer; must be writable
 * @param attachment  the object passed to the handler, may be {@code null}
 * @param handler     the caller's notification handler, may be {@code null}
 * @param fromConnect {@code true} when invoked internally to consume the
 *                    COMM_UP notification; it is passed to the native
 *                    receive as its {@code peek} argument and makes this
 *                    method return right after one notification
 * @return the message information, or {@code null} if nothing was
 *         received, the channel was closed, or only notifications were
 *         handled
 * @throws IllegalArgumentException if the buffer is null or read-only
 * @throws IllegalReceiveException  if called from within a handler on the
 *                                  same thread
 * @throws IOException if an I/O error occurs
 */
private <T> MessageInfo receive(ByteBuffer buffer,
                                T attachment,
                                NotificationHandler<T> handler,
                                boolean fromConnect)
        throws IOException {
    if (buffer == null)
        throw new IllegalArgumentException("buffer cannot be null");

    if (buffer.isReadOnly())
        throw new IllegalArgumentException("Read-only buffer");

    /* The per-thread flag detects a receive invoked from a handler */
    if (receiveInvoked.get())
        throw new IllegalReceiveException(
                "cannot invoke receive from handler");
    receiveInvoked.set(Boolean.TRUE);

    try {
        ResultContainer resultContainer = new ResultContainer();
        do {
            resultContainer.clear();
            synchronized (receiveLock) {
                if (!ensureReceiveOpen())
                    return null;

                /* Result of the native receive; also consulted by the
                 * finally block to decide how to call end() */
                int n = 0;
                try {
                    begin();

                    synchronized (stateLock) {
                        if(!isOpen())
                            return null;
                        /* Record this thread so that close() can signal it */
                        receiverThread = NativeThread.current();
                    }

                    /* Retry when interrupted, as long as the channel is open */
                    do {
                        n = receive(fdVal, buffer, resultContainer, fromConnect);
                    } while ((n == IOStatus.INTERRUPTED) && isOpen());
                } finally {
                    receiverCleanup();
                    end((n > 0) || (n == IOStatus.UNAVAILABLE));
                    assert IOStatus.check(n);
                }

                if (!resultContainer.isNotification()) {
                    /* message or nothing */
                    if (resultContainer.hasSomething()) {
                        /* Set the association before returning */
                        MessageInfoImpl info =
                                resultContainer.getMessageInfo();
                        synchronized (stateLock) {
                            assert association != null;
                            info.setAssociation(association);
                        }
                        return info;
                    } else
                        /* Non-blocking may return null if nothing available*/
                        return null;
                } else { /* notification */
                    synchronized (stateLock) {
                        handleNotificationInternal(
                                resultContainer);
                    }
                }

                if (fromConnect) {
                    /* If we reach here, then it was connect that invoked
                     * receive and received the COMM_UP. We have already
                     * handled the COMM_UP with the internal notification
                     * handler. Simply return. */
                    return null;
                }
            } /* receiveLock */
            /* The caller's handler is invoked outside receiveLock */
        } while (handler == null ? true :
                 (invokeNotificationHandler(resultContainer, handler, attachment)
                  == HandlerResult.CONTINUE));

        return null;
    } finally {
        receiveInvoked.set(Boolean.FALSE);
    }
}
796
797 private int receive(int fd,
798 ByteBuffer dst,
799 ResultContainer resultContainer,
800 boolean peek)
801 throws IOException {
802 int pos = dst.position();
803 int lim = dst.limit();
804 assert (pos <= lim);
805 int rem = (pos <= lim ? lim - pos : 0);
806 if (dst instanceof DirectBuffer && rem > 0)
807 return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek);
808
809 /* Substitute a native buffer */
810 int newSize = Math.max(rem, 1);
811 ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
812 try {
813 int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek);
814 bb.flip();
815 if (n > 0 && rem > 0)
816 dst.put(bb);
817 return n;
818 } finally {
819 Util.releaseTemporaryDirectBuffer(bb);
820 }
821 }
822
823 private int receiveIntoNativeBuffer(int fd,
824 ResultContainer resultContainer,
825 ByteBuffer bb,
826 int rem,
827 int pos,
828 boolean peek)
829 throws IOException
830 {
831 NIO_ACCESS.acquireSession(bb);
832 try {
833 int n = receive0(fd, resultContainer, NIO_ACCESS.getBufferAddress(bb) + pos, rem, peek);
834
835 if (n > 0)
836 bb.position(pos + n);
837 return n;
838 } finally {
839 NIO_ACCESS.releaseSession(bb);
840 }
841 }
842
843 private final InternalNotificationHandler internalNotificationHandler =
844 new InternalNotificationHandler();
845
846 private void handleNotificationInternal(ResultContainer resultContainer) {
847 invokeNotificationHandler(resultContainer,
848 internalNotificationHandler, null);
849 }
850
851 private final class InternalNotificationHandler
852 extends AbstractNotificationHandler<Object> {
853 @Override
854 public HandlerResult handleNotification(AssociationChangeNotification not,
855 Object unused) {
856 if (not.event().equals(AssociationChangeNotification.AssocChangeEvent.COMM_UP) &&
857 association == null) {
858 AssociationChange sac = (AssociationChange) not;
859 association = new AssociationImpl
860 (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());
861 }
862 return HandlerResult.CONTINUE;
863 }
864 }
865
866 private <T> HandlerResult invokeNotificationHandler(ResultContainer resultContainer,
867 NotificationHandler<T> handler,
868 T attachment) {
869 SctpNotification notification = resultContainer.notification();
870 synchronized (stateLock) {
871 notification.setAssociation(association);
872 }
873
874 if (!(handler instanceof AbstractNotificationHandler<T> absHandler)) {
875 return handler.handleNotification(notification, attachment);
876 }
877
878 /* AbstractNotificationHandler */
879 return switch (resultContainer.type()) {
880 case ASSOCIATION_CHANGED -> absHandler.handleNotification(
881 resultContainer.getAssociationChanged(), attachment);
882 case PEER_ADDRESS_CHANGED -> absHandler.handleNotification(
883 resultContainer.getPeerAddressChanged(), attachment);
884 case SEND_FAILED -> absHandler.handleNotification(
885 resultContainer.getSendFailed(), attachment);
886 case SHUTDOWN -> absHandler.handleNotification(
887 resultContainer.getShutdown(), attachment);
888 /* implementation specific handlers */
889 default -> absHandler.handleNotification(
890 resultContainer.notification(), attachment);
891 };
892 }
893
894 private void checkAssociation(Association sendAssociation) {
895 synchronized (stateLock) {
896 if (sendAssociation != null && !sendAssociation.equals(association)) {
897 throw new IllegalArgumentException(
898 "Cannot send to another association");
899 }
900 }
901 }
902
903 private void checkStreamNumber(int streamNumber) {
904 synchronized (stateLock) {
905 if (association != null) {
906 if (streamNumber < 0 ||
907 streamNumber >= association.maxOutboundStreams())
908 throw new InvalidStreamException();
909 }
910 }
911 }
912
913 /* TODO: Add support for ttl and isComplete to both 121 12M
914 * SCTP_EOR not yet supported on reference platforms
915 * TTL support limited...
916 */
917 @Override
918 public int send(ByteBuffer buffer, MessageInfo messageInfo)
919 throws IOException {
920 if (buffer == null)
921 throw new IllegalArgumentException("buffer cannot be null");
922
923 if (messageInfo == null)
924 throw new IllegalArgumentException("messageInfo cannot be null");
925
926 checkAssociation(messageInfo.association());
927 checkStreamNumber(messageInfo.streamNumber());
928
929 synchronized (sendLock) {
930 ensureSendOpen();
931
932 int n = 0;
933 try {
934 begin();
935
936 synchronized (stateLock) {
937 if(!isOpen())
938 return 0;
939 senderThread = NativeThread.current();
940 }
941
942 do {
943 n = send(fdVal, buffer, messageInfo);
944 } while ((n == IOStatus.INTERRUPTED) && isOpen());
945
946 return IOStatus.normalize(n);
947 } finally {
948 senderCleanup();
949 end((n > 0) || (n == IOStatus.UNAVAILABLE));
950 assert IOStatus.check(n);
951 }
952 }
953 }
954
955 private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
956 throws IOException {
957 int streamNumber = messageInfo.streamNumber();
958 SocketAddress target = messageInfo.address();
959 boolean unordered = messageInfo.isUnordered();
960 int ppid = messageInfo.payloadProtocolID();
961
962 if (src instanceof DirectBuffer)
963 return sendFromNativeBuffer(fd, src, target, streamNumber,
964 unordered, ppid);
965
966 /* Substitute a native buffer */
967 int pos = src.position();
968 int lim = src.limit();
969 assert (pos <= lim && streamNumber >= 0);
970
971 int rem = (pos <= lim ? lim - pos : 0);
972 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
973 try {
974 bb.put(src);
975 bb.flip();
976 /* Do not update src until we see how many bytes were written */
977 src.position(pos);
978
979 int n = sendFromNativeBuffer(fd, bb, target, streamNumber,
980 unordered, ppid);
981 if (n > 0) {
982 /* now update src */
983 src.position(pos + n);
984 }
985 return n;
986 } finally {
987 Util.releaseTemporaryDirectBuffer(bb);
988 }
989 }
990
991 private int sendFromNativeBuffer(int fd,
992 ByteBuffer bb,
993 SocketAddress target,
994 int streamNumber,
995 boolean unordered,
996 int ppid)
997 throws IOException {
998 InetAddress addr = null; // no preferred address
999 int port = 0;
1000 if (target != null) {
1001 InetSocketAddress isa = Net.checkAddress(target);
1002 addr = isa.getAddress();
1003 if (addr.isLinkLocalAddress()) {
1004 addr = IPAddressUtil.toScopedAddress(addr);
1005 }
1006 port = isa.getPort();
1007 }
1008
1009 int pos = bb.position();
1010 int lim = bb.limit();
1011 assert (pos <= lim);
1012 int rem = (pos <= lim ? lim - pos : 0);
1013
1014 NIO_ACCESS.acquireSession(bb);
1015 try {
1016 int written = send0(fd, NIO_ACCESS.getBufferAddress(bb) + pos, rem, addr,
1017 port, -1 /*121*/, streamNumber, unordered, ppid);
1018 if (written > 0)
1019 bb.position(pos + written);
1020 return written;
1021 } finally {
1022 NIO_ACCESS.releaseSession(bb);
1023 }
1024 }
1025
1026 @Override
1027 public SctpChannel shutdown() throws IOException {
1028 synchronized(stateLock) {
1029 if (isShutdown)
1030 return this;
1031
1032 ensureSendOpen();
1033 SctpNet.shutdown(fdVal, -1);
1034 if (senderThread != 0)
1035 NativeThread.signal(senderThread);
1036 isShutdown = true;
1037 }
1038 return this;
1039 }
1040
1041 @Override
1042 public Set<SocketAddress> getAllLocalAddresses()
1043 throws IOException {
1044 synchronized (stateLock) {
1045 if (!isOpen())
1046 throw new ClosedChannelException();
1047 if (!isBound())
1048 return Collections.emptySet();
1049
1050 return SctpNet.getLocalAddresses(fdVal);
1051 }
1052 }
1053
1054 @Override
1055 public Set<SocketAddress> getRemoteAddresses()
1056 throws IOException {
1057 synchronized (stateLock) {
1058 if (!isOpen())
1059 throw new ClosedChannelException();
1060 if (!isConnected() || isShutdown)
1061 return Collections.emptySet();
1062
1063 try {
1064 return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/);
1065 } catch (SocketException unused) {
1066 /* an open connected channel should always have remote addresses */
1067 return remoteAddresses;
1068 }
1069 }
1070 }
1071
/* Native */

/* Native initialization hook; invoked once from loadSctpLibrary() right
   after the "sctp" library has been loaded. */
private static native void initIDs();

/* Native receive into the memory at 'address' (at most 'length' bytes).
   'resultContainer' is passed so that the native code can report the
   received message info or notification. Callers treat the return value
   as a byte count or an IOStatus code. */
static native int receive0(int fd, ResultContainer resultContainer,
        long address, int length, boolean peek) throws IOException;

/* Native send of 'length' bytes starting at 'address'. 'addr' and 'port'
   name the preferred peer address (null/0 for none); this class always
   passes -1 as 'assocId'. Callers treat the return value as a byte count
   or an IOStatus code. */
static native int send0(int fd, long address, int length,
        InetAddress addr, int port, int assocId, int streamNumber,
        boolean unordered, int ppid) throws IOException;
1081
/* Load the native libraries when this class is initialized. */
static {
    loadSctpLibrary();
}

/**
 * Loads the native libraries this class depends on and then runs the
 * native initialization. The order matters: initIDs() is a native method
 * of this class and can only be called once the library has been loaded.
 */
@SuppressWarnings("restricted")
private static void loadSctpLibrary() {
    IOUtil.load();   /* loads nio & net native libraries */
    System.loadLibrary("sctp");
    initIDs();
}
1092 }