67 import sun.nio.ch.Util;
68 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
69 import static sun.nio.ch.sctp.ResultContainer.SEND_FAILED;
70 import static sun.nio.ch.sctp.ResultContainer.ASSOCIATION_CHANGED;
71 import static sun.nio.ch.sctp.ResultContainer.PEER_ADDRESS_CHANGED;
72 import static sun.nio.ch.sctp.ResultContainer.SHUTDOWN;
73
74 /**
75 * An implementation of an SctpChannel
76 */
77 public class SctpChannelImpl extends SctpChannel
78 implements SelChImpl
79 {
80
81 private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess();
82
83 private final FileDescriptor fd;
84
85 private final int fdVal;
86
87 /* IDs of native threads doing send and receive, for signalling */
88 private volatile long receiverThread;
89 private volatile long senderThread;
90
91 /* Lock held by current receiving or connecting thread */
92 private final Object receiveLock = new Object();
93
94 /* Lock held by current sending or connecting thread */
95 private final Object sendLock = new Object();
96
97 private final ThreadLocal<Boolean> receiveInvoked =
98 new ThreadLocal<>() {
99 @Override protected Boolean initialValue() {
100 return Boolean.FALSE;
101 }
102 };
103
104 /* Lock held by any thread that modifies the state fields declared below
105 DO NOT invoke a blocking I/O operation while holding this lock! */
106 private final Object stateLock = new Object();
107
108 private enum ChannelState {
109 UNINITIALIZED,
309 if (!isConnected())
310 throw new NotYetConnectedException();
311 else
312 return true;
313 }
314 }
315
316 private void ensureSendOpen() throws ClosedChannelException {
317 synchronized (stateLock) {
318 if (!isOpen())
319 throw new ClosedChannelException();
320 if (isShutdown)
321 throw new ClosedChannelException();
322 if (!isConnected())
323 throw new NotYetConnectedException();
324 }
325 }
326
327 private void receiverCleanup() throws IOException {
328 synchronized (stateLock) {
329 receiverThread = 0;
330 if (state == ChannelState.KILLPENDING)
331 kill();
332 }
333 }
334
335 private void senderCleanup() throws IOException {
336 synchronized (stateLock) {
337 senderThread = 0;
338 if (state == ChannelState.KILLPENDING)
339 kill();
340 }
341 }
342
343 @Override
344 public Association association() throws ClosedChannelException {
345 synchronized (stateLock) {
346 if (!isOpen())
347 throw new ClosedChannelException();
348 if (!isConnected())
349 return null;
350
351 return association;
352 }
353 }
354
355 @Override
356 public boolean connect(SocketAddress endpoint) throws IOException {
357 synchronized (receiveLock) {
358 synchronized (sendLock) {
359 ensureOpenAndUnconnected();
360 InetSocketAddress isa = Net.checkAddress(endpoint);
361 synchronized (blockingLock()) {
362 int n = 0;
363 try {
364 try {
365 begin();
366 synchronized (stateLock) {
367 if (!isOpen()) {
368 return false;
369 }
370 receiverThread = NativeThread.current();
371 }
372 for (;;) {
373 InetAddress ia = isa.getAddress();
374 if (ia.isAnyLocalAddress())
375 ia = InetAddress.getLocalHost();
376 n = SctpNet.connect(fdVal, ia, isa.getPort());
377 if ( (n == IOStatus.INTERRUPTED)
378 && isOpen())
379 continue;
380 break;
381 }
382 } finally {
383 receiverCleanup();
384 end((n > 0) || (n == IOStatus.UNAVAILABLE));
385 assert IOStatus.check(n);
386 }
387 } catch (IOException x) {
388 /* If an exception was thrown, close the channel after
389 * invoking end() so as to avoid bogus
390 * AsynchronousCloseExceptions */
455 public boolean finishConnect() throws IOException {
456 synchronized (receiveLock) {
457 synchronized (sendLock) {
458 synchronized (stateLock) {
459 if (!isOpen())
460 throw new ClosedChannelException();
461 if (isConnected())
462 return true;
463 if (state != ChannelState.PENDING)
464 throw new NoConnectionPendingException();
465 }
466 boolean connected = false;
467 try {
468 try {
469 begin();
470 synchronized (blockingLock()) {
471 synchronized (stateLock) {
472 if (!isOpen()) {
473 return false;
474 }
475 receiverThread = NativeThread.current();
476 }
477 if (!isBlocking()) {
478 connected = Net.pollConnect(fd, 0);
479 } else {
480 do {
481 connected = Net.pollConnect(fd, -1);
482 } while (!connected && isOpen());
483 }
484 }
485 } finally {
486 synchronized (stateLock) {
487 receiverThread = 0;
488 if (state == ChannelState.KILLPENDING) {
489 kill();
490 connected = false;
491 }
492 }
493 end(connected);
494 }
495 } catch (IOException x) {
496 /* If an exception was thrown, close the channel after
497 * invoking end() so as to avoid bogus
498 * AsynchronousCloseExceptions */
499 close();
500 throw x;
501 }
502
503 if (connected) {
504 synchronized (stateLock) {
505 state = ChannelState.CONNECTED;
506 if (!isBound()) {
507 InetSocketAddress boundIsa =
524
525 return true;
526 }
527 }
528 }
529 }
530 return false;
531 }
532
533 @Override
534 protected void implConfigureBlocking(boolean block) throws IOException {
535 IOUtil.configureBlocking(fd, block);
536 }
537
538 @Override
539 public void implCloseSelectableChannel() throws IOException {
540 synchronized (stateLock) {
541 if (state != ChannelState.KILLED)
542 SctpNet.preClose(fdVal);
543
544 if (receiverThread != 0)
545 NativeThread.signal(receiverThread);
546
547 if (senderThread != 0)
548 NativeThread.signal(senderThread);
549
550 if (!isRegistered())
551 kill();
552 }
553 }
554
555 @Override
556 public FileDescriptor getFD() {
557 return fd;
558 }
559
560 @Override
561 public int getFDVal() {
562 return fdVal;
563 }
564
565 /**
566 * Translates native poll revent ops into ready operation ops
567 */
627 newOps |= Net.POLLOUT;
628 if ((ops & SelectionKey.OP_CONNECT) != 0)
629 newOps |= Net.POLLCONN;
630 return newOps;
631 }
632
633 @Override
634 public void kill() throws IOException {
635 synchronized (stateLock) {
636 if (state == ChannelState.KILLED)
637 return;
638 if (state == ChannelState.UNINITIALIZED) {
639 state = ChannelState.KILLED;
640 SctpNet.close(fdVal);
641 return;
642 }
643 assert !isOpen() && !isRegistered();
644
645 /* Postpone the kill if there is a waiting reader
646 * or writer thread. */
647 if (receiverThread == 0 && senderThread == 0) {
648 state = ChannelState.KILLED;
649 SctpNet.close(fdVal);
650 } else {
651 state = ChannelState.KILLPENDING;
652 }
653 }
654 }
655
656 @Override
657 public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
658 throws IOException {
659 if (name == null)
660 throw new NullPointerException();
661 if (!supportedOptions().contains(name))
662 throw new UnsupportedOperationException("'" + name + "' not supported");
663
664 synchronized (stateLock) {
665 if (!isOpen())
666 throw new ClosedChannelException();
667
726 if (receiveInvoked.get())
727 throw new IllegalReceiveException(
728 "cannot invoke receive from handler");
729 receiveInvoked.set(Boolean.TRUE);
730
731 try {
732 ResultContainer resultContainer = new ResultContainer();
733 do {
734 resultContainer.clear();
735 synchronized (receiveLock) {
736 if (!ensureReceiveOpen())
737 return null;
738
739 int n = 0;
740 try {
741 begin();
742
743 synchronized (stateLock) {
744 if(!isOpen())
745 return null;
746 receiverThread = NativeThread.current();
747 }
748
749 do {
750 n = receive(fdVal, buffer, resultContainer, fromConnect);
751 } while ((n == IOStatus.INTERRUPTED) && isOpen());
752 } finally {
753 receiverCleanup();
754 end((n > 0) || (n == IOStatus.UNAVAILABLE));
755 assert IOStatus.check(n);
756 }
757
758 if (!resultContainer.isNotification()) {
759 /* message or nothing */
760 if (resultContainer.hasSomething()) {
761 /* Set the association before returning */
762 MessageInfoImpl info =
763 resultContainer.getMessageInfo();
764 synchronized (stateLock) {
765 assert association != null;
766 info.setAssociation(association);
919 throws IOException {
920 if (buffer == null)
921 throw new IllegalArgumentException("buffer cannot be null");
922
923 if (messageInfo == null)
924 throw new IllegalArgumentException("messageInfo cannot be null");
925
926 checkAssociation(messageInfo.association());
927 checkStreamNumber(messageInfo.streamNumber());
928
929 synchronized (sendLock) {
930 ensureSendOpen();
931
932 int n = 0;
933 try {
934 begin();
935
936 synchronized (stateLock) {
937 if(!isOpen())
938 return 0;
939 senderThread = NativeThread.current();
940 }
941
942 do {
943 n = send(fdVal, buffer, messageInfo);
944 } while ((n == IOStatus.INTERRUPTED) && isOpen());
945
946 return IOStatus.normalize(n);
947 } finally {
948 senderCleanup();
949 end((n > 0) || (n == IOStatus.UNAVAILABLE));
950 assert IOStatus.check(n);
951 }
952 }
953 }
954
955 private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
956 throws IOException {
957 int streamNumber = messageInfo.streamNumber();
958 SocketAddress target = messageInfo.address();
959 boolean unordered = messageInfo.isUnordered();
1014 NIO_ACCESS.acquireSession(bb);
1015 try {
1016 int written = send0(fd, NIO_ACCESS.getBufferAddress(bb) + pos, rem, addr,
1017 port, -1 /*121*/, streamNumber, unordered, ppid);
1018 if (written > 0)
1019 bb.position(pos + written);
1020 return written;
1021 } finally {
1022 NIO_ACCESS.releaseSession(bb);
1023 }
1024 }
1025
1026 @Override
1027 public SctpChannel shutdown() throws IOException {
1028 synchronized(stateLock) {
1029 if (isShutdown)
1030 return this;
1031
1032 ensureSendOpen();
1033 SctpNet.shutdown(fdVal, -1);
1034 if (senderThread != 0)
1035 NativeThread.signal(senderThread);
1036 isShutdown = true;
1037 }
1038 return this;
1039 }
1040
1041 @Override
1042 public Set<SocketAddress> getAllLocalAddresses()
1043 throws IOException {
1044 synchronized (stateLock) {
1045 if (!isOpen())
1046 throw new ClosedChannelException();
1047 if (!isBound())
1048 return Collections.emptySet();
1049
1050 return SctpNet.getLocalAddresses(fdVal);
1051 }
1052 }
1053
1054 @Override
|
67 import sun.nio.ch.Util;
68 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
69 import static sun.nio.ch.sctp.ResultContainer.SEND_FAILED;
70 import static sun.nio.ch.sctp.ResultContainer.ASSOCIATION_CHANGED;
71 import static sun.nio.ch.sctp.ResultContainer.PEER_ADDRESS_CHANGED;
72 import static sun.nio.ch.sctp.ResultContainer.SHUTDOWN;
73
74 /**
75 * An implementation of an SctpChannel
76 */
77 public class SctpChannelImpl extends SctpChannel
78 implements SelChImpl
79 {
80
81 private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess();
82
83 private final FileDescriptor fd;
84
85 private final int fdVal;
86
87 /* Threads doing send and receive, for signalling */
88 private volatile Thread receiverThread;
89 private volatile Thread senderThread;
90
91 /* Lock held by current receiving or connecting thread */
92 private final Object receiveLock = new Object();
93
94 /* Lock held by current sending or connecting thread */
95 private final Object sendLock = new Object();
96
97 private final ThreadLocal<Boolean> receiveInvoked =
98 new ThreadLocal<>() {
99 @Override protected Boolean initialValue() {
100 return Boolean.FALSE;
101 }
102 };
103
104 /* Lock held by any thread that modifies the state fields declared below
105 DO NOT invoke a blocking I/O operation while holding this lock! */
106 private final Object stateLock = new Object();
107
108 private enum ChannelState {
109 UNINITIALIZED,
309 if (!isConnected())
310 throw new NotYetConnectedException();
311 else
312 return true;
313 }
314 }
315
316 private void ensureSendOpen() throws ClosedChannelException {
317 synchronized (stateLock) {
318 if (!isOpen())
319 throw new ClosedChannelException();
320 if (isShutdown)
321 throw new ClosedChannelException();
322 if (!isConnected())
323 throw new NotYetConnectedException();
324 }
325 }
326
327 private void receiverCleanup() throws IOException {
328 synchronized (stateLock) {
329 receiverThread = null;
330 if (state == ChannelState.KILLPENDING)
331 kill();
332 }
333 }
334
335 private void senderCleanup() throws IOException {
336 synchronized (stateLock) {
337 senderThread = null;
338 if (state == ChannelState.KILLPENDING)
339 kill();
340 }
341 }
342
343 @Override
344 public Association association() throws ClosedChannelException {
345 synchronized (stateLock) {
346 if (!isOpen())
347 throw new ClosedChannelException();
348 if (!isConnected())
349 return null;
350
351 return association;
352 }
353 }
354
355 @Override
356 public boolean connect(SocketAddress endpoint) throws IOException {
357 synchronized (receiveLock) {
358 synchronized (sendLock) {
359 ensureOpenAndUnconnected();
360 InetSocketAddress isa = Net.checkAddress(endpoint);
361 synchronized (blockingLock()) {
362 int n = 0;
363 try {
364 try {
365 begin();
366 synchronized (stateLock) {
367 if (!isOpen()) {
368 return false;
369 }
370 receiverThread = NativeThread.threadToSignal();
371 }
372 for (;;) {
373 InetAddress ia = isa.getAddress();
374 if (ia.isAnyLocalAddress())
375 ia = InetAddress.getLocalHost();
376 n = SctpNet.connect(fdVal, ia, isa.getPort());
377 if ( (n == IOStatus.INTERRUPTED)
378 && isOpen())
379 continue;
380 break;
381 }
382 } finally {
383 receiverCleanup();
384 end((n > 0) || (n == IOStatus.UNAVAILABLE));
385 assert IOStatus.check(n);
386 }
387 } catch (IOException x) {
388 /* If an exception was thrown, close the channel after
389 * invoking end() so as to avoid bogus
390 * AsynchronousCloseExceptions */
455 public boolean finishConnect() throws IOException {
456 synchronized (receiveLock) {
457 synchronized (sendLock) {
458 synchronized (stateLock) {
459 if (!isOpen())
460 throw new ClosedChannelException();
461 if (isConnected())
462 return true;
463 if (state != ChannelState.PENDING)
464 throw new NoConnectionPendingException();
465 }
466 boolean connected = false;
467 try {
468 try {
469 begin();
470 synchronized (blockingLock()) {
471 synchronized (stateLock) {
472 if (!isOpen()) {
473 return false;
474 }
475 receiverThread = NativeThread.threadToSignal();
476 }
477 if (!isBlocking()) {
478 connected = Net.pollConnect(fd, 0);
479 } else {
480 do {
481 connected = Net.pollConnect(fd, -1);
482 } while (!connected && isOpen());
483 }
484 }
485 } finally {
486 synchronized (stateLock) {
487 receiverThread = null;
488 if (state == ChannelState.KILLPENDING) {
489 kill();
490 connected = false;
491 }
492 }
493 end(connected);
494 }
495 } catch (IOException x) {
496 /* If an exception was thrown, close the channel after
497 * invoking end() so as to avoid bogus
498 * AsynchronousCloseExceptions */
499 close();
500 throw x;
501 }
502
503 if (connected) {
504 synchronized (stateLock) {
505 state = ChannelState.CONNECTED;
506 if (!isBound()) {
507 InetSocketAddress boundIsa =
524
525 return true;
526 }
527 }
528 }
529 }
530 return false;
531 }
532
533 @Override
534 protected void implConfigureBlocking(boolean block) throws IOException {
535 IOUtil.configureBlocking(fd, block);
536 }
537
538 @Override
539 public void implCloseSelectableChannel() throws IOException {
540 synchronized (stateLock) {
541 if (state != ChannelState.KILLED)
542 SctpNet.preClose(fdVal);
543
544 if (receiverThread != null)
545 NativeThread.signal(receiverThread);
546
547 if (senderThread != null)
548 NativeThread.signal(senderThread);
549
550 if (!isRegistered())
551 kill();
552 }
553 }
554
555 @Override
556 public FileDescriptor getFD() {
557 return fd;
558 }
559
560 @Override
561 public int getFDVal() {
562 return fdVal;
563 }
564
565 /**
566 * Translates native poll revent ops into ready operation ops
567 */
627 newOps |= Net.POLLOUT;
628 if ((ops & SelectionKey.OP_CONNECT) != 0)
629 newOps |= Net.POLLCONN;
630 return newOps;
631 }
632
633 @Override
634 public void kill() throws IOException {
635 synchronized (stateLock) {
636 if (state == ChannelState.KILLED)
637 return;
638 if (state == ChannelState.UNINITIALIZED) {
639 state = ChannelState.KILLED;
640 SctpNet.close(fdVal);
641 return;
642 }
643 assert !isOpen() && !isRegistered();
644
645 /* Postpone the kill if there is a waiting reader
646 * or writer thread. */
647 if (receiverThread == null && senderThread == null) {
648 state = ChannelState.KILLED;
649 SctpNet.close(fdVal);
650 } else {
651 state = ChannelState.KILLPENDING;
652 }
653 }
654 }
655
656 @Override
657 public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
658 throws IOException {
659 if (name == null)
660 throw new NullPointerException();
661 if (!supportedOptions().contains(name))
662 throw new UnsupportedOperationException("'" + name + "' not supported");
663
664 synchronized (stateLock) {
665 if (!isOpen())
666 throw new ClosedChannelException();
667
726 if (receiveInvoked.get())
727 throw new IllegalReceiveException(
728 "cannot invoke receive from handler");
729 receiveInvoked.set(Boolean.TRUE);
730
731 try {
732 ResultContainer resultContainer = new ResultContainer();
733 do {
734 resultContainer.clear();
735 synchronized (receiveLock) {
736 if (!ensureReceiveOpen())
737 return null;
738
739 int n = 0;
740 try {
741 begin();
742
743 synchronized (stateLock) {
744 if(!isOpen())
745 return null;
746 receiverThread = NativeThread.threadToSignal();
747 }
748
749 do {
750 n = receive(fdVal, buffer, resultContainer, fromConnect);
751 } while ((n == IOStatus.INTERRUPTED) && isOpen());
752 } finally {
753 receiverCleanup();
754 end((n > 0) || (n == IOStatus.UNAVAILABLE));
755 assert IOStatus.check(n);
756 }
757
758 if (!resultContainer.isNotification()) {
759 /* message or nothing */
760 if (resultContainer.hasSomething()) {
761 /* Set the association before returning */
762 MessageInfoImpl info =
763 resultContainer.getMessageInfo();
764 synchronized (stateLock) {
765 assert association != null;
766 info.setAssociation(association);
919 throws IOException {
920 if (buffer == null)
921 throw new IllegalArgumentException("buffer cannot be null");
922
923 if (messageInfo == null)
924 throw new IllegalArgumentException("messageInfo cannot be null");
925
926 checkAssociation(messageInfo.association());
927 checkStreamNumber(messageInfo.streamNumber());
928
929 synchronized (sendLock) {
930 ensureSendOpen();
931
932 int n = 0;
933 try {
934 begin();
935
936 synchronized (stateLock) {
937 if(!isOpen())
938 return 0;
939 senderThread = NativeThread.threadToSignal();
940 }
941
942 do {
943 n = send(fdVal, buffer, messageInfo);
944 } while ((n == IOStatus.INTERRUPTED) && isOpen());
945
946 return IOStatus.normalize(n);
947 } finally {
948 senderCleanup();
949 end((n > 0) || (n == IOStatus.UNAVAILABLE));
950 assert IOStatus.check(n);
951 }
952 }
953 }
954
955 private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
956 throws IOException {
957 int streamNumber = messageInfo.streamNumber();
958 SocketAddress target = messageInfo.address();
959 boolean unordered = messageInfo.isUnordered();
1014 NIO_ACCESS.acquireSession(bb);
1015 try {
1016 int written = send0(fd, NIO_ACCESS.getBufferAddress(bb) + pos, rem, addr,
1017 port, -1 /*121*/, streamNumber, unordered, ppid);
1018 if (written > 0)
1019 bb.position(pos + written);
1020 return written;
1021 } finally {
1022 NIO_ACCESS.releaseSession(bb);
1023 }
1024 }
1025
1026 @Override
1027 public SctpChannel shutdown() throws IOException {
1028 synchronized(stateLock) {
1029 if (isShutdown)
1030 return this;
1031
1032 ensureSendOpen();
1033 SctpNet.shutdown(fdVal, -1);
1034 if (senderThread != null)
1035 NativeThread.signal(senderThread);
1036 isShutdown = true;
1037 }
1038 return this;
1039 }
1040
1041 @Override
1042 public Set<SocketAddress> getAllLocalAddresses()
1043 throws IOException {
1044 synchronized (stateLock) {
1045 if (!isOpen())
1046 throw new ClosedChannelException();
1047 if (!isBound())
1048 return Collections.emptySet();
1049
1050 return SctpNet.getLocalAddresses(fdVal);
1051 }
1052 }
1053
1054 @Override
|