68 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
69 import static sun.nio.ch.sctp.ResultContainer.SEND_FAILED;
70 import static sun.nio.ch.sctp.ResultContainer.ASSOCIATION_CHANGED;
71 import static sun.nio.ch.sctp.ResultContainer.PEER_ADDRESS_CHANGED;
72 import static sun.nio.ch.sctp.ResultContainer.SHUTDOWN;
73
74 /**
75 * An implementation of an SctpChannel
76 */
77 public class SctpChannelImpl extends SctpChannel
78 implements SelChImpl
79 {
80
81 private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess();
82
83 private final FileDescriptor fd;
84
85 private final int fdVal;
86
87 /* IDs of native threads doing send and receive, for signalling */
88 private volatile long receiverThread;
89 private volatile long senderThread;
90
91 /* Lock held by current receiving or connecting thread */
92 private final Object receiveLock = new Object();
93
94 /* Lock held by current sending or connecting thread */
95 private final Object sendLock = new Object();
96
97 private final ThreadLocal<Boolean> receiveInvoked =
98 new ThreadLocal<>() {
99 @Override protected Boolean initialValue() {
100 return Boolean.FALSE;
101 }
102 };
103
104 /* Lock held by any thread that modifies the state fields declared below
105 DO NOT invoke a blocking I/O operation while holding this lock! */
106 private final Object stateLock = new Object();
107
108 private enum ChannelState {
109 UNINITIALIZED,
309 if (!isConnected())
310 throw new NotYetConnectedException();
311 else
312 return true;
313 }
314 }
315
316 private void ensureSendOpen() throws ClosedChannelException {
317 synchronized (stateLock) {
318 if (!isOpen())
319 throw new ClosedChannelException();
320 if (isShutdown)
321 throw new ClosedChannelException();
322 if (!isConnected())
323 throw new NotYetConnectedException();
324 }
325 }
326
327 private void receiverCleanup() throws IOException {
328 synchronized (stateLock) {
329 receiverThread = 0;
330 if (state == ChannelState.KILLPENDING)
331 kill();
332 }
333 }
334
335 private void senderCleanup() throws IOException {
336 synchronized (stateLock) {
337 senderThread = 0;
338 if (state == ChannelState.KILLPENDING)
339 kill();
340 }
341 }
342
343 @Override
344 public Association association() throws ClosedChannelException {
345 synchronized (stateLock) {
346 if (!isOpen())
347 throw new ClosedChannelException();
348 if (!isConnected())
349 return null;
350
351 return association;
352 }
353 }
354
355 @Override
356 public boolean connect(SocketAddress endpoint) throws IOException {
357 synchronized (receiveLock) {
467 try {
468 try {
469 begin();
470 synchronized (blockingLock()) {
471 synchronized (stateLock) {
472 if (!isOpen()) {
473 return false;
474 }
475 receiverThread = NativeThread.current();
476 }
477 if (!isBlocking()) {
478 connected = Net.pollConnect(fd, 0);
479 } else {
480 do {
481 connected = Net.pollConnect(fd, -1);
482 } while (!connected && isOpen());
483 }
484 }
485 } finally {
486 synchronized (stateLock) {
487 receiverThread = 0;
488 if (state == ChannelState.KILLPENDING) {
489 kill();
490 connected = false;
491 }
492 }
493 end(connected);
494 }
495 } catch (IOException x) {
496 /* If an exception was thrown, close the channel after
497 * invoking end() so as to avoid bogus
498 * AsynchronousCloseExceptions */
499 close();
500 throw x;
501 }
502
503 if (connected) {
504 synchronized (stateLock) {
505 state = ChannelState.CONNECTED;
506 if (!isBound()) {
507 InetSocketAddress boundIsa =
524
525 return true;
526 }
527 }
528 }
529 }
530 return false;
531 }
532
/**
 * Applies the requested blocking mode to this channel's file descriptor
 * by delegating to {@code IOUtil.configureBlocking}.
 *
 * @param block the blocking-mode flag, passed through unchanged
 * @throws IOException declared by this method; presumably raised by the
 *         delegate on failure (not visible here, confirm in IOUtil)
 */
