96 // Input/Output closed
97 private volatile boolean isInputClosed;
98 private volatile boolean isOutputClosed;
99
100 // Connection reset protected by readLock
101 private boolean connectionReset;
102
103 // -- The following fields are protected by stateLock
104
105 // set true when exclusive binding is on and SO_REUSEADDR is emulated
106 private boolean isReuseAddress;
107
108 // State, increases monotonically
109 private static final int ST_UNCONNECTED = 0;
110 private static final int ST_CONNECTIONPENDING = 1;
111 private static final int ST_CONNECTED = 2;
112 private static final int ST_CLOSING = 3;
113 private static final int ST_CLOSED = 4;
114 private volatile int state; // need stateLock to change
115
116 // IDs of native threads doing reads and writes, for signalling
117 private long readerThread;
118 private long writerThread;
119
120 // Binding
121 private SocketAddress localAddress;
122 private SocketAddress remoteAddress;
123
124 // Socket adaptor, created on demand
125 private Socket socket;
126
127 // True if the channel's socket has been forced into non-blocking mode
128 // by a virtual thread. It cannot be reset. When the channel is in
129 // blocking mode and the channel's socket is in non-blocking mode then
130 // operations that don't complete immediately will poll the socket and
131 // preserve the semantics of blocking operations.
132 private volatile boolean forcedNonBlocking;
133
134 // -- End of fields protected by stateLock
135
136 SocketChannelImpl(SelectorProvider sp) throws IOException {
137 this(sp, Net.isIPv6Available() ? INET6 : INET);
138 }
351 if (isUnixSocket()) {
352 return DefaultOptionsHolder.defaultUnixOptions;
353 } else {
354 return DefaultOptionsHolder.defaultInetOptions;
355 }
356 }
357
358 /**
359 * Marks the beginning of a read operation that might block.
360 *
361 * @throws ClosedChannelException if blocking and the channel is closed
362 */
363 private void beginRead(boolean blocking) throws ClosedChannelException {
364 if (blocking) {
365 // set hook for Thread.interrupt
366 begin();
367
368 synchronized (stateLock) {
369 ensureOpen();
370 // record thread so it can be signalled if needed
371 readerThread = NativeThread.current();
372 }
373 }
374 }
375
376 /**
377 * Marks the end of a read operation that may have blocked.
378 *
379 * @throws AsynchronousCloseException if the channel was closed due to this
380 * thread being interrupted on a blocking read operation.
381 */
382 private void endRead(boolean blocking, boolean completed)
383 throws AsynchronousCloseException
384 {
385 if (blocking) {
386 synchronized (stateLock) {
387 readerThread = 0;
388 if (state == ST_CLOSING) {
389 tryFinishClose();
390 }
391 }
392 // remove hook for Thread.interrupt
393 end(completed);
394 }
395 }
396
397 private void throwConnectionReset() throws SocketException {
398 throw new SocketException("Connection reset");
399 }
400
401 private int implRead(ByteBuffer buf) throws IOException {
402 Objects.requireNonNull(buf);
403
404 readLock.lock();
405 try {
406 ensureOpenAndConnected();
407 boolean blocking = isBlocking();
505 long nbytes = implRead(dsts, offset, length);
506 SocketReadEvent.offer(start, nbytes, remoteAddress(), 0);
507 return nbytes;
508 }
509
510 /**
511 * Marks the beginning of a write operation that might block.
512 *
513 * @throws ClosedChannelException if blocking and the channel is closed
514 */
515 private void beginWrite(boolean blocking) throws ClosedChannelException {
516 if (blocking) {
517 // set hook for Thread.interrupt
518 begin();
519
520 synchronized (stateLock) {
521 ensureOpen();
522 if (isOutputClosed)
523 throw new ClosedChannelException();
524 // record thread so it can be signalled if needed
525 writerThread = NativeThread.current();
526 }
527 }
528 }
529
530 /**
531 * Marks the end of a write operation that may have blocked.
532 *
533 * @throws AsynchronousCloseException if the channel was closed due to this
534 * thread being interrupted on a blocking write operation.
535 */
536 private void endWrite(boolean blocking, boolean completed)
537 throws AsynchronousCloseException
538 {
539 if (blocking) {
540 synchronized (stateLock) {
541 writerThread = 0;
542 if (state == ST_CLOSING) {
543 tryFinishClose();
544 }
545 }
546 // remove hook for Thread.interrupt
547 end(completed);
548 }
549 }
550
551 private int implWrite(ByteBuffer buf) throws IOException {
552 Objects.requireNonNull(buf);
553 writeLock.lock();
554 try {
555 ensureOpenAndConnected();
556 boolean blocking = isBlocking();
557 int n = 0;
558 try {
559 beginWrite(blocking);
560 configureSocketNonBlockingIfVirtualThread();
561 n = IOUtil.write(fd, buf, -1, nd);
656 }
657 return IOStatus.normalize(n);
658 } finally {
659 writeLock.unlock();
660 }
661 }
662
663 /**
664 * Marks the beginning of a transfer to this channel.
665 * @throws ClosedChannelException if channel is closed or the output is shutdown
666 * @throws NotYetConnectedException if open and not connected
667 */
668 void beforeTransferTo() throws ClosedChannelException {
669 boolean completed = false;
670 writeLock.lock();
671 try {
672 synchronized (stateLock) {
673 ensureOpenAndConnected();
674 if (isOutputClosed)
675 throw new ClosedChannelException();
676 writerThread = NativeThread.current();
677 completed = true;
678 }
679 } finally {
680 if (!completed) {
681 writeLock.unlock();
682 }
683 }
684 }
685
686 /**
687 * Marks the end of a transfer to this channel.
688 * @throws AsynchronousCloseException if not completed and the channel is closed
689 */
690 void afterTransferTo(boolean completed) throws AsynchronousCloseException {
691 synchronized (stateLock) {
692 writerThread = 0;
693 if (state == ST_CLOSING) {
694 tryFinishClose();
695 }
696 }
697 writeLock.unlock();
698 if (!completed && !isOpen()) {
699 throw new AsynchronousCloseException();
700 }
701 }
702
703 @Override
704 protected void implConfigureBlocking(boolean block) throws IOException {
705 readLock.lock();
706 try {
707 writeLock.lock();
708 try {
709 lockedConfigureBlocking(block);
710 } finally {
711 writeLock.unlock();
712 }
857 // set hook for Thread.interrupt
858 begin();
859 }
860 synchronized (stateLock) {
861 ensureOpen();
862 int state = this.state;
863 if (state == ST_CONNECTED)
864 throw new AlreadyConnectedException();
865 if (state == ST_CONNECTIONPENDING)
866 throw new ConnectionPendingException();
867 assert state == ST_UNCONNECTED;
868 this.state = ST_CONNECTIONPENDING;
869
870 if (isNetSocket() && (localAddress == null)) {
871 InetSocketAddress isa = (InetSocketAddress) sa;
872 }
873 remoteAddress = sa;
874
875 if (blocking) {
876 // record thread so it can be signalled if needed
877 readerThread = NativeThread.current();
878 }
879 }
880 }
881
882 /**
883 * Marks the end of a connect operation that may have blocked.
884 *
885 * @throws AsynchronousCloseException if the channel was closed due to this
886 * thread being interrupted on a blocking connect operation.
887 * @throws IOException if completed and unable to obtain the local address
888 */
889 private void endConnect(boolean blocking, boolean completed)
890 throws IOException
891 {
892 endRead(blocking, completed);
893
894 if (completed) {
895 synchronized (stateLock) {
896 if (state == ST_CONNECTIONPENDING) {
897 if (isUnixSocket()) {
976 }
977 }
978
979 /**
980 * Marks the beginning of a finishConnect operation that might block.
981 *
982 * @throws ClosedChannelException if the channel is closed
983 * @throws NoConnectionPendingException if no connection is pending
984 */
985 private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
986 if (blocking) {
987 // set hook for Thread.interrupt
988 begin();
989 }
990 synchronized (stateLock) {
991 ensureOpen();
992 if (state != ST_CONNECTIONPENDING)
993 throw new NoConnectionPendingException();
994 if (blocking) {
995 // record thread so it can be signalled if needed
996 readerThread = NativeThread.current();
997 }
998 }
999 }
1000
1001 /**
1002 * Marks the end of a finishConnect operation that may have blocked.
1003 *
1004 * @throws AsynchronousCloseException if the channel was closed due to this
1005 * thread being interrupted on a blocking connect operation.
1006 * @throws IOException if completed and unable to obtain the local address
1007 */
1008 private void endFinishConnect(boolean blocking, boolean completed)
1009 throws IOException
1010 {
1011 endRead(blocking, completed);
1012
1013 if (completed) {
1014 synchronized (stateLock) {
1015 if (state == ST_CONNECTIONPENDING) {
1016 if (isUnixSocket()) {
1055 return connected;
1056 } finally {
1057 writeLock.unlock();
1058 }
1059 } finally {
1060 readLock.unlock();
1061 }
1062 } catch (IOException ioe) {
1063 // connect failed, close the channel
1064 close();
1065 throw Exceptions.ioException(ioe, remoteAddress);
1066 }
1067 }
1068
1069 /**
1070 * Closes the socket if there are no I/O operations in progress (or no I/O
1071 * operations tracked), and the channel is not registered with a Selector.
1072 */
1073 private boolean tryClose() throws IOException {
1074 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
1075 if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) {
1076 state = ST_CLOSED;
1077 nd.close(fd);
1078 return true;
1079 } else {
1080 return false;
1081 }
1082 }
1083
1084 /**
1085 * Invokes tryClose to attempt to close the socket.
1086 *
1087 * This method is used for deferred closing by I/O and Selector operations.
1088 */
1089 private void tryFinishClose() {
1090 try {
1091 tryClose();
1092 } catch (IOException ignore) { }
1093 }
1094
1095 /**
1198 // wait for any read/write operations to complete before trying to close
1199 readLock.lock();
1200 readLock.unlock();
1201 writeLock.lock();
1202 writeLock.unlock();
1203 synchronized (stateLock) {
1204 if (state == ST_CLOSING) {
1205 tryFinishClose();
1206 }
1207 }
1208 }
1209
1210 @Override
1211 public SocketChannel shutdownInput() throws IOException {
1212 synchronized (stateLock) {
1213 ensureOpen();
1214 if (!isConnected())
1215 throw new NotYetConnectedException();
1216 if (!isInputClosed) {
1217 Net.shutdown(fd, Net.SHUT_RD);
1218 long reader = readerThread;
1219 if (NativeThread.isVirtualThread(reader)) {
1220 Poller.stopPoll(fdVal, Net.POLLIN);
1221 } else if (NativeThread.isNativeThread(reader)) {
1222 NativeThread.signal(reader);
1223 }
1224 isInputClosed = true;
1225 }
1226 return this;
1227 }
1228 }
1229
1230 @Override
1231 public SocketChannel shutdownOutput() throws IOException {
1232 synchronized (stateLock) {
1233 ensureOpen();
1234 if (!isConnected())
1235 throw new NotYetConnectedException();
1236 if (!isOutputClosed) {
1237 Net.shutdown(fd, Net.SHUT_WR);
1238 long writer = writerThread;
1239 if (NativeThread.isVirtualThread(writer)) {
1240 Poller.stopPoll(fdVal, Net.POLLOUT);
1241 } else if (NativeThread.isNativeThread(writer)) {
1242 NativeThread.signal(writer);
1243 }
1244 isOutputClosed = true;
1245 }
1246 return this;
1247 }
1248 }
1249
1250 boolean isInputOpen() {
1251 return !isInputClosed;
1252 }
1253
1254 boolean isOutputOpen() {
1255 return !isOutputClosed;
1256 }
1257
1258 /**
1259 * Waits for a connection attempt to finish with a timeout
1260 * @throws SocketTimeoutException if the connect timeout elapses
1261 */
1262 private boolean finishTimedConnect(long nanos) throws IOException {
|
96 // Input/Output closed
97 private volatile boolean isInputClosed;
98 private volatile boolean isOutputClosed;
99
100 // Connection reset protected by readLock
101 private boolean connectionReset;
102
103 // -- The following fields are protected by stateLock
104
105 // set true when exclusive binding is on and SO_REUSEADDR is emulated
106 private boolean isReuseAddress;
107
108 // State, increases monotonically
109 private static final int ST_UNCONNECTED = 0;
110 private static final int ST_CONNECTIONPENDING = 1;
111 private static final int ST_CONNECTED = 2;
112 private static final int ST_CLOSING = 3;
113 private static final int ST_CLOSED = 4;
114 private volatile int state; // need stateLock to change
115
116 // Threads doing reads and writes, for signalling
117 private Thread readerThread;
118 private Thread writerThread;
119
120 // Binding
121 private SocketAddress localAddress;
122 private SocketAddress remoteAddress;
123
124 // Socket adaptor, created on demand
125 private Socket socket;
126
127 // True if the channel's socket has been forced into non-blocking mode
128 // by a virtual thread. It cannot be reset. When the channel is in
129 // blocking mode and the channel's socket is in non-blocking mode then
130 // operations that don't complete immediately will poll the socket and
131 // preserve the semantics of blocking operations.
132 private volatile boolean forcedNonBlocking;
133
134 // -- End of fields protected by stateLock
135
136 SocketChannelImpl(SelectorProvider sp) throws IOException {
137 this(sp, Net.isIPv6Available() ? INET6 : INET);
138 }
351 if (isUnixSocket()) {
352 return DefaultOptionsHolder.defaultUnixOptions;
353 } else {
354 return DefaultOptionsHolder.defaultInetOptions;
355 }
356 }
357
358 /**
359 * Marks the beginning of a read operation that might block.
360 *
361 * @throws ClosedChannelException if blocking and the channel is closed
362 */
363 private void beginRead(boolean blocking) throws ClosedChannelException {
364 if (blocking) {
365 // set hook for Thread.interrupt
366 begin();
367
368 synchronized (stateLock) {
369 ensureOpen();
370 // record thread so it can be signalled if needed
371 readerThread = NativeThread.threadToSignal();
372 }
373 }
374 }
375
376 /**
377 * Marks the end of a read operation that may have blocked.
378 *
379 * @throws AsynchronousCloseException if the channel was closed due to this
380 * thread being interrupted on a blocking read operation.
381 */
382 private void endRead(boolean blocking, boolean completed)
383 throws AsynchronousCloseException
384 {
385 if (blocking) {
386 synchronized (stateLock) {
387 readerThread = null;
388 if (state == ST_CLOSING) {
389 tryFinishClose();
390 }
391 }
392 // remove hook for Thread.interrupt
393 end(completed);
394 }
395 }
396
397 private void throwConnectionReset() throws SocketException {
398 throw new SocketException("Connection reset");
399 }
400
401 private int implRead(ByteBuffer buf) throws IOException {
402 Objects.requireNonNull(buf);
403
404 readLock.lock();
405 try {
406 ensureOpenAndConnected();
407 boolean blocking = isBlocking();
505 long nbytes = implRead(dsts, offset, length);
506 SocketReadEvent.offer(start, nbytes, remoteAddress(), 0);
507 return nbytes;
508 }
509
510 /**
511 * Marks the beginning of a write operation that might block.
512 *
513 * @throws ClosedChannelException if blocking and the channel is closed
514 */
515 private void beginWrite(boolean blocking) throws ClosedChannelException {
516 if (blocking) {
517 // set hook for Thread.interrupt
518 begin();
519
520 synchronized (stateLock) {
521 ensureOpen();
522 if (isOutputClosed)
523 throw new ClosedChannelException();
524 // record thread so it can be signalled if needed
525 writerThread = NativeThread.threadToSignal();
526 }
527 }
528 }
529
530 /**
531 * Marks the end of a write operation that may have blocked.
532 *
533 * @throws AsynchronousCloseException if the channel was closed due to this
534 * thread being interrupted on a blocking write operation.
535 */
536 private void endWrite(boolean blocking, boolean completed)
537 throws AsynchronousCloseException
538 {
539 if (blocking) {
540 synchronized (stateLock) {
541 writerThread = null;
542 if (state == ST_CLOSING) {
543 tryFinishClose();
544 }
545 }
546 // remove hook for Thread.interrupt
547 end(completed);
548 }
549 }
550
551 private int implWrite(ByteBuffer buf) throws IOException {
552 Objects.requireNonNull(buf);
553 writeLock.lock();
554 try {
555 ensureOpenAndConnected();
556 boolean blocking = isBlocking();
557 int n = 0;
558 try {
559 beginWrite(blocking);
560 configureSocketNonBlockingIfVirtualThread();
561 n = IOUtil.write(fd, buf, -1, nd);
656 }
657 return IOStatus.normalize(n);
658 } finally {
659 writeLock.unlock();
660 }
661 }
662
663 /**
664 * Marks the beginning of a transfer to this channel.
665 * @throws ClosedChannelException if channel is closed or the output is shutdown
666 * @throws NotYetConnectedException if open and not connected
667 */
668 void beforeTransferTo() throws ClosedChannelException {
669 boolean completed = false;
670 writeLock.lock();
671 try {
672 synchronized (stateLock) {
673 ensureOpenAndConnected();
674 if (isOutputClosed)
675 throw new ClosedChannelException();
676 writerThread = NativeThread.threadToSignal();
677 completed = true;
678 }
679 } finally {
680 if (!completed) {
681 writeLock.unlock();
682 }
683 }
684 }
685
686 /**
687 * Marks the end of a transfer to this channel.
688 * @throws AsynchronousCloseException if not completed and the channel is closed
689 */
690 void afterTransferTo(boolean completed) throws AsynchronousCloseException {
691 synchronized (stateLock) {
692 writerThread = null;
693 if (state == ST_CLOSING) {
694 tryFinishClose();
695 }
696 }
697 writeLock.unlock();
698 if (!completed && !isOpen()) {
699 throw new AsynchronousCloseException();
700 }
701 }
702
703 @Override
704 protected void implConfigureBlocking(boolean block) throws IOException {
705 readLock.lock();
706 try {
707 writeLock.lock();
708 try {
709 lockedConfigureBlocking(block);
710 } finally {
711 writeLock.unlock();
712 }
857 // set hook for Thread.interrupt
858 begin();
859 }
860 synchronized (stateLock) {
861 ensureOpen();
862 int state = this.state;
863 if (state == ST_CONNECTED)
864 throw new AlreadyConnectedException();
865 if (state == ST_CONNECTIONPENDING)
866 throw new ConnectionPendingException();
867 assert state == ST_UNCONNECTED;
868 this.state = ST_CONNECTIONPENDING;
869
870 if (isNetSocket() && (localAddress == null)) {
871 InetSocketAddress isa = (InetSocketAddress) sa;
872 }
873 remoteAddress = sa;
874
875 if (blocking) {
876 // record thread so it can be signalled if needed
877 readerThread = NativeThread.threadToSignal();
878 }
879 }
880 }
881
882 /**
883 * Marks the end of a connect operation that may have blocked.
884 *
885 * @throws AsynchronousCloseException if the channel was closed due to this
886 * thread being interrupted on a blocking connect operation.
887 * @throws IOException if completed and unable to obtain the local address
888 */
889 private void endConnect(boolean blocking, boolean completed)
890 throws IOException
891 {
892 endRead(blocking, completed);
893
894 if (completed) {
895 synchronized (stateLock) {
896 if (state == ST_CONNECTIONPENDING) {
897 if (isUnixSocket()) {
976 }
977 }
978
979 /**
980 * Marks the beginning of a finishConnect operation that might block.
981 *
982 * @throws ClosedChannelException if the channel is closed
983 * @throws NoConnectionPendingException if no connection is pending
984 */
985 private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
986 if (blocking) {
987 // set hook for Thread.interrupt
988 begin();
989 }
990 synchronized (stateLock) {
991 ensureOpen();
992 if (state != ST_CONNECTIONPENDING)
993 throw new NoConnectionPendingException();
994 if (blocking) {
995 // record thread so it can be signalled if needed
996 readerThread = NativeThread.threadToSignal();
997 }
998 }
999 }
1000
1001 /**
1002 * Marks the end of a finishConnect operation that may have blocked.
1003 *
1004 * @throws AsynchronousCloseException if the channel was closed due to this
1005 * thread being interrupted on a blocking connect operation.
1006 * @throws IOException if completed and unable to obtain the local address
1007 */
1008 private void endFinishConnect(boolean blocking, boolean completed)
1009 throws IOException
1010 {
1011 endRead(blocking, completed);
1012
1013 if (completed) {
1014 synchronized (stateLock) {
1015 if (state == ST_CONNECTIONPENDING) {
1016 if (isUnixSocket()) {
1055 return connected;
1056 } finally {
1057 writeLock.unlock();
1058 }
1059 } finally {
1060 readLock.unlock();
1061 }
1062 } catch (IOException ioe) {
1063 // connect failed, close the channel
1064 close();
1065 throw Exceptions.ioException(ioe, remoteAddress);
1066 }
1067 }
1068
1069 /**
1070 * Closes the socket if there are no I/O operations in progress (or no I/O
1071 * operations tracked), and the channel is not registered with a Selector.
1072 */
1073 private boolean tryClose() throws IOException {
1074 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
1075 if ((readerThread == null) && (writerThread == null) && !isRegistered()) {
1076 state = ST_CLOSED;
1077 nd.close(fd);
1078 return true;
1079 } else {
1080 return false;
1081 }
1082 }
1083
1084 /**
1085 * Invokes tryClose to attempt to close the socket.
1086 *
1087 * This method is used for deferred closing by I/O and Selector operations.
1088 */
1089 private void tryFinishClose() {
1090 try {
1091 tryClose();
1092 } catch (IOException ignore) { }
1093 }
1094
1095 /**
1198 // wait for any read/write operations to complete before trying to close
1199 readLock.lock();
1200 readLock.unlock();
1201 writeLock.lock();
1202 writeLock.unlock();
1203 synchronized (stateLock) {
1204 if (state == ST_CLOSING) {
1205 tryFinishClose();
1206 }
1207 }
1208 }
1209
1210 @Override
1211 public SocketChannel shutdownInput() throws IOException {
1212 synchronized (stateLock) {
1213 ensureOpen();
1214 if (!isConnected())
1215 throw new NotYetConnectedException();
1216 if (!isInputClosed) {
1217 Net.shutdown(fd, Net.SHUT_RD);
1218 if (readerThread != null && readerThread.isVirtual()) {
1219 Poller.stopPoll(readerThread);
1220 }
1221 isInputClosed = true;
1222 }
1223 return this;
1224 }
1225 }
1226
1227 @Override
1228 public SocketChannel shutdownOutput() throws IOException {
1229 synchronized (stateLock) {
1230 ensureOpen();
1231 if (!isConnected())
1232 throw new NotYetConnectedException();
1233 if (!isOutputClosed) {
1234 Net.shutdown(fd, Net.SHUT_WR);
1235 if (writerThread != null && writerThread.isVirtual()) {
1236 Poller.stopPoll(writerThread);
1237 }
1238 isOutputClosed = true;
1239 }
1240 return this;
1241 }
1242 }
1243
1244 boolean isInputOpen() {
1245 return !isInputClosed;
1246 }
1247
1248 boolean isOutputOpen() {
1249 return !isOutputClosed;
1250 }
1251
1252 /**
1253 * Waits for a connection attempt to finish with a timeout
1254 * @throws SocketTimeoutException if the connect timeout elapses
1255 */
1256 private boolean finishTimedConnect(long nanos) throws IOException {
|