117
118 // Lock held by current reading or connecting thread
119 private final ReentrantLock readLock = new ReentrantLock();
120
121 // Lock held by current writing or connecting thread
122 private final ReentrantLock writeLock = new ReentrantLock();
123
124 // Lock held by any thread that modifies the state fields declared below
125 // DO NOT invoke a blocking I/O operation while holding this lock!
126 private final Object stateLock = new Object();
127
128 // -- The following fields are protected by stateLock
129
130 // State (does not necessarily increase monotonically)
131 private static final int ST_UNCONNECTED = 0;
132 private static final int ST_CONNECTED = 1;
133 private static final int ST_CLOSING = 2; // tryClose advances this to ST_CLOSED once the socket can be closed
134 private static final int ST_CLOSED = 3;
135 private int state;
136
137 // IDs of native threads doing reads and writes, for signalling.
// Set by beginRead/beginWrite for blocking operations only and reset to 0 by
// endRead/endWrite; tryClose treats 0 as "no such operation in progress".
138 private long readerThread;
139 private long writerThread;
140
141 // Local and remote (connected) address
142 private InetSocketAddress localAddress;
143 private InetSocketAddress remoteAddress;
144
145 // Local address prior to connecting
146 private InetSocketAddress initialLocalAddress;
147
148 // Socket adaptor, created lazily
// SOCKET is a VarHandle onto the volatile "socket" field declared below
// (presumably used to install the lazily created adaptor atomically - confirm at the use site)
149 private static final VarHandle SOCKET = MhUtil.findVarHandle(
150 MethodHandles.lookup(), "socket", DatagramSocket.class);
151 private volatile DatagramSocket socket;
152
153 // Multicast support
154 private MembershipRegistry registry;
155
156 // set true when socket is bound and SO_REUSEADDR is emulated
157 private boolean reuseAddressEmulated;
158
159 // set true/false when socket is already bound and SO_REUSEADDR is emulated
506 * @throws ClosedChannelException if the channel is closed
507 * @throws NotYetConnectedException if mustBeConnected and not connected
508 * @throws IOException if socket not bound and cannot be bound
509 */
510 private SocketAddress beginRead(boolean blocking, boolean mustBeConnected)
511 throws IOException
512 {
513 if (blocking && interruptible) {
514 // set hook for Thread.interrupt (blocking reads on an interruptible channel only); endRead removes it
515 begin();
516 }
517 SocketAddress remote; // remoteAddress as read under stateLock; returned to the caller
518 synchronized (stateLock) {
519 ensureOpen();
520 remote = remoteAddress;
521 if ((remote == null) && mustBeConnected)
522 throw new NotYetConnectedException();
523 if (localAddress == null)
524 bindInternal(null); // no local address yet, so bind before reading (see "@throws IOException" above)
525 if (blocking)
526 readerThread = NativeThread.current(); // record the reading thread for signalling; endRead resets it to 0
527 }
528 return remote;
529 }
530
531 /**
532 * Marks the end of a read operation that may have blocked.
533 *
 * Does nothing when {@code blocking} is false. Otherwise the reader recorded
 * in {@code readerThread} is reset under {@code stateLock}, a deferred close
 * is attempted via {@code tryFinishClose} if the channel is in
 * {@code ST_CLOSING}, and the Thread.interrupt hook is removed when the
 * channel is interruptible.
 *
 * @param blocking whether the read was a blocking one (presumably the same
 *        value that was passed to beginRead - confirm at the call sites)
 * @param completed whether the read completed; passed on to {@code end} and
 *        used to decide whether an asynchronous close must be reported
534 * @throws AsynchronousCloseException if the channel was closed asynchronously
535 */
536 private void endRead(boolean blocking, boolean completed)
537 throws AsynchronousCloseException
538 {
539 if (blocking) {
540 synchronized (stateLock) {
541 readerThread = 0; // 0 == no reader; tryClose checks for this value
542 if (state == ST_CLOSING) {
543 tryFinishClose(); // invoked with stateLock held, which tryClose asserts
544 }
545 }
546 if (interruptible) {
547 // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
548 end(completed);
549 } else if (!completed && !isOpen()) {
550 throw new AsynchronousCloseException(); // no interrupt hook in this mode: an incomplete read on a channel that is no longer open is reported here
551 }
552 }
553 }
554
555 @Override
556 public SocketAddress receive(ByteBuffer dst) throws IOException {
557 if (dst.isReadOnly())
558 throw new IllegalArgumentException("Read-only buffer");
559 readLock.lock();
560 try {
561 ensureOpen();
1013 * @throws ClosedChannelException if the channel is closed
1014 * @throws NotYetConnectedException if mustBeConnected and not connected
1015 * @throws IOException if socket not bound and cannot be bound
1016 */
1017 private SocketAddress beginWrite(boolean blocking, boolean mustBeConnected)
1018 throws IOException
1019 {
1020 if (blocking && interruptible) {
1021 // set hook for Thread.interrupt (blocking writes on an interruptible channel only); endWrite removes it
1022 begin();
1023 }
1024 SocketAddress remote; // remoteAddress as read under stateLock; returned to the caller
1025 synchronized (stateLock) {
1026 ensureOpen();
1027 remote = remoteAddress;
1028 if ((remote == null) && mustBeConnected)
1029 throw new NotYetConnectedException();
1030 if (localAddress == null)
1031 bindInternal(null); // no local address yet, so bind before writing (see "@throws IOException" above)
1032 if (blocking)
1033 writerThread = NativeThread.current(); // record the writing thread for signalling; endWrite resets it to 0
1034 }
1035 return remote;
1036 }
1037
1038 /**
1039 * Marks the end of a write operation that may have blocked.
1040 *
 * Does nothing when {@code blocking} is false. Otherwise the writer recorded
 * in {@code writerThread} is reset under {@code stateLock}, a deferred close
 * is attempted via {@code tryFinishClose} if the channel is in
 * {@code ST_CLOSING}, and the Thread.interrupt hook is removed when the
 * channel is interruptible.
 *
 * @param blocking whether the write was a blocking one (presumably the same
 *        value that was passed to beginWrite - confirm at the call sites)
 * @param completed whether the write completed; passed on to {@code end} and
 *        used to decide whether an asynchronous close must be reported
1041 * @throws AsynchronousCloseException if the channel was closed asynchronously
1042 */
1043 private void endWrite(boolean blocking, boolean completed)
1044 throws AsynchronousCloseException
1045 {
1046 if (blocking) {
1047 synchronized (stateLock) {
1048 writerThread = 0; // 0 == no writer; tryClose checks for this value
1049 if (state == ST_CLOSING) {
1050 tryFinishClose(); // invoked with stateLock held, which tryClose asserts
1051 }
1052 }
1053
1054 if (interruptible) {
1055 // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
1056 end(completed);
1057 } else if (!completed && !isOpen()) {
1058 throw new AsynchronousCloseException(); // no interrupt hook in this mode: an incomplete write on a channel that is no longer open is reported here
1059 }
1060 }
1061 }
1062
1063 @Override
1064 public int write(ByteBuffer buf) throws IOException {
1065 Objects.requireNonNull(buf);
1066
1067 writeLock.lock();
1068 try {
1697 Net.inet6AsByteArray(source));
1698 } else {
1699 MembershipKeyImpl.Type4 key4 =
1700 (MembershipKeyImpl.Type4)key;
1701 Net.unblock4(fd, key4.groupAddress(), key4.interfaceAddress(),
1702 Net.inet4AsInt(source));
1703 }
1704 } catch (IOException ioe) {
1705 // should not happen
1706 throw new AssertionError(ioe);
1707 }
1708 }
1709 }
1710
1711 /**
1712 * Closes the socket if there are no I/O operations in progress and the
1713 * channel is not registered with a Selector.
 *
 * The caller must hold {@code stateLock} and the state must be
 * {@code ST_CLOSING}; both are asserted on entry. When the socket can be
 * closed, the state is set to {@code ST_CLOSED} before the cleaner runs.
 *
 * @return true if the state was moved to ST_CLOSED and the cleaner ran
 *         without throwing; false if a reader or writer is still recorded
 *         or the channel is still registered
 * @throws IOException the cause of an UncheckedIOException thrown by the cleaner
1714 */
1715 private boolean tryClose() throws IOException {
1716 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
1717 if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) {
1718 state = ST_CLOSED;
1719 try {
1720 // close socket
1721 cleaner.clean();
1722 } catch (UncheckedIOException ioe) {
1723 throw ioe.getCause(); // unwrap so that callers see the underlying IOException
1724 }
1725 return true;
1726 } else {
1727 return false;
1728 }
1729 }
1730
1731 /**
1732 * Invokes tryClose to attempt to close the socket.
1733 *
1734 * This method is used for deferred closing by I/O and Selector operations.
1735 */
1736 private void tryFinishClose() {
1737 try {
|
117
118 // Lock held by current reading or connecting thread
119 private final ReentrantLock readLock = new ReentrantLock();
120
121 // Lock held by current writing or connecting thread
122 private final ReentrantLock writeLock = new ReentrantLock();
123
124 // Lock held by any thread that modifies the state fields declared below
125 // DO NOT invoke a blocking I/O operation while holding this lock!
126 private final Object stateLock = new Object();
127
128 // -- The following fields are protected by stateLock
129
130 // State (does not necessarily increase monotonically)
131 private static final int ST_UNCONNECTED = 0;
132 private static final int ST_CONNECTED = 1;
133 private static final int ST_CLOSING = 2; // tryClose advances this to ST_CLOSED once the socket can be closed
134 private static final int ST_CLOSED = 3;
135 private int state;
136
137 // Threads doing reads and writes, for signalling.
// Set by beginRead/beginWrite for blocking operations only and reset to null by
// endRead/endWrite; tryClose treats null as "no such operation in progress".
138 private Thread readerThread;
139 private Thread writerThread;
140
141 // Local and remote (connected) address
142 private InetSocketAddress localAddress;
143 private InetSocketAddress remoteAddress;
144
145 // Local address prior to connecting
146 private InetSocketAddress initialLocalAddress;
147
148 // Socket adaptor, created lazily
// SOCKET is a VarHandle onto the volatile "socket" field declared below
// (presumably used to install the lazily created adaptor atomically - confirm at the use site)
149 private static final VarHandle SOCKET = MhUtil.findVarHandle(
150 MethodHandles.lookup(), "socket", DatagramSocket.class);
151 private volatile DatagramSocket socket;
152
153 // Multicast support
154 private MembershipRegistry registry;
155
156 // set true when socket is bound and SO_REUSEADDR is emulated
157 private boolean reuseAddressEmulated;
158
159 // set true/false when socket is already bound and SO_REUSEADDR is emulated
506 * @throws ClosedChannelException if the channel is closed
507 * @throws NotYetConnectedException if mustBeConnected and not connected
508 * @throws IOException if socket not bound and cannot be bound
509 */
510 private SocketAddress beginRead(boolean blocking, boolean mustBeConnected)
511 throws IOException
512 {
513 if (blocking && interruptible) {
514 // set hook for Thread.interrupt (blocking reads on an interruptible channel only); endRead removes it
515 begin();
516 }
517 SocketAddress remote; // remoteAddress as read under stateLock; returned to the caller
518 synchronized (stateLock) {
519 ensureOpen();
520 remote = remoteAddress;
521 if ((remote == null) && mustBeConnected)
522 throw new NotYetConnectedException();
523 if (localAddress == null)
524 bindInternal(null); // no local address yet, so bind before reading (see "@throws IOException" above)
525 if (blocking)
526 readerThread = NativeThread.threadToSignal(); // record the thread to signal for this read; endRead resets it to null
527 }
528 return remote;
529 }
530
531 /**
532 * Marks the end of a read operation that may have blocked.
533 *
 * Does nothing when {@code blocking} is false. Otherwise the reader recorded
 * in {@code readerThread} is reset under {@code stateLock}, a deferred close
 * is attempted via {@code tryFinishClose} if the channel is in
 * {@code ST_CLOSING}, and the Thread.interrupt hook is removed when the
 * channel is interruptible.
 *
 * @param blocking whether the read was a blocking one (presumably the same
 *        value that was passed to beginRead - confirm at the call sites)
 * @param completed whether the read completed; passed on to {@code end} and
 *        used to decide whether an asynchronous close must be reported
534 * @throws AsynchronousCloseException if the channel was closed asynchronously
535 */
536 private void endRead(boolean blocking, boolean completed)
537 throws AsynchronousCloseException
538 {
539 if (blocking) {
540 synchronized (stateLock) {
541 readerThread = null; // null == no reader; tryClose checks for this value
542 if (state == ST_CLOSING) {
543 tryFinishClose(); // invoked with stateLock held, which tryClose asserts
544 }
545 }
546 if (interruptible) {
547 // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
548 end(completed);
549 } else if (!completed && !isOpen()) {
550 throw new AsynchronousCloseException(); // no interrupt hook in this mode: an incomplete read on a channel that is no longer open is reported here
551 }
552 }
553 }
554
555 @Override
556 public SocketAddress receive(ByteBuffer dst) throws IOException {
557 if (dst.isReadOnly())
558 throw new IllegalArgumentException("Read-only buffer");
559 readLock.lock();
560 try {
561 ensureOpen();
1013 * @throws ClosedChannelException if the channel is closed
1014 * @throws NotYetConnectedException if mustBeConnected and not connected
1015 * @throws IOException if socket not bound and cannot be bound
1016 */
1017 private SocketAddress beginWrite(boolean blocking, boolean mustBeConnected)
1018 throws IOException
1019 {
1020 if (blocking && interruptible) {
1021 // set hook for Thread.interrupt (blocking writes on an interruptible channel only); endWrite removes it
1022 begin();
1023 }
1024 SocketAddress remote; // remoteAddress as read under stateLock; returned to the caller
1025 synchronized (stateLock) {
1026 ensureOpen();
1027 remote = remoteAddress;
1028 if ((remote == null) && mustBeConnected)
1029 throw new NotYetConnectedException();
1030 if (localAddress == null)
1031 bindInternal(null); // no local address yet, so bind before writing (see "@throws IOException" above)
1032 if (blocking)
1033 writerThread = NativeThread.threadToSignal(); // record the thread to signal for this write; endWrite resets it to null
1034 }
1035 return remote;
1036 }
1037
1038 /**
1039 * Marks the end of a write operation that may have blocked.
1040 *
 * Does nothing when {@code blocking} is false. Otherwise the writer recorded
 * in {@code writerThread} is reset under {@code stateLock}, a deferred close
 * is attempted via {@code tryFinishClose} if the channel is in
 * {@code ST_CLOSING}, and the Thread.interrupt hook is removed when the
 * channel is interruptible.
 *
 * @param blocking whether the write was a blocking one (presumably the same
 *        value that was passed to beginWrite - confirm at the call sites)
 * @param completed whether the write completed; passed on to {@code end} and
 *        used to decide whether an asynchronous close must be reported
1041 * @throws AsynchronousCloseException if the channel was closed asynchronously
1042 */
1043 private void endWrite(boolean blocking, boolean completed)
1044 throws AsynchronousCloseException
1045 {
1046 if (blocking) {
1047 synchronized (stateLock) {
1048 writerThread = null; // null == no writer; tryClose checks for this value
1049 if (state == ST_CLOSING) {
1050 tryFinishClose(); // invoked with stateLock held, which tryClose asserts
1051 }
1052 }
1053
1054 if (interruptible) {
1055 // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
1056 end(completed);
1057 } else if (!completed && !isOpen()) {
1058 throw new AsynchronousCloseException(); // no interrupt hook in this mode: an incomplete write on a channel that is no longer open is reported here
1059 }
1060 }
1061 }
1062
1063 @Override
1064 public int write(ByteBuffer buf) throws IOException {
1065 Objects.requireNonNull(buf);
1066
1067 writeLock.lock();
1068 try {
1697 Net.inet6AsByteArray(source));
1698 } else {
1699 MembershipKeyImpl.Type4 key4 =
1700 (MembershipKeyImpl.Type4)key;
1701 Net.unblock4(fd, key4.groupAddress(), key4.interfaceAddress(),
1702 Net.inet4AsInt(source));
1703 }
1704 } catch (IOException ioe) {
1705 // should not happen
1706 throw new AssertionError(ioe);
1707 }
1708 }
1709 }
1710
1711 /**
1712 * Closes the socket if there are no I/O operations in progress and the
1713 * channel is not registered with a Selector.
 *
 * The caller must hold {@code stateLock} and the state must be
 * {@code ST_CLOSING}; both are asserted on entry. When the socket can be
 * closed, the state is set to {@code ST_CLOSED} before the cleaner runs.
 *
 * @return true if the state was moved to ST_CLOSED and the cleaner ran
 *         without throwing; false if a reader or writer is still recorded
 *         or the channel is still registered
 * @throws IOException the cause of an UncheckedIOException thrown by the cleaner
1714 */
1715 private boolean tryClose() throws IOException {
1716 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
1717 if ((readerThread == null) && (writerThread == null) && !isRegistered()) {
1718 state = ST_CLOSED;
1719 try {
1720 // close socket
1721 cleaner.clean();
1722 } catch (UncheckedIOException ioe) {
1723 throw ioe.getCause(); // unwrap so that callers see the underlying IOException
1724 }
1725 return true;
1726 } else {
1727 return false;
1728 }
1729 }
1730
1731 /**
1732 * Invokes tryClose to attempt to close the socket.
1733 *
1734 * This method is used for deferred closing by I/O and Selector operations.
1735 */
1736 private void tryFinishClose() {
1737 try {
|