533 @Override
534 protected void implConfigureBlocking(boolean block) throws IOException {
535 IOUtil.configureBlocking(fd, block);
536 }
537
538 @Override
539 public void implCloseSelectableChannel() throws IOException {
540 synchronized (stateLock) {
541 if (state != ChannelState.KILLED)
542 SctpNet.preClose(fdVal);
543
544 if (receiverThread != 0)
545 NativeThread.signal(receiverThread);
546
547 if (senderThread != 0)
548 NativeThread.signal(senderThread);
549
550 if (!isRegistered())
551 kill();
552 }
553 }
554
/**
 * Returns the file descriptor held in this channel's {@code fd} field.
 *
 * @return the {@code fd} field
 */
555 @Override
556 public FileDescriptor getFD() {
557 return fd;
558 }
559
/**
 * Returns the value of this channel's {@code fdVal} field.
 * NOTE(review): presumably the integer form of {@code fd}; confirm where
 * the field is assigned.
 *
 * @return the {@code fdVal} field
 */
560 @Override
561 public int getFDVal() {
562 return fdVal;
563 }
564
565 /**
566 * Translates native poll revent ops into a ready operation ops
567 */
568 private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
627 newOps |= Net.POLLOUT;
628 if ((ops & SelectionKey.OP_CONNECT) != 0)
629 newOps |= Net.POLLCONN;
630 return newOps;
631 }
632
633 @Override
634 public void kill() throws IOException {
635 synchronized (stateLock) {
636 if (state == ChannelState.KILLED)
637 return;
638 if (state == ChannelState.UNINITIALIZED) {
639 state = ChannelState.KILLED;
640 SctpNet.close(fdVal);
641 return;
642 }
643 assert !isOpen() && !isRegistered();
644
645 /* Postpone the kill if there is a waiting reader
646 * or writer thread. */
647 if (receiverThread == 0 && senderThread == 0) {
648 state = ChannelState.KILLED;
649 SctpNet.close(fdVal);
650 } else {
651 state = ChannelState.KILLPENDING;
652 }
653 }
654 }
655
656 @Override
657 public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
658 throws IOException {
659 if (name == null)
660 throw new NullPointerException();
661 if (!supportedOptions().contains(name))
662 throw new UnsupportedOperationException("'" + name + "' not supported");
663
664 synchronized (stateLock) {
665 if (!isOpen())
666 throw new ClosedChannelException();
667
1014 NIO_ACCESS.acquireSession(bb);
1015 try {
1016 int written = send0(fd, NIO_ACCESS.getBufferAddress(bb) + pos, rem, addr,
1017 port, -1 /*121*/, streamNumber, unordered, ppid);
1018 if (written > 0)
1019 bb.position(pos + written);
1020 return written;
1021 } finally {
1022 NIO_ACCESS.releaseSession(bb);
1023 }
1024 }
1025
1026 @Override
1027 public SctpChannel shutdown() throws IOException {
1028 synchronized(stateLock) {
1029 if (isShutdown)
1030 return this;
1031
1032 ensureSendOpen();
1033 SctpNet.shutdown(fdVal, -1);
1034 if (senderThread != 0)
1035 NativeThread.signal(senderThread);
1036 isShutdown = true;
1037 }
1038 return this;
1039 }
1040
1041 @Override
1042 public Set<SocketAddress> getAllLocalAddresses()
1043 throws IOException {
1044 synchronized (stateLock) {
1045 if (!isOpen())
1046 throw new ClosedChannelException();
1047 if (!isBound())
1048 return Collections.emptySet();
1049
1050 return SctpNet.getLocalAddresses(fdVal);
1051 }
1052 }
1053
1054 @Override
1055 public Set<SocketAddress> getRemoteAddresses()
|
68 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
69 import static sun.nio.ch.sctp.ResultContainer.SEND_FAILED;
70 import static sun.nio.ch.sctp.ResultContainer.ASSOCIATION_CHANGED;
71 import static sun.nio.ch.sctp.ResultContainer.PEER_ADDRESS_CHANGED;
72 import static sun.nio.ch.sctp.ResultContainer.SHUTDOWN;
73
74 /**
75 * An implementation of an SctpChannel
76 */
77 public class SctpChannelImpl extends SctpChannel
78 implements SelChImpl
79 {
80
81 private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess();
82
83 private final FileDescriptor fd;
84
85 private final int fdVal;
86
87 /* Native threads doing send and receive, for signalling */
88 private volatile NativeThread receiverThread;
89 private volatile NativeThread senderThread;
90
91 /* Lock held by current receiving or connecting thread */
92 private final Object receiveLock = new Object();
93
94 /* Lock held by current sending or connecting thread */
95 private final Object sendLock = new Object();
96
97 private final ThreadLocal<Boolean> receiveInvoked =
98 new ThreadLocal<>() {
99 @Override protected Boolean initialValue() {
100 return Boolean.FALSE;
101 }
102 };
103
104 /* Lock held by any thread that modifies the state fields declared below
105 DO NOT invoke a blocking I/O operation while holding this lock! */
106 private final Object stateLock = new Object();
107
108 private enum ChannelState {
109 UNINITIALIZED,
309 if (!isConnected())
310 throw new NotYetConnectedException();
311 else
312 return true;
313 }
314 }
315
316 private void ensureSendOpen() throws ClosedChannelException {
317 synchronized (stateLock) {
318 if (!isOpen())
319 throw new ClosedChannelException();
320 if (isShutdown)
321 throw new ClosedChannelException();
322 if (!isConnected())
323 throw new NotYetConnectedException();
324 }
325 }
326
327 private void receiverCleanup() throws IOException {
328 synchronized (stateLock) {
329 receiverThread = null;
330 if (state == ChannelState.KILLPENDING)
331 kill();
332 }
333 }
334
335 private void senderCleanup() throws IOException {
336 synchronized (stateLock) {
337 senderThread = null;
338 if (state == ChannelState.KILLPENDING)
339 kill();
340 }
341 }
342
343 @Override
344 public Association association() throws ClosedChannelException {
345 synchronized (stateLock) {
346 if (!isOpen())
347 throw new ClosedChannelException();
348 if (!isConnected())
349 return null;
350
351 return association;
352 }
353 }
354
355 @Override
356 public boolean connect(SocketAddress endpoint) throws IOException {
357 synchronized (receiveLock) {
467 try {
468 try {
469 begin();
470 synchronized (blockingLock()) {
471 synchronized (stateLock) {
472 if (!isOpen()) {
473 return false;
474 }
475 receiverThread = NativeThread.current();
476 }
477 if (!isBlocking()) {
478 connected = Net.pollConnect(fd, 0);
479 } else {
480 do {
481 connected = Net.pollConnect(fd, -1);
482 } while (!connected && isOpen());
483 }
484 }
485 } finally {
486 synchronized (stateLock) {
487 receiverThread = null;
488 if (state == ChannelState.KILLPENDING) {
489 kill();
490 connected = false;
491 }
492 }
493 end(connected);
494 }
495 } catch (IOException x) {
496 /* If an exception was thrown, close the channel after
497 * invoking end() so as to avoid bogus
498 * AsynchronousCloseExceptions */
499 close();
500 throw x;
501 }
502
503 if (connected) {
504 synchronized (stateLock) {
505 state = ChannelState.CONNECTED;
506 if (!isBound()) {
507 InetSocketAddress boundIsa =
524
525 return true;
526 }
527 }
528 }
529 }
530 return false;
531 }
532
/**
 * Applies the requested blocking mode to this channel's file descriptor
 * by delegating to {@code IOUtil.configureBlocking}.
 *
 * @param block the blocking-mode flag, passed through unchanged
 * @throws IOException declared by this method; presumably raised by the
 *         delegate on failure (not visible here, confirm in IOUtil)
 */
533 @Override
534 protected void implConfigureBlocking(boolean block) throws IOException {
535 IOUtil.configureBlocking(fd, block);
536 }
537
538 @Override
539 public void implCloseSelectableChannel() throws IOException {
540 synchronized (stateLock) {
541 if (state != ChannelState.KILLED)
542 SctpNet.preClose(fdVal);
543
544 if (NativeThread.isNativeThread(receiverThread))
545 receiverThread.signal();
546
547 if (NativeThread.isNativeThread(senderThread))
548 senderThread.signal();
549
550 if (!isRegistered())
551 kill();
552 }
553 }
554
/**
 * Returns the file descriptor held in this channel's {@code fd} field.
 *
 * @return the {@code fd} field
 */
555 @Override
556 public FileDescriptor getFD() {
557 return fd;
558 }
559
/**
 * Returns the value of this channel's {@code fdVal} field.
 * NOTE(review): presumably the integer form of {@code fd}; confirm where
 * the field is assigned.
 *
 * @return the {@code fdVal} field
 */
560 @Override
561 public int getFDVal() {
562 return fdVal;
563 }
564
565 /**
566 * Translates native poll revent ops into a ready operation ops
567 */
568 private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
627 newOps |= Net.POLLOUT;
628 if ((ops & SelectionKey.OP_CONNECT) != 0)
629 newOps |= Net.POLLCONN;
630 return newOps;
631 }
632
633 @Override
634 public void kill() throws IOException {
635 synchronized (stateLock) {
636 if (state == ChannelState.KILLED)
637 return;
638 if (state == ChannelState.UNINITIALIZED) {
639 state = ChannelState.KILLED;
640 SctpNet.close(fdVal);
641 return;
642 }
643 assert !isOpen() && !isRegistered();
644
645 /* Postpone the kill if there is a waiting reader
646 * or writer thread. */
647 if (receiverThread == null && senderThread == null) {
648 state = ChannelState.KILLED;
649 SctpNet.close(fdVal);
650 } else {
651 state = ChannelState.KILLPENDING;
652 }
653 }
654 }
655
656 @Override
657 public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
658 throws IOException {
659 if (name == null)
660 throw new NullPointerException();
661 if (!supportedOptions().contains(name))
662 throw new UnsupportedOperationException("'" + name + "' not supported");
663
664 synchronized (stateLock) {
665 if (!isOpen())
666 throw new ClosedChannelException();
667
1014 NIO_ACCESS.acquireSession(bb);
1015 try {
1016 int written = send0(fd, NIO_ACCESS.getBufferAddress(bb) + pos, rem, addr,
1017 port, -1 /*121*/, streamNumber, unordered, ppid);
1018 if (written > 0)
1019 bb.position(pos + written);
1020 return written;
1021 } finally {
1022 NIO_ACCESS.releaseSession(bb);
1023 }
1024 }
1025
1026 @Override
1027 public SctpChannel shutdown() throws IOException {
1028 synchronized(stateLock) {
1029 if (isShutdown)
1030 return this;
1031
1032 ensureSendOpen();
1033 SctpNet.shutdown(fdVal, -1);
1034 if (NativeThread.isNativeThread(senderThread))
1035 senderThread.signal();
1036 isShutdown = true;
1037 }
1038 return this;
1039 }
1040
1041 @Override
1042 public Set<SocketAddress> getAllLocalAddresses()
1043 throws IOException {
1044 synchronized (stateLock) {
1045 if (!isOpen())
1046 throw new ClosedChannelException();
1047 if (!isBound())
1048 return Collections.emptySet();
1049
1050 return SctpNet.getLocalAddresses(fdVal);
1051 }
1052 }
1053
1054 @Override
1055 public Set<SocketAddress> getRemoteAddresses()
|