< prev index next >

src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpChannelImpl.java

Print this page

  68 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
  69 import static sun.nio.ch.sctp.ResultContainer.SEND_FAILED;
  70 import static sun.nio.ch.sctp.ResultContainer.ASSOCIATION_CHANGED;
  71 import static sun.nio.ch.sctp.ResultContainer.PEER_ADDRESS_CHANGED;
  72 import static sun.nio.ch.sctp.ResultContainer.SHUTDOWN;
  73 
  74 /**
  75  * An implementation of an SctpChannel
  76  */
  77 public class SctpChannelImpl extends SctpChannel
  78     implements SelChImpl
  79 {
  80 
  81     private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess();
  82 
  83     private final FileDescriptor fd;
  84 
  85     private final int fdVal;
  86 
  87     /* IDs of native threads doing send and receive, for signalling */
  88     private volatile long receiverThread;
  89     private volatile long senderThread;
  90 
  91     /* Lock held by current receiving or connecting thread */
  92     private final Object receiveLock = new Object();
  93 
  94     /* Lock held by current sending or connecting thread */
  95     private final Object sendLock = new Object();
  96 
  97     private final ThreadLocal<Boolean> receiveInvoked =
  98         new ThreadLocal<>() {
  99              @Override protected Boolean initialValue() {
 100                  return Boolean.FALSE;
 101             }
 102     };
 103 
 104     /* Lock held by any thread that modifies the state fields declared below
 105        DO NOT invoke a blocking I/O operation while holding this lock! */
 106     private final Object stateLock = new Object();
 107 
 108     private enum ChannelState {
 109         UNINITIALIZED,

 309             if (!isConnected())
 310                 throw new NotYetConnectedException();
 311             else
 312                 return true;
 313         }
 314     }
 315 
 316     private void ensureSendOpen() throws ClosedChannelException {
 317         synchronized (stateLock) {
 318             if (!isOpen())
 319                 throw new ClosedChannelException();
 320             if (isShutdown)
 321                 throw new ClosedChannelException();
 322             if (!isConnected())
 323                 throw new NotYetConnectedException();
 324         }
 325     }
 326 
 327     private void receiverCleanup() throws IOException {
 328         synchronized (stateLock) {
 329             receiverThread = 0;
 330             if (state == ChannelState.KILLPENDING)
 331                 kill();
 332         }
 333     }
 334 
 335     private void senderCleanup() throws IOException {
 336         synchronized (stateLock) {
 337             senderThread = 0;
 338             if (state == ChannelState.KILLPENDING)
 339                 kill();
 340         }
 341     }
 342 
 343     @Override
 344     public Association association() throws ClosedChannelException {
 345         synchronized (stateLock) {
 346             if (!isOpen())
 347                 throw new ClosedChannelException();
 348             if (!isConnected())
 349                 return null;
 350 
 351             return association;
 352         }
 353     }
 354 
 355     @Override
 356     public boolean connect(SocketAddress endpoint) throws IOException {
 357         synchronized (receiveLock) {

 467                 try {
 468                     try {
 469                         begin();
 470                         synchronized (blockingLock()) {
 471                             synchronized (stateLock) {
 472                                 if (!isOpen()) {
 473                                     return false;
 474                                 }
 475                                 receiverThread = NativeThread.current();
 476                             }
 477                             if (!isBlocking()) {
 478                                 connected = Net.pollConnect(fd, 0);
 479                             } else {
 480                                 do {
 481                                     connected = Net.pollConnect(fd, -1);
 482                                 } while (!connected && isOpen());
 483                             }
 484                         }
 485                     } finally {
 486                         synchronized (stateLock) {
 487                             receiverThread = 0;
 488                             if (state == ChannelState.KILLPENDING) {
 489                                 kill();
 490                                 connected = false;
 491                             }
 492                         }
 493                         end(connected);
 494                     }
 495                 } catch (IOException x) {
 496                     /* If an exception was thrown, close the channel after
 497                      * invoking end() so as to avoid bogus
 498                      * AsynchronousCloseExceptions */
 499                     close();
 500                     throw x;
 501                 }
 502 
 503                 if (connected) {
 504                     synchronized (stateLock) {
 505                         state = ChannelState.CONNECTED;
 506                         if (!isBound()) {
 507                             InetSocketAddress boundIsa =

 524 
 525                         return true;
 526                     }
 527                 }
 528             }
 529         }
 530         return false;
 531     }
 532 
 533     @Override
 534     protected void implConfigureBlocking(boolean block) throws IOException {
 535         IOUtil.configureBlocking(fd, block);
 536     }
 537 
 538     @Override
 539     public void implCloseSelectableChannel() throws IOException {
 540         synchronized (stateLock) {
 541             if (state != ChannelState.KILLED)
 542                 SctpNet.preClose(fdVal);
 543 
 544             if (receiverThread != 0)
 545                 NativeThread.signal(receiverThread);
 546 
 547             if (senderThread != 0)
 548                 NativeThread.signal(senderThread);
 549 
 550             if (!isRegistered())
 551                 kill();
 552         }
 553     }
 554 
 555     @Override
 556     public FileDescriptor getFD() {
 557         return fd;
 558     }
 559 
 560     @Override
 561     public int getFDVal() {
 562         return fdVal;
 563     }
 564 
 565     /**
 566      * Translates native poll revent ops into a ready operation ops
 567      */
 568     private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {

 627             newOps |= Net.POLLOUT;
 628         if ((ops & SelectionKey.OP_CONNECT) != 0)
 629             newOps |= Net.POLLCONN;
 630         return newOps;
 631     }
 632 
 633     @Override
 634     public void kill() throws IOException {
 635         synchronized (stateLock) {
 636             if (state == ChannelState.KILLED)
 637                 return;
 638             if (state == ChannelState.UNINITIALIZED) {
 639                 state = ChannelState.KILLED;
 640                 SctpNet.close(fdVal);
 641                 return;
 642             }
 643             assert !isOpen() && !isRegistered();
 644 
 645             /* Postpone the kill if there is a waiting reader
 646              * or writer thread. */
 647             if (receiverThread == 0 && senderThread == 0) {
 648                 state = ChannelState.KILLED;
 649                 SctpNet.close(fdVal);
 650             } else {
 651                 state = ChannelState.KILLPENDING;
 652             }
 653         }
 654     }
 655 
 656     @Override
 657     public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
 658             throws IOException {
 659         if (name == null)
 660             throw new NullPointerException();
 661         if (!supportedOptions().contains(name))
 662             throw new UnsupportedOperationException("'" + name + "' not supported");
 663 
 664         synchronized (stateLock) {
 665             if (!isOpen())
 666                 throw new ClosedChannelException();
 667 

1014         NIO_ACCESS.acquireSession(bb);
1015         try {
1016             int written = send0(fd, NIO_ACCESS.getBufferAddress(bb) + pos, rem, addr,
1017                     port, -1 /*121*/, streamNumber, unordered, ppid);
1018             if (written > 0)
1019                 bb.position(pos + written);
1020             return written;
1021         } finally {
1022             NIO_ACCESS.releaseSession(bb);
1023         }
1024     }
1025 
1026     @Override
1027     public SctpChannel shutdown() throws IOException {
1028         synchronized(stateLock) {
1029             if (isShutdown)
1030                 return this;
1031 
1032             ensureSendOpen();
1033             SctpNet.shutdown(fdVal, -1);
1034             if (senderThread != 0)
1035                 NativeThread.signal(senderThread);
1036             isShutdown = true;
1037         }
1038         return this;
1039     }
1040 
1041     @Override
1042     public Set<SocketAddress> getAllLocalAddresses()
1043             throws IOException {
1044         synchronized (stateLock) {
1045             if (!isOpen())
1046                 throw new ClosedChannelException();
1047             if (!isBound())
1048                 return Collections.emptySet();
1049 
1050             return SctpNet.getLocalAddresses(fdVal);
1051         }
1052     }
1053 
1054     @Override
1055     public Set<SocketAddress> getRemoteAddresses()

  68 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
  69 import static sun.nio.ch.sctp.ResultContainer.SEND_FAILED;
  70 import static sun.nio.ch.sctp.ResultContainer.ASSOCIATION_CHANGED;
  71 import static sun.nio.ch.sctp.ResultContainer.PEER_ADDRESS_CHANGED;
  72 import static sun.nio.ch.sctp.ResultContainer.SHUTDOWN;
  73 
  74 /**
  75  * An implementation of an SctpChannel
  76  */
  77 public class SctpChannelImpl extends SctpChannel
  78     implements SelChImpl
  79 {
  80 
  81     private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess();
  82 
  83     private final FileDescriptor fd;
  84 
  85     private final int fdVal;
  86 
  87     /* IDs of native threads doing send and receive, for signalling */
  88     private volatile NativeThread receiverThread;
  89     private volatile NativeThread senderThread;
  90 
  91     /* Lock held by current receiving or connecting thread */
  92     private final Object receiveLock = new Object();
  93 
  94     /* Lock held by current sending or connecting thread */
  95     private final Object sendLock = new Object();
  96 
  97     private final ThreadLocal<Boolean> receiveInvoked =
  98         new ThreadLocal<>() {
  99              @Override protected Boolean initialValue() {
 100                  return Boolean.FALSE;
 101             }
 102     };
 103 
 104     /* Lock held by any thread that modifies the state fields declared below
 105        DO NOT invoke a blocking I/O operation while holding this lock! */
 106     private final Object stateLock = new Object();
 107 
 108     private enum ChannelState {
 109         UNINITIALIZED,

 309             if (!isConnected())
 310                 throw new NotYetConnectedException();
 311             else
 312                 return true;
 313         }
 314     }
 315 
 316     private void ensureSendOpen() throws ClosedChannelException {
 317         synchronized (stateLock) {
 318             if (!isOpen())
 319                 throw new ClosedChannelException();
 320             if (isShutdown)
 321                 throw new ClosedChannelException();
 322             if (!isConnected())
 323                 throw new NotYetConnectedException();
 324         }
 325     }
 326 
 327     private void receiverCleanup() throws IOException {
 328         synchronized (stateLock) {
 329             receiverThread = null;
 330             if (state == ChannelState.KILLPENDING)
 331                 kill();
 332         }
 333     }
 334 
 335     private void senderCleanup() throws IOException {
 336         synchronized (stateLock) {
 337             senderThread = null;
 338             if (state == ChannelState.KILLPENDING)
 339                 kill();
 340         }
 341     }
 342 
 343     @Override
 344     public Association association() throws ClosedChannelException {
 345         synchronized (stateLock) {
 346             if (!isOpen())
 347                 throw new ClosedChannelException();
 348             if (!isConnected())
 349                 return null;
 350 
 351             return association;
 352         }
 353     }
 354 
 355     @Override
 356     public boolean connect(SocketAddress endpoint) throws IOException {
 357         synchronized (receiveLock) {

 467                 try {
 468                     try {
 469                         begin();
 470                         synchronized (blockingLock()) {
 471                             synchronized (stateLock) {
 472                                 if (!isOpen()) {
 473                                     return false;
 474                                 }
 475                                 receiverThread = NativeThread.current();
 476                             }
 477                             if (!isBlocking()) {
 478                                 connected = Net.pollConnect(fd, 0);
 479                             } else {
 480                                 do {
 481                                     connected = Net.pollConnect(fd, -1);
 482                                 } while (!connected && isOpen());
 483                             }
 484                         }
 485                     } finally {
 486                         synchronized (stateLock) {
 487                             receiverThread = null;
 488                             if (state == ChannelState.KILLPENDING) {
 489                                 kill();
 490                                 connected = false;
 491                             }
 492                         }
 493                         end(connected);
 494                     }
 495                 } catch (IOException x) {
 496                     /* If an exception was thrown, close the channel after
 497                      * invoking end() so as to avoid bogus
 498                      * AsynchronousCloseExceptions */
 499                     close();
 500                     throw x;
 501                 }
 502 
 503                 if (connected) {
 504                     synchronized (stateLock) {
 505                         state = ChannelState.CONNECTED;
 506                         if (!isBound()) {
 507                             InetSocketAddress boundIsa =

 524 
 525                         return true;
 526                     }
 527                 }
 528             }
 529         }
 530         return false;
 531     }
 532 
 533     @Override
 534     protected void implConfigureBlocking(boolean block) throws IOException {
 535         IOUtil.configureBlocking(fd, block);
 536     }
 537 
 538     @Override
 539     public void implCloseSelectableChannel() throws IOException {
 540         synchronized (stateLock) {
 541             if (state != ChannelState.KILLED)
 542                 SctpNet.preClose(fdVal);
 543 
 544             if (NativeThread.isNativeThread(receiverThread))
 545                 receiverThread.signal();
 546 
 547             if (NativeThread.isNativeThread(senderThread))
 548                 senderThread.signal();
 549 
 550             if (!isRegistered())
 551                 kill();
 552         }
 553     }
 554 
 555     @Override
 556     public FileDescriptor getFD() {
 557         return fd;
 558     }
 559 
 560     @Override
 561     public int getFDVal() {
 562         return fdVal;
 563     }
 564 
 565     /**
 566      * Translates native poll revent ops into a ready operation ops
 567      */
 568     private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {

 627             newOps |= Net.POLLOUT;
 628         if ((ops & SelectionKey.OP_CONNECT) != 0)
 629             newOps |= Net.POLLCONN;
 630         return newOps;
 631     }
 632 
 633     @Override
 634     public void kill() throws IOException {
 635         synchronized (stateLock) {
 636             if (state == ChannelState.KILLED)
 637                 return;
 638             if (state == ChannelState.UNINITIALIZED) {
 639                 state = ChannelState.KILLED;
 640                 SctpNet.close(fdVal);
 641                 return;
 642             }
 643             assert !isOpen() && !isRegistered();
 644 
 645             /* Postpone the kill if there is a waiting reader
 646              * or writer thread. */
 647             if (receiverThread == null && senderThread == null) {
 648                 state = ChannelState.KILLED;
 649                 SctpNet.close(fdVal);
 650             } else {
 651                 state = ChannelState.KILLPENDING;
 652             }
 653         }
 654     }
 655 
 656     @Override
 657     public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
 658             throws IOException {
 659         if (name == null)
 660             throw new NullPointerException();
 661         if (!supportedOptions().contains(name))
 662             throw new UnsupportedOperationException("'" + name + "' not supported");
 663 
 664         synchronized (stateLock) {
 665             if (!isOpen())
 666                 throw new ClosedChannelException();
 667 

1014         NIO_ACCESS.acquireSession(bb);
1015         try {
1016             int written = send0(fd, NIO_ACCESS.getBufferAddress(bb) + pos, rem, addr,
1017                     port, -1 /*121*/, streamNumber, unordered, ppid);
1018             if (written > 0)
1019                 bb.position(pos + written);
1020             return written;
1021         } finally {
1022             NIO_ACCESS.releaseSession(bb);
1023         }
1024     }
1025 
1026     @Override
1027     public SctpChannel shutdown() throws IOException {
1028         synchronized(stateLock) {
1029             if (isShutdown)
1030                 return this;
1031 
1032             ensureSendOpen();
1033             SctpNet.shutdown(fdVal, -1);
1034             if (NativeThread.isNativeThread(senderThread))
1035                 senderThread.signal();
1036             isShutdown = true;
1037         }
1038         return this;
1039     }
1040 
1041     @Override
1042     public Set<SocketAddress> getAllLocalAddresses()
1043             throws IOException {
1044         synchronized (stateLock) {
1045             if (!isOpen())
1046                 throw new ClosedChannelException();
1047             if (!isBound())
1048                 return Collections.emptySet();
1049 
1050             return SctpNet.getLocalAddresses(fdVal);
1051         }
1052     }
1053 
1054     @Override
1055     public Set<SocketAddress> getRemoteAddresses()
< prev index next >