97 // Input/Output closed
98 private volatile boolean isInputClosed;
99 private volatile boolean isOutputClosed;
100
101 // Connection reset protected by readLock
102 private boolean connectionReset;
103
104 // -- The following fields are protected by stateLock
105
106 // set true when exclusive binding is on and SO_REUSEADDR is emulated
107 private boolean isReuseAddress;
108
109 // State, increases monotonically
110 private static final int ST_UNCONNECTED = 0;
111 private static final int ST_CONNECTIONPENDING = 1;
112 private static final int ST_CONNECTED = 2;
113 private static final int ST_CLOSING = 3;
114 private static final int ST_CLOSED = 4;
115 private volatile int state; // need stateLock to change
116
117 // IDs of native threads doing reads and writes, for signalling
118 private long readerThread;
119 private long writerThread;
120
121 // Binding
122 private SocketAddress localAddress;
123 private SocketAddress remoteAddress;
124
125 // Socket adaptor, created on demand
126 private Socket socket;
127
128 // True if the channel's socket has been forced into non-blocking mode
129 // by a virtual thread. It cannot be reset. When the channel is in
130 // blocking mode and the channel's socket is in non-blocking mode then
131 // operations that don't complete immediately will poll the socket and
132 // preserve the semantics of blocking operations.
133 private volatile boolean forcedNonBlocking;
134
135 // -- End of fields protected by stateLock
136
/**
 * Creates a channel for the given provider, delegating to the two-argument
 * constructor with INET6 when IPv6 is available and INET otherwise.
 *
 * @param sp the selector provider passed through to the delegated constructor
 * @throws IOException if the delegated constructor fails
 */
SocketChannelImpl(SelectorProvider sp) throws IOException {
    this(sp, !Net.isIPv6Available() ? INET : INET6);
}
352 if (isUnixSocket()) {
353 return DefaultOptionsHolder.defaultUnixOptions;
354 } else {
355 return DefaultOptionsHolder.defaultInetOptions;
356 }
357 }
358
359 /**
360 * Marks the beginning of a read operation that might block.
361 *
362 * @throws ClosedChannelException if blocking and the channel is closed
363 */
364 private void beginRead(boolean blocking) throws ClosedChannelException {
365 if (blocking) {
366 // set hook for Thread.interrupt
367 begin();
368
369 synchronized (stateLock) {
370 ensureOpen();
371 // record thread so it can be signalled if needed
372 readerThread = NativeThread.current();
373 }
374 }
375 }
376
377 /**
378 * Marks the end of a read operation that may have blocked.
379 *
380 * @throws AsynchronousCloseException if the channel was closed due to this
381 * thread being interrupted on a blocking read operation.
382 */
383 private void endRead(boolean blocking, boolean completed)
384 throws AsynchronousCloseException
385 {
386 if (blocking) {
387 synchronized (stateLock) {
388 readerThread = 0;
389 if (state == ST_CLOSING) {
390 tryFinishClose();
391 }
392 }
393 // remove hook for Thread.interrupt
394 end(completed);
395 }
396 }
397
398 private void throwConnectionReset() throws SocketException {
399 throw new SocketException("Connection reset");
400 }
401
402 private int implRead(ByteBuffer buf) throws IOException {
403 Objects.requireNonNull(buf);
404
405 readLock.lock();
406 try {
407 ensureOpenAndConnected();
408 boolean blocking = isBlocking();
506 long nbytes = implRead(dsts, offset, length);
507 SocketReadEvent.offer(start, nbytes, remoteAddress(), 0);
508 return nbytes;
509 }
510
511 /**
512 * Marks the beginning of a write operation that might block.
513 *
514 * @throws ClosedChannelException if blocking and the channel is closed
515 */
516 private void beginWrite(boolean blocking) throws ClosedChannelException {
517 if (blocking) {
518 // set hook for Thread.interrupt
519 begin();
520
521 synchronized (stateLock) {
522 ensureOpen();
523 if (isOutputClosed)
524 throw new ClosedChannelException();
525 // record thread so it can be signalled if needed
526 writerThread = NativeThread.current();
527 }
528 }
529 }
530
531 /**
532 * Marks the end of a write operation that may have blocked.
533 *
534 * @throws AsynchronousCloseException if the channel was closed due to this
535 * thread being interrupted on a blocking write operation.
536 */
537 private void endWrite(boolean blocking, boolean completed)
538 throws AsynchronousCloseException
539 {
540 if (blocking) {
541 synchronized (stateLock) {
542 writerThread = 0;
543 if (state == ST_CLOSING) {
544 tryFinishClose();
545 }
546 }
547 // remove hook for Thread.interrupt
548 end(completed);
549 }
550 }
551
552 private int implWrite(ByteBuffer buf) throws IOException {
553 Objects.requireNonNull(buf);
554 writeLock.lock();
555 try {
556 ensureOpenAndConnected();
557 boolean blocking = isBlocking();
558 int n = 0;
559 try {
560 beginWrite(blocking);
561 configureSocketNonBlockingIfVirtualThread();
562 n = IOUtil.write(fd, buf, -1, nd);
657 }
658 return IOStatus.normalize(n);
659 } finally {
660 writeLock.unlock();
661 }
662 }
663
664 /**
665 * Marks the beginning of a transfer to this channel.
666 * @throws ClosedChannelException if channel is closed or the output is shutdown
667 * @throws NotYetConnectedException if open and not connected
668 */
669 void beforeTransferTo() throws ClosedChannelException {
670 boolean completed = false;
671 writeLock.lock();
672 try {
673 synchronized (stateLock) {
674 ensureOpenAndConnected();
675 if (isOutputClosed)
676 throw new ClosedChannelException();
677 writerThread = NativeThread.current();
678 completed = true;
679 }
680 } finally {
681 if (!completed) {
682 writeLock.unlock();
683 }
684 }
685 }
686
687 /**
688 * Marks the end of a transfer to this channel.
689 * @throws AsynchronousCloseException if not completed and the channel is closed
690 */
691 void afterTransferTo(boolean completed) throws AsynchronousCloseException {
692 synchronized (stateLock) {
693 writerThread = 0;
694 if (state == ST_CLOSING) {
695 tryFinishClose();
696 }
697 }
698 writeLock.unlock();
699 if (!completed && !isOpen()) {
700 throw new AsynchronousCloseException();
701 }
702 }
703
704 @Override
705 protected void implConfigureBlocking(boolean block) throws IOException {
706 readLock.lock();
707 try {
708 writeLock.lock();
709 try {
710 lockedConfigureBlocking(block);
711 } finally {
712 writeLock.unlock();
713 }
860 begin();
861 }
862 synchronized (stateLock) {
863 ensureOpen();
864 int state = this.state;
865 if (state == ST_CONNECTED)
866 throw new AlreadyConnectedException();
867 if (state == ST_CONNECTIONPENDING)
868 throw new ConnectionPendingException();
869 assert state == ST_UNCONNECTED;
870 this.state = ST_CONNECTIONPENDING;
871
872 if (isNetSocket() && (localAddress == null)) {
873 InetSocketAddress isa = (InetSocketAddress) sa;
874 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
875 }
876 remoteAddress = sa;
877
878 if (blocking) {
879 // record thread so it can be signalled if needed
880 readerThread = NativeThread.current();
881 }
882 }
883 }
884
885 /**
886 * Marks the end of a connect operation that may have blocked.
887 *
888 * @throws AsynchronousCloseException if the channel was closed due to this
889 * thread being interrupted on a blocking connect operation.
890 * @throws IOException if completed and unable to obtain the local address
891 */
892 private void endConnect(boolean blocking, boolean completed)
893 throws IOException
894 {
895 endRead(blocking, completed);
896
897 if (completed) {
898 synchronized (stateLock) {
899 if (state == ST_CONNECTIONPENDING) {
900 if (isUnixSocket()) {
979 }
980 }
981
982 /**
983 * Marks the beginning of a finishConnect operation that might block.
984 *
985 * @throws ClosedChannelException if the channel is closed
986 * @throws NoConnectionPendingException if no connection is pending
987 */
988 private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
989 if (blocking) {
990 // set hook for Thread.interrupt
991 begin();
992 }
993 synchronized (stateLock) {
994 ensureOpen();
995 if (state != ST_CONNECTIONPENDING)
996 throw new NoConnectionPendingException();
997 if (blocking) {
998 // record thread so it can be signalled if needed
999 readerThread = NativeThread.current();
1000 }
1001 }
1002 }
1003
1004 /**
1005 * Marks the end of a finishConnect operation that may have blocked.
1006 *
1007 * @throws AsynchronousCloseException if the channel was closed due to this
1008 * thread being interrupted on a blocking connect operation.
1009 * @throws IOException if completed and unable to obtain the local address
1010 */
1011 private void endFinishConnect(boolean blocking, boolean completed)
1012 throws IOException
1013 {
1014 endRead(blocking, completed);
1015
1016 if (completed) {
1017 synchronized (stateLock) {
1018 if (state == ST_CONNECTIONPENDING) {
1019 if (isUnixSocket()) {
1058 return connected;
1059 } finally {
1060 writeLock.unlock();
1061 }
1062 } finally {
1063 readLock.unlock();
1064 }
1065 } catch (IOException ioe) {
1066 // connect failed, close the channel
1067 close();
1068 throw Exceptions.ioException(ioe, remoteAddress);
1069 }
1070 }
1071
1072 /**
1073 * Closes the socket if there are no I/O operations in progress (or no I/O
1074 * operations tracked), and the channel is not registered with a Selector.
1075 */
1076 private boolean tryClose() throws IOException {
1077 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
1078 if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) {
1079 state = ST_CLOSED;
1080 nd.close(fd);
1081 return true;
1082 } else {
1083 return false;
1084 }
1085 }
1086
1087 /**
1088 * Invokes tryClose to attempt to close the socket.
1089 *
1090 * This method is used for deferred closing by I/O and Selector operations.
1091 */
1092 private void tryFinishClose() {
1093 try {
1094 tryClose();
1095 } catch (IOException ignore) { }
1096 }
1097
1098 /**
1201 // wait for any read/write operations to complete before trying to close
1202 readLock.lock();
1203 readLock.unlock();
1204 writeLock.lock();
1205 writeLock.unlock();
1206 synchronized (stateLock) {
1207 if (state == ST_CLOSING) {
1208 tryFinishClose();
1209 }
1210 }
1211 }
1212
1213 @Override
1214 public SocketChannel shutdownInput() throws IOException {
1215 synchronized (stateLock) {
1216 ensureOpen();
1217 if (!isConnected())
1218 throw new NotYetConnectedException();
1219 if (!isInputClosed) {
1220 Net.shutdown(fd, Net.SHUT_RD);
1221 long reader = readerThread;
1222 if (NativeThread.isVirtualThread(reader)) {
1223 Poller.stopPoll(fdVal, Net.POLLIN);
1224 } else if (NativeThread.isNativeThread(reader)) {
1225 NativeThread.signal(reader);
1226 }
1227 isInputClosed = true;
1228 }
1229 return this;
1230 }
1231 }
1232
1233 @Override
1234 public SocketChannel shutdownOutput() throws IOException {
1235 synchronized (stateLock) {
1236 ensureOpen();
1237 if (!isConnected())
1238 throw new NotYetConnectedException();
1239 if (!isOutputClosed) {
1240 Net.shutdown(fd, Net.SHUT_WR);
1241 long writer = writerThread;
1242 if (NativeThread.isVirtualThread(writer)) {
1243 Poller.stopPoll(fdVal, Net.POLLOUT);
1244 } else if (NativeThread.isNativeThread(writer)) {
1245 NativeThread.signal(writer);
1246 }
1247 isOutputClosed = true;
1248 }
1249 return this;
1250 }
1251 }
1252
1253 boolean isInputOpen() {
1254 return !isInputClosed;
1255 }
1256
1257 boolean isOutputOpen() {
1258 return !isOutputClosed;
1259 }
1260
1261 /**
1262 * Waits for a connection attempt to finish with a timeout
1263 * @throws SocketTimeoutException if the connect timeout elapses
1264 */
1265 private boolean finishTimedConnect(long nanos) throws IOException {
|
97 // Input/Output closed
98 private volatile boolean isInputClosed;
99 private volatile boolean isOutputClosed;
100
101 // Connection reset protected by readLock
102 private boolean connectionReset;
103
104 // -- The following fields are protected by stateLock
105
106 // set true when exclusive binding is on and SO_REUSEADDR is emulated
107 private boolean isReuseAddress;
108
109 // State, increases monotonically
110 private static final int ST_UNCONNECTED = 0;
111 private static final int ST_CONNECTIONPENDING = 1;
112 private static final int ST_CONNECTED = 2;
113 private static final int ST_CLOSING = 3;
114 private static final int ST_CLOSED = 4;
115 private volatile int state; // need stateLock to change
116
117 // Threads doing reads and writes, for signalling
118 private Thread readerThread;
119 private Thread writerThread;
120
121 // Binding
122 private SocketAddress localAddress;
123 private SocketAddress remoteAddress;
124
125 // Socket adaptor, created on demand
126 private Socket socket;
127
128 // True if the channel's socket has been forced into non-blocking mode
129 // by a virtual thread. It cannot be reset. When the channel is in
130 // blocking mode and the channel's socket is in non-blocking mode then
131 // operations that don't complete immediately will poll the socket and
132 // preserve the semantics of blocking operations.
133 private volatile boolean forcedNonBlocking;
134
135 // -- End of fields protected by stateLock
136
/**
 * Creates a channel for the given provider, delegating to the two-argument
 * constructor with INET6 when IPv6 is available and INET otherwise.
 *
 * @param sp the selector provider passed through to the delegated constructor
 * @throws IOException if the delegated constructor fails
 */
SocketChannelImpl(SelectorProvider sp) throws IOException {
    this(sp, !Net.isIPv6Available() ? INET : INET6);
}
352 if (isUnixSocket()) {
353 return DefaultOptionsHolder.defaultUnixOptions;
354 } else {
355 return DefaultOptionsHolder.defaultInetOptions;
356 }
357 }
358
359 /**
360 * Marks the beginning of a read operation that might block.
361 *
362 * @throws ClosedChannelException if blocking and the channel is closed
363 */
364 private void beginRead(boolean blocking) throws ClosedChannelException {
365 if (blocking) {
366 // set hook for Thread.interrupt
367 begin();
368
369 synchronized (stateLock) {
370 ensureOpen();
371 // record thread so it can be signalled if needed
372 readerThread = NativeThread.threadToSignal();
373 }
374 }
375 }
376
377 /**
378 * Marks the end of a read operation that may have blocked.
379 *
380 * @throws AsynchronousCloseException if the channel was closed due to this
381 * thread being interrupted on a blocking read operation.
382 */
383 private void endRead(boolean blocking, boolean completed)
384 throws AsynchronousCloseException
385 {
386 if (blocking) {
387 synchronized (stateLock) {
388 readerThread = null;
389 if (state == ST_CLOSING) {
390 tryFinishClose();
391 }
392 }
393 // remove hook for Thread.interrupt
394 end(completed);
395 }
396 }
397
398 private void throwConnectionReset() throws SocketException {
399 throw new SocketException("Connection reset");
400 }
401
402 private int implRead(ByteBuffer buf) throws IOException {
403 Objects.requireNonNull(buf);
404
405 readLock.lock();
406 try {
407 ensureOpenAndConnected();
408 boolean blocking = isBlocking();
506 long nbytes = implRead(dsts, offset, length);
507 SocketReadEvent.offer(start, nbytes, remoteAddress(), 0);
508 return nbytes;
509 }
510
511 /**
512 * Marks the beginning of a write operation that might block.
513 *
514 * @throws ClosedChannelException if blocking and the channel is closed
515 */
516 private void beginWrite(boolean blocking) throws ClosedChannelException {
517 if (blocking) {
518 // set hook for Thread.interrupt
519 begin();
520
521 synchronized (stateLock) {
522 ensureOpen();
523 if (isOutputClosed)
524 throw new ClosedChannelException();
525 // record thread so it can be signalled if needed
526 writerThread = NativeThread.threadToSignal();
527 }
528 }
529 }
530
531 /**
532 * Marks the end of a write operation that may have blocked.
533 *
534 * @throws AsynchronousCloseException if the channel was closed due to this
535 * thread being interrupted on a blocking write operation.
536 */
537 private void endWrite(boolean blocking, boolean completed)
538 throws AsynchronousCloseException
539 {
540 if (blocking) {
541 synchronized (stateLock) {
542 writerThread = null;
543 if (state == ST_CLOSING) {
544 tryFinishClose();
545 }
546 }
547 // remove hook for Thread.interrupt
548 end(completed);
549 }
550 }
551
552 private int implWrite(ByteBuffer buf) throws IOException {
553 Objects.requireNonNull(buf);
554 writeLock.lock();
555 try {
556 ensureOpenAndConnected();
557 boolean blocking = isBlocking();
558 int n = 0;
559 try {
560 beginWrite(blocking);
561 configureSocketNonBlockingIfVirtualThread();
562 n = IOUtil.write(fd, buf, -1, nd);
657 }
658 return IOStatus.normalize(n);
659 } finally {
660 writeLock.unlock();
661 }
662 }
663
664 /**
665 * Marks the beginning of a transfer to this channel.
666 * @throws ClosedChannelException if channel is closed or the output is shutdown
667 * @throws NotYetConnectedException if open and not connected
668 */
669 void beforeTransferTo() throws ClosedChannelException {
670 boolean completed = false;
671 writeLock.lock();
672 try {
673 synchronized (stateLock) {
674 ensureOpenAndConnected();
675 if (isOutputClosed)
676 throw new ClosedChannelException();
677 writerThread = NativeThread.threadToSignal();
678 completed = true;
679 }
680 } finally {
681 if (!completed) {
682 writeLock.unlock();
683 }
684 }
685 }
686
687 /**
688 * Marks the end of a transfer to this channel.
689 * @throws AsynchronousCloseException if not completed and the channel is closed
690 */
691 void afterTransferTo(boolean completed) throws AsynchronousCloseException {
692 synchronized (stateLock) {
693 writerThread = null;
694 if (state == ST_CLOSING) {
695 tryFinishClose();
696 }
697 }
698 writeLock.unlock();
699 if (!completed && !isOpen()) {
700 throw new AsynchronousCloseException();
701 }
702 }
703
704 @Override
705 protected void implConfigureBlocking(boolean block) throws IOException {
706 readLock.lock();
707 try {
708 writeLock.lock();
709 try {
710 lockedConfigureBlocking(block);
711 } finally {
712 writeLock.unlock();
713 }
860 begin();
861 }
862 synchronized (stateLock) {
863 ensureOpen();
864 int state = this.state;
865 if (state == ST_CONNECTED)
866 throw new AlreadyConnectedException();
867 if (state == ST_CONNECTIONPENDING)
868 throw new ConnectionPendingException();
869 assert state == ST_UNCONNECTED;
870 this.state = ST_CONNECTIONPENDING;
871
872 if (isNetSocket() && (localAddress == null)) {
873 InetSocketAddress isa = (InetSocketAddress) sa;
874 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
875 }
876 remoteAddress = sa;
877
878 if (blocking) {
879 // record thread so it can be signalled if needed
880 readerThread = NativeThread.threadToSignal();
881 }
882 }
883 }
884
885 /**
886 * Marks the end of a connect operation that may have blocked.
887 *
888 * @throws AsynchronousCloseException if the channel was closed due to this
889 * thread being interrupted on a blocking connect operation.
890 * @throws IOException if completed and unable to obtain the local address
891 */
892 private void endConnect(boolean blocking, boolean completed)
893 throws IOException
894 {
895 endRead(blocking, completed);
896
897 if (completed) {
898 synchronized (stateLock) {
899 if (state == ST_CONNECTIONPENDING) {
900 if (isUnixSocket()) {
979 }
980 }
981
982 /**
983 * Marks the beginning of a finishConnect operation that might block.
984 *
985 * @throws ClosedChannelException if the channel is closed
986 * @throws NoConnectionPendingException if no connection is pending
987 */
988 private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
989 if (blocking) {
990 // set hook for Thread.interrupt
991 begin();
992 }
993 synchronized (stateLock) {
994 ensureOpen();
995 if (state != ST_CONNECTIONPENDING)
996 throw new NoConnectionPendingException();
997 if (blocking) {
998 // record thread so it can be signalled if needed
999 readerThread = NativeThread.threadToSignal();
1000 }
1001 }
1002 }
1003
1004 /**
1005 * Marks the end of a finishConnect operation that may have blocked.
1006 *
1007 * @throws AsynchronousCloseException if the channel was closed due to this
1008 * thread being interrupted on a blocking connect operation.
1009 * @throws IOException if completed and unable to obtain the local address
1010 */
1011 private void endFinishConnect(boolean blocking, boolean completed)
1012 throws IOException
1013 {
1014 endRead(blocking, completed);
1015
1016 if (completed) {
1017 synchronized (stateLock) {
1018 if (state == ST_CONNECTIONPENDING) {
1019 if (isUnixSocket()) {
1058 return connected;
1059 } finally {
1060 writeLock.unlock();
1061 }
1062 } finally {
1063 readLock.unlock();
1064 }
1065 } catch (IOException ioe) {
1066 // connect failed, close the channel
1067 close();
1068 throw Exceptions.ioException(ioe, remoteAddress);
1069 }
1070 }
1071
1072 /**
1073 * Closes the socket if there are no I/O operations in progress (or no I/O
1074 * operations tracked), and the channel is not registered with a Selector.
1075 */
1076 private boolean tryClose() throws IOException {
1077 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
1078 if ((readerThread == null) && (writerThread == null) && !isRegistered()) {
1079 state = ST_CLOSED;
1080 nd.close(fd);
1081 return true;
1082 } else {
1083 return false;
1084 }
1085 }
1086
1087 /**
1088 * Invokes tryClose to attempt to close the socket.
1089 *
1090 * This method is used for deferred closing by I/O and Selector operations.
1091 */
1092 private void tryFinishClose() {
1093 try {
1094 tryClose();
1095 } catch (IOException ignore) { }
1096 }
1097
1098 /**
1201 // wait for any read/write operations to complete before trying to close
1202 readLock.lock();
1203 readLock.unlock();
1204 writeLock.lock();
1205 writeLock.unlock();
1206 synchronized (stateLock) {
1207 if (state == ST_CLOSING) {
1208 tryFinishClose();
1209 }
1210 }
1211 }
1212
1213 @Override
1214 public SocketChannel shutdownInput() throws IOException {
1215 synchronized (stateLock) {
1216 ensureOpen();
1217 if (!isConnected())
1218 throw new NotYetConnectedException();
1219 if (!isInputClosed) {
1220 Net.shutdown(fd, Net.SHUT_RD);
1221 if (readerThread != null && readerThread.isVirtual()) {
1222 Poller.stopPoll(readerThread);
1223 }
1224 isInputClosed = true;
1225 }
1226 return this;
1227 }
1228 }
1229
1230 @Override
1231 public SocketChannel shutdownOutput() throws IOException {
1232 synchronized (stateLock) {
1233 ensureOpen();
1234 if (!isConnected())
1235 throw new NotYetConnectedException();
1236 if (!isOutputClosed) {
1237 Net.shutdown(fd, Net.SHUT_WR);
1238 if (writerThread != null && writerThread.isVirtual()) {
1239 Poller.stopPoll(writerThread);
1240 }
1241 isOutputClosed = true;
1242 }
1243 return this;
1244 }
1245 }
1246
1247 boolean isInputOpen() {
1248 return !isInputClosed;
1249 }
1250
1251 boolean isOutputOpen() {
1252 return !isOutputClosed;
1253 }
1254
1255 /**
1256 * Waits for a connection attempt to finish with a timeout
1257 * @throws SocketTimeoutException if the connect timeout elapses
1258 */
1259 private boolean finishTimedConnect(long nanos) throws IOException {
|