< prev index next >

src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java

Print this page

  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;

  30 import java.io.UncheckedIOException;
  31 import java.lang.invoke.MethodHandles;
  32 import java.lang.invoke.VarHandle;
  33 import java.lang.ref.Cleaner.Cleanable;
  34 import java.lang.reflect.Method;
  35 import java.net.DatagramSocket;
  36 import java.net.Inet4Address;
  37 import java.net.Inet6Address;
  38 import java.net.InetAddress;
  39 import java.net.InetSocketAddress;
  40 import java.net.NetworkInterface;
  41 import java.net.PortUnreachableException;
  42 import java.net.ProtocolFamily;
  43 import java.net.SocketAddress;
  44 import java.net.SocketException;
  45 import java.net.SocketOption;
  46 import java.net.SocketTimeoutException;
  47 import java.net.StandardProtocolFamily;
  48 import java.net.StandardSocketOptions;
  49 import java.nio.ByteBuffer;
  50 import java.nio.channels.AlreadyBoundException;
  51 import java.nio.channels.AlreadyConnectedException;
  52 import java.nio.channels.AsynchronousCloseException;
  53 import java.nio.channels.ClosedChannelException;
  54 import java.nio.channels.DatagramChannel;
  55 import java.nio.channels.IllegalBlockingModeException;
  56 import java.nio.channels.MembershipKey;
  57 import java.nio.channels.NotYetConnectedException;
  58 import java.nio.channels.SelectionKey;
  59 import java.nio.channels.spi.AbstractSelectableChannel;
  60 import java.nio.channels.spi.SelectorProvider;
  61 import java.security.AccessController;
  62 import java.security.PrivilegedExceptionAction;
  63 import java.util.Collections;
  64 import java.util.HashMap;
  65 import java.util.HashSet;
  66 import java.util.Map;
  67 import java.util.Objects;
  68 import java.util.Set;

  69 import java.util.concurrent.locks.ReentrantLock;
  70 import java.util.function.Consumer;
  71 

  72 import jdk.internal.ref.CleanerFactory;
  73 import sun.net.ResourceManager;
  74 import sun.net.ext.ExtendedSocketOptions;
  75 import sun.net.util.IPAddressUtil;
  76 
  77 /**
  78  * An implementation of DatagramChannels.
  79  */
  80 
  81 class DatagramChannelImpl
  82     extends DatagramChannel
  83     implements SelChImpl
  84 {
  85     // Used to make native read and write calls
  86     private static final NativeDispatcher nd = new DatagramDispatcher();
  87 
  88     // true if interruptible (can be false to emulate legacy DatagramSocket)
  89     private final boolean interruptible;
  90 
  91     // The protocol family of the socket

 142     private static final VarHandle SOCKET;
 143     static {
 144         try {
 145             MethodHandles.Lookup l = MethodHandles.lookup();
 146             SOCKET = l.findVarHandle(DatagramChannelImpl.class, "socket", DatagramSocket.class);
 147         } catch (Exception e) {
 148             throw new InternalError(e);
 149         }
 150     }
 151     private volatile DatagramSocket socket;
 152 
 153     // Multicast support
 154     private MembershipRegistry registry;
 155 
 156     // set true when socket is bound and SO_REUSEADDRESS is emulated
 157     private boolean reuseAddressEmulated;
 158 
 159     // set true/false when socket is already bound and SO_REUSEADDR is emulated
 160     private boolean isReuseAddress;
 161 



 162     // -- End of fields protected by stateLock
 163 
 164 
 165     DatagramChannelImpl(SelectorProvider sp, boolean interruptible) throws IOException {
 166         this(sp, (Net.isIPv6Available()
 167                 ? StandardProtocolFamily.INET6
 168                 : StandardProtocolFamily.INET),
 169                 interruptible);
 170     }
 171 
 172     DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family, boolean interruptible)
 173         throws IOException
 174     {
 175         super(sp);
 176 
 177         Objects.requireNonNull(family, "'family' is null");
 178         if ((family != StandardProtocolFamily.INET) &&
 179                 (family != StandardProtocolFamily.INET6)) {
 180             throw new UnsupportedOperationException("Protocol family not supported");
 181         }

 453             set.add(StandardSocketOptions.SO_RCVBUF);
 454             set.add(StandardSocketOptions.SO_REUSEADDR);
 455             if (Net.isReusePortAvailable()) {
 456                 set.add(StandardSocketOptions.SO_REUSEPORT);
 457             }
 458             set.add(StandardSocketOptions.SO_BROADCAST);
 459             set.add(StandardSocketOptions.IP_TOS);
 460             set.add(StandardSocketOptions.IP_MULTICAST_IF);
 461             set.add(StandardSocketOptions.IP_MULTICAST_TTL);
 462             set.add(StandardSocketOptions.IP_MULTICAST_LOOP);
 463             set.addAll(ExtendedSocketOptions.datagramSocketOptions());
 464             return Collections.unmodifiableSet(set);
 465         }
 466     }
 467 
 468     @Override
 469     public final Set<SocketOption<?>> supportedOptions() {
 470         return DefaultOptionsHolder.defaultOptions;
 471     }
 472 






























 473     /**
 474      * Marks the beginning of a read operation that might block.
 475      *
 476      * @param blocking true if configured blocking
 477      * @param mustBeConnected true if the socket must be connected
 478      * @return remote address if connected
 479      * @throws ClosedChannelException if the channel is closed
 480      * @throws NotYetConnectedException if mustBeConnected and not connected
 481      * @throws IOException if socket not bound and cannot be bound
 482      */
 483     private SocketAddress beginRead(boolean blocking, boolean mustBeConnected)
 484         throws IOException
 485     {
 486         if (blocking && interruptible) {
 487             // set hook for Thread.interrupt
 488             begin();
 489         }
 490         SocketAddress remote;
 491         synchronized (stateLock) {
 492             ensureOpen();

 518             }
 519             if (interruptible) {
 520                 // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
 521                 end(completed);
 522             } else if (!completed && !isOpen()) {
 523                 throw new AsynchronousCloseException();
 524             }
 525         }
 526     }
 527 
 528     @Override
 529     public SocketAddress receive(ByteBuffer dst) throws IOException {
 530         if (dst.isReadOnly())
 531             throw new IllegalArgumentException("Read-only buffer");
 532         readLock.lock();
 533         try {
 534             boolean blocking = isBlocking();
 535             SocketAddress sender = null;
 536             try {
 537                 SocketAddress remote = beginRead(blocking, false);

 538                 boolean connected = (remote != null);
 539                 @SuppressWarnings("removal")
 540                 SecurityManager sm = System.getSecurityManager();
 541                 if (connected || (sm == null)) {
 542                     // connected or no security manager
 543                     int n = receive(dst, connected);
 544                     if (blocking) {
 545                         while (IOStatus.okayToRetry(n) && isOpen()) {
 546                             park(Net.POLLIN);
 547                             n = receive(dst, connected);
 548                         }
 549                     }
 550                     if (n >= 0) {
 551                         // sender address is in socket address buffer
 552                         sender = sourceSocketAddress();
 553                     }
 554                 } else {
 555                     // security manager and unconnected
 556                     sender = untrustedReceive(dst);
 557                 }

 646             } while (sender == null);
 647             return sender;
 648         } finally {
 649             readLock.unlock();
 650         }
 651     }
 652 
 653     /**
 654      * Receives a datagram into given buffer. This method is used to support
 655      * the socket adaptor. The buffer is assumed to be trusted.
 656      * @throws SocketTimeoutException if the timeout elapses
 657      */
 658     private SocketAddress trustedBlockingReceive(ByteBuffer dst)
 659         throws IOException
 660     {
 661         assert readLock.isHeldByCurrentThread() && isBlocking();
 662         SocketAddress sender = null;
 663         try {
 664             SocketAddress remote = beginRead(true, false);
 665             boolean connected = (remote != null);

 666             int n = receive(dst, connected);
 667             while (IOStatus.okayToRetry(n) && isOpen()) {
 668                 park(Net.POLLIN);
 669                 n = receive(dst, connected);
 670             }
 671             if (n >= 0) {
 672                 // sender address is in socket address buffer
 673                 sender = sourceSocketAddress();
 674             }
 675             return sender;
 676         } finally {
 677             endRead(true, (sender != null));
 678         }
 679     }
 680 
 681     /**
 682      * Receives a datagram into given buffer with a timeout. This method is
 683      * used to support the socket adaptor. The buffer is assumed to be trusted.
 684      * @throws SocketTimeoutException if the timeout elapses
 685      */

 772         cachedSockAddr = sourceSockAddr;
 773         sourceSockAddr = tmp;
 774         cachedInetSocketAddress = isa;
 775         return isa;
 776     }
 777 
 778     @Override
 779     public int send(ByteBuffer src, SocketAddress target)
 780         throws IOException
 781     {
 782         Objects.requireNonNull(src);
 783         InetSocketAddress isa = Net.checkAddress(target, family);
 784 
 785         writeLock.lock();
 786         try {
 787             boolean blocking = isBlocking();
 788             int n;
 789             boolean completed = false;
 790             try {
 791                 SocketAddress remote = beginWrite(blocking, false);

 792                 if (remote != null) {
 793                     // connected
 794                     if (!target.equals(remote)) {
 795                         throw new AlreadyConnectedException();
 796                     }
 797                     n = IOUtil.write(fd, src, -1, nd);
 798                     if (blocking) {
 799                         while (IOStatus.okayToRetry(n) && isOpen()) {
 800                             park(Net.POLLOUT);
 801                             n = IOUtil.write(fd, src, -1, nd);
 802                         }
 803                     }
 804                     completed = (n > 0);
 805                 } else {
 806                     // not connected
 807                     @SuppressWarnings("removal")
 808                     SecurityManager sm = System.getSecurityManager();
 809                     InetAddress ia = isa.getAddress();
 810                     if (sm != null) {
 811                         if (ia.isMulticastAddress()) {

 920         // identity rather than equals as Inet6Address.equals ignores scope_id.
 921         if (isa == previousTarget)
 922             return previousSockAddrLength;
 923         previousTarget = null;
 924         int len = targetSockAddr.encode(family, isa);
 925         previousTarget = isa;
 926         previousSockAddrLength = len;
 927         return len;
 928     }
 929 
 930     @Override
 931     public int read(ByteBuffer buf) throws IOException {
 932         Objects.requireNonNull(buf);
 933 
 934         readLock.lock();
 935         try {
 936             boolean blocking = isBlocking();
 937             int n = 0;
 938             try {
 939                 beginRead(blocking, true);

 940                 n = IOUtil.read(fd, buf, -1, nd);
 941                 if (blocking) {
 942                     while (IOStatus.okayToRetry(n) && isOpen()) {
 943                         park(Net.POLLIN);
 944                         n = IOUtil.read(fd, buf, -1, nd);
 945                     }
 946                 }
 947             } finally {
 948                 endRead(blocking, n > 0);
 949                 assert IOStatus.check(n);
 950             }
 951             return IOStatus.normalize(n);
 952         } finally {
 953             readLock.unlock();
 954         }
 955     }
 956 
 957     @Override
 958     public long read(ByteBuffer[] dsts, int offset, int length)
 959         throws IOException
 960     {
 961         Objects.checkFromIndexSize(offset, length, dsts.length);
 962 
 963         readLock.lock();
 964         try {
 965             boolean blocking = isBlocking();
 966             long n = 0;
 967             try {
 968                 beginRead(blocking, true);

 969                 n = IOUtil.read(fd, dsts, offset, length, nd);
 970                 if (blocking) {
 971                     while (IOStatus.okayToRetry(n)  && isOpen()) {
 972                         park(Net.POLLIN);
 973                         n = IOUtil.read(fd, dsts, offset, length, nd);
 974                     }
 975                 }
 976             } finally {
 977                 endRead(blocking, n > 0);
 978                 assert IOStatus.check(n);
 979             }
 980             return IOStatus.normalize(n);
 981         } finally {
 982             readLock.unlock();
 983         }
 984     }
 985 
 986     /**
 987      * Marks the beginning of a write operation that might block.
 988      * @param blocking true if configured blocking

1031 
1032             if (interruptible) {
1033                 // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
1034                 end(completed);
1035             } else if (!completed && !isOpen()) {
1036                 throw new AsynchronousCloseException();
1037             }
1038         }
1039     }
1040 
1041     @Override
1042     public int write(ByteBuffer buf) throws IOException {
1043         Objects.requireNonNull(buf);
1044 
1045         writeLock.lock();
1046         try {
1047             boolean blocking = isBlocking();
1048             int n = 0;
1049             try {
1050                 beginWrite(blocking, true);

1051                 n = IOUtil.write(fd, buf, -1, nd);
1052                 if (blocking) {
1053                     while (IOStatus.okayToRetry(n) && isOpen()) {
1054                         park(Net.POLLOUT);
1055                         n = IOUtil.write(fd, buf, -1, nd);
1056                     }
1057                 }
1058             } finally {
1059                 endWrite(blocking, n > 0);
1060                 assert IOStatus.check(n);
1061             }
1062             return IOStatus.normalize(n);
1063         } finally {
1064             writeLock.unlock();
1065         }
1066     }
1067 
1068     @Override
1069     public long write(ByteBuffer[] srcs, int offset, int length)
1070         throws IOException
1071     {
1072         Objects.checkFromIndexSize(offset, length, srcs.length);
1073 
1074         writeLock.lock();
1075         try {
1076             boolean blocking = isBlocking();
1077             long n = 0;
1078             try {
1079                 beginWrite(blocking, true);

1080                 n = IOUtil.write(fd, srcs, offset, length, nd);
1081                 if (blocking) {
1082                     while (IOStatus.okayToRetry(n) && isOpen()) {
1083                         park(Net.POLLOUT);
1084                         n = IOUtil.write(fd, srcs, offset, length, nd);
1085                     }
1086                 }
1087             } finally {
1088                 endWrite(blocking, n > 0);
1089                 assert IOStatus.check(n);
1090             }
1091             return IOStatus.normalize(n);
1092         } finally {
1093             writeLock.unlock();
1094         }
1095     }
1096 
1097     @Override
1098     protected void implConfigureBlocking(boolean block) throws IOException {
1099         readLock.lock();
1100         try {
1101             writeLock.lock();
1102             try {
1103                 lockedConfigureBlocking(block);
1104             } finally {
1105                 writeLock.unlock();
1106             }
1107         } finally {
1108             readLock.unlock();
1109         }
1110     }
1111 
1112     /**
1113      * Adjusts the blocking mode. readLock or writeLock must already be held.
1114      */
1115     private void lockedConfigureBlocking(boolean block) throws IOException {
1116         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
1117         synchronized (stateLock) {
1118             ensureOpen();
1119             IOUtil.configureBlocking(fd, block);



1120         }
1121     }
1122 
1123     /**
1124      * Adjusts the blocking mode if the channel is open. readLock or writeLock
1125      * must already be held.
1126      *
1127      * @return {@code true} if the blocking mode was adjusted, {@code false} if
1128      *         the blocking mode was not adjusted because the channel is closed
1129      */
1130     private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
1131         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
1132         synchronized (stateLock) {
1133             if (isOpen()) {
1134                 IOUtil.configureBlocking(fd, block);
1135                 return true;
1136             } else {
1137                 return false;
1138             }
1139         }
1140     }
1141 
















1142     InetSocketAddress localAddress() {
1143         synchronized (stateLock) {
1144             return localAddress;
1145         }
1146     }
1147 
1148     InetSocketAddress remoteAddress() {
1149         synchronized (stateLock) {
1150             return remoteAddress;
1151         }
1152     }
1153 
1154     @Override
1155     public DatagramChannel bind(SocketAddress local) throws IOException {
1156         readLock.lock();
1157         try {
1158             writeLock.lock();
1159             try {
1160                 synchronized (stateLock) {
1161                     ensureOpen();

1246                     // capture local address before connect
1247                     initialLocalAddress = localAddress;
1248 
1249                     int n = Net.connect(family,
1250                                         fd,
1251                                         isa.getAddress(),
1252                                         isa.getPort());
1253                     if (n <= 0)
1254                         throw new Error();      // Can't happen
1255 
1256                     // connected
1257                     remoteAddress = isa;
1258                     state = ST_CONNECTED;
1259 
1260                     // refresh local address
1261                     localAddress = Net.localAddress(fd);
1262 
1263                     // flush any packets already received.
1264                     boolean blocking = isBlocking();
1265                     if (blocking) {
1266                         IOUtil.configureBlocking(fd, false);
1267                     }
1268                     try {
1269                         ByteBuffer buf = ByteBuffer.allocate(100);
1270                         while (receive(buf, false) >= 0) {
1271                             buf.clear();
1272                         }
1273                     } finally {
1274                         if (blocking) {
1275                             IOUtil.configureBlocking(fd, true);
1276                         }
1277                     }
1278                 }
1279             } finally {
1280                 writeLock.unlock();
1281             }
1282         } finally {
1283             readLock.unlock();
1284         }
1285         return this;
1286     }
1287 
1288     @Override
1289     public DatagramChannel disconnect() throws IOException {
1290         readLock.lock();
1291         try {
1292             writeLock.lock();
1293             try {
1294                 synchronized (stateLock) {
1295                     if (!isOpen() || (state != ST_CONNECTED))

1715     /**
1716      * Closes this channel when configured in blocking mode.
1717      *
1718      * If there is an I/O operation in progress then the socket is pre-closed
1719      * and the I/O threads signalled, in which case the final close is deferred
1720      * until all I/O operations complete.
1721      */
1722     private void implCloseBlockingMode() throws IOException {
1723         synchronized (stateLock) {
1724             assert state < ST_CLOSING;
1725             state = ST_CLOSING;
1726 
1727             // if member of any multicast groups then invalidate the keys
1728             if (registry != null)
1729                 registry.invalidateAll();
1730 
1731             if (!tryClose()) {
1732                 long reader = readerThread;
1733                 long writer = writerThread;
1734                 if (reader != 0 || writer != 0) {
1735                     nd.preClose(fd);
1736                     if (reader != 0)
1737                         NativeThread.signal(reader);
1738                     if (writer != 0)
1739                         NativeThread.signal(writer);







1740                 }
1741             }
1742         }
1743     }
1744 
1745     /**
1746      * Closes this channel when configured in non-blocking mode.
1747      *
1748      * If the channel is registered with a Selector then the close is deferred
1749      * until the channel is flushed from all Selectors.
1750      */
1751     private void implCloseNonBlockingMode() throws IOException {
1752         synchronized (stateLock) {
1753             assert state < ST_CLOSING;
1754             state = ST_CLOSING;
1755 
1756             // if member of any multicast groups then invalidate the keys
1757             if (registry != null)
1758                 registry.invalidateAll();
1759         }

  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.io.InterruptedIOException;
  31 import java.io.UncheckedIOException;
  32 import java.lang.invoke.MethodHandles;
  33 import java.lang.invoke.VarHandle;
  34 import java.lang.ref.Cleaner.Cleanable;
  35 import java.lang.reflect.Method;
  36 import java.net.DatagramSocket;
  37 import java.net.Inet4Address;
  38 import java.net.Inet6Address;
  39 import java.net.InetAddress;
  40 import java.net.InetSocketAddress;
  41 import java.net.NetworkInterface;
  42 import java.net.PortUnreachableException;
  43 import java.net.ProtocolFamily;
  44 import java.net.SocketAddress;
  45 import java.net.SocketException;
  46 import java.net.SocketOption;
  47 import java.net.SocketTimeoutException;
  48 import java.net.StandardProtocolFamily;
  49 import java.net.StandardSocketOptions;
  50 import java.nio.ByteBuffer;
  51 import java.nio.channels.AlreadyBoundException;
  52 import java.nio.channels.AlreadyConnectedException;
  53 import java.nio.channels.AsynchronousCloseException;
  54 import java.nio.channels.ClosedChannelException;
  55 import java.nio.channels.DatagramChannel;
  56 import java.nio.channels.IllegalBlockingModeException;
  57 import java.nio.channels.MembershipKey;
  58 import java.nio.channels.NotYetConnectedException;
  59 import java.nio.channels.SelectionKey;
  60 import java.nio.channels.spi.AbstractSelectableChannel;
  61 import java.nio.channels.spi.SelectorProvider;
  62 import java.security.AccessController;
  63 import java.security.PrivilegedExceptionAction;
  64 import java.util.Collections;
  65 import java.util.HashMap;
  66 import java.util.HashSet;
  67 import java.util.Map;
  68 import java.util.Objects;
  69 import java.util.Set;
  70 import java.util.concurrent.TimeUnit;
  71 import java.util.concurrent.locks.ReentrantLock;
  72 import java.util.function.Consumer;
  73 
  74 import jdk.internal.misc.VirtualThreads;
  75 import jdk.internal.ref.CleanerFactory;
  76 import sun.net.ResourceManager;
  77 import sun.net.ext.ExtendedSocketOptions;
  78 import sun.net.util.IPAddressUtil;
  79 
  80 /**
  81  * An implementation of DatagramChannels.
  82  */
  83 
  84 class DatagramChannelImpl
  85     extends DatagramChannel
  86     implements SelChImpl
  87 {
  88     // Used to make native read and write calls
  89     private static final NativeDispatcher nd = new DatagramDispatcher();
  90 
  91     // true if interruptible (can be false to emulate legacy DatagramSocket)
  92     private final boolean interruptible;
  93 
  94     // The protocol family of the socket

 145     private static final VarHandle SOCKET;
 146     static {
 147         try {
 148             MethodHandles.Lookup l = MethodHandles.lookup();
 149             SOCKET = l.findVarHandle(DatagramChannelImpl.class, "socket", DatagramSocket.class);
 150         } catch (Exception e) {
 151             throw new InternalError(e);
 152         }
 153     }
 154     private volatile DatagramSocket socket;
 155 
 156     // Multicast support
 157     private MembershipRegistry registry;
 158 
 159     // set true when socket is bound and SO_REUSEADDRESS is emulated
 160     private boolean reuseAddressEmulated;
 161 
 162     // set true/false when socket is already bound and SO_REUSEADDR is emulated
 163     private boolean isReuseAddress;
 164 
 165     // lazily set to true when the socket is configured non-blocking
 166     private volatile boolean nonBlocking;
 167 
 168     // -- End of fields protected by stateLock
 169 
 170 
 171     DatagramChannelImpl(SelectorProvider sp, boolean interruptible) throws IOException {
 172         this(sp, (Net.isIPv6Available()
 173                 ? StandardProtocolFamily.INET6
 174                 : StandardProtocolFamily.INET),
 175                 interruptible);
 176     }
 177 
 178     DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family, boolean interruptible)
 179         throws IOException
 180     {
 181         super(sp);
 182 
 183         Objects.requireNonNull(family, "'family' is null");
 184         if ((family != StandardProtocolFamily.INET) &&
 185                 (family != StandardProtocolFamily.INET6)) {
 186             throw new UnsupportedOperationException("Protocol family not supported");
 187         }

 459             set.add(StandardSocketOptions.SO_RCVBUF);
 460             set.add(StandardSocketOptions.SO_REUSEADDR);
 461             if (Net.isReusePortAvailable()) {
 462                 set.add(StandardSocketOptions.SO_REUSEPORT);
 463             }
 464             set.add(StandardSocketOptions.SO_BROADCAST);
 465             set.add(StandardSocketOptions.IP_TOS);
 466             set.add(StandardSocketOptions.IP_MULTICAST_IF);
 467             set.add(StandardSocketOptions.IP_MULTICAST_TTL);
 468             set.add(StandardSocketOptions.IP_MULTICAST_LOOP);
 469             set.addAll(ExtendedSocketOptions.datagramSocketOptions());
 470             return Collections.unmodifiableSet(set);
 471         }
 472     }
 473 
 474     @Override
 475     public final Set<SocketOption<?>> supportedOptions() {
 476         return DefaultOptionsHolder.defaultOptions;
 477     }
 478 
 479     @Override
 480     public void park(int event, long nanos) throws IOException {
 481         Thread thread = Thread.currentThread();
 482         if (thread.isVirtual()) {
 483             Poller.register(getFDVal(), event);
 484             try {
 485                 if (isOpen()) {
 486                     if (nanos == 0) {
 487                         VirtualThreads.park();
 488                     } else {
 489                         VirtualThreads.park(nanos);
 490                     }
 491                     if (!interruptible && thread.isInterrupted()) {
 492                         throw new InterruptedIOException();
 493                     }
 494                 }
 495             } finally {
 496                 Poller.deregister(getFDVal(), event);
 497             }
 498         } else {
 499             long millis;
 500             if (nanos == 0) {
 501                 millis = -1;
 502             } else {
 503                 millis = TimeUnit.NANOSECONDS.toMillis(nanos);
 504             }
 505             Net.poll(getFD(), event, millis);
 506         }
 507     }
 508 
 509     /**
 510      * Marks the beginning of a read operation that might block.
 511      *
 512      * @param blocking true if configured blocking
 513      * @param mustBeConnected true if the socket must be connected
 514      * @return remote address if connected
 515      * @throws ClosedChannelException if the channel is closed
 516      * @throws NotYetConnectedException if mustBeConnected and not connected
 517      * @throws IOException if socket not bound and cannot be bound
 518      */
 519     private SocketAddress beginRead(boolean blocking, boolean mustBeConnected)
 520         throws IOException
 521     {
 522         if (blocking && interruptible) {
 523             // set hook for Thread.interrupt
 524             begin();
 525         }
 526         SocketAddress remote;
 527         synchronized (stateLock) {
 528             ensureOpen();

 554             }
 555             if (interruptible) {
 556                 // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
 557                 end(completed);
 558             } else if (!completed && !isOpen()) {
 559                 throw new AsynchronousCloseException();
 560             }
 561         }
 562     }
 563 
 564     @Override
 565     public SocketAddress receive(ByteBuffer dst) throws IOException {
 566         if (dst.isReadOnly())
 567             throw new IllegalArgumentException("Read-only buffer");
 568         readLock.lock();
 569         try {
 570             boolean blocking = isBlocking();
 571             SocketAddress sender = null;
 572             try {
 573                 SocketAddress remote = beginRead(blocking, false);
 574                 lockedConfigureNonBlockingIfNeeded();
 575                 boolean connected = (remote != null);
 576                 @SuppressWarnings("removal")
 577                 SecurityManager sm = System.getSecurityManager();
 578                 if (connected || (sm == null)) {
 579                     // connected or no security manager
 580                     int n = receive(dst, connected);
 581                     if (blocking) {
 582                         while (IOStatus.okayToRetry(n) && isOpen()) {
 583                             park(Net.POLLIN);
 584                             n = receive(dst, connected);
 585                         }
 586                     }
 587                     if (n >= 0) {
 588                         // sender address is in socket address buffer
 589                         sender = sourceSocketAddress();
 590                     }
 591                 } else {
 592                     // security manager and unconnected
 593                     sender = untrustedReceive(dst);
 594                 }

 683             } while (sender == null);
 684             return sender;
 685         } finally {
 686             readLock.unlock();
 687         }
 688     }
 689 
 690     /**
 691      * Receives a datagram into given buffer. This method is used to support
 692      * the socket adaptor. The buffer is assumed to be trusted.
          *
          * The caller must hold readLock and the channel must be configured
          * blocking (both are asserted). No timeout is applied by this method: the
          * receive is retried for as long as the status is retryable and the
          * channel is open.
          *
          * @param dst the buffer to receive the datagram into
          * @return the sender's address, or {@code null} if the final receive
          *         status was negative
 693      * @throws IOException propagated from the helpers called here
 694      */
 695     private SocketAddress trustedBlockingReceive(ByteBuffer dst)
 696         throws IOException
 697     {
 698         assert readLock.isHeldByCurrentThread() && isBlocking();
 699         SocketAddress sender = null;
 700         try {
                 // blocking=true, mustBeConnected=false; a non-null result means
                 // the socket is connected
 701             SocketAddress remote = beginRead(true, false);
 702             boolean connected = (remote != null);
                 // on a virtual thread the socket is switched to non-blocking first
 703             lockedConfigureNonBlockingIfNeeded();
 704             int n = receive(dst, connected);
                 // keep retrying while the status is retryable and the channel
                 // has not been closed
 705             while (IOStatus.okayToRetry(n) && isOpen()) {
                     // NOTE(review): presumably waits until the socket is readable
 706                 park(Net.POLLIN);
 707                 n = receive(dst, connected);
 708             }
 709             if (n >= 0) {
 710                 // sender address is in socket address buffer
 711                 sender = sourceSocketAddress();
 712             }
 713             return sender;
 714         } finally {
                 // the read is reported as completed only if a sender was obtained
 715             endRead(true, (sender != null));
 716         }
 717     }
 718 
 719     /**
 720      * Receives a datagram into given buffer with a timeout. This method is
 721      * used to support the socket adaptor. The buffer is assumed to be trusted.
 722      * @throws SocketTimeoutException if the timeout elapses
 723      */

 810         cachedSockAddr = sourceSockAddr;
 811         sourceSockAddr = tmp;
 812         cachedInetSocketAddress = isa;
 813         return isa;
 814     }
 815 
 816     @Override
 817     public int send(ByteBuffer src, SocketAddress target)
 818         throws IOException
 819     {
 820         Objects.requireNonNull(src);
 821         InetSocketAddress isa = Net.checkAddress(target, family);
 822 
 823         writeLock.lock();
 824         try {
 825             boolean blocking = isBlocking();
 826             int n;
 827             boolean completed = false;
 828             try {
 829                 SocketAddress remote = beginWrite(blocking, false);
 830                 lockedConfigureNonBlockingIfNeeded();
 831                 if (remote != null) {
 832                     // connected
 833                     if (!target.equals(remote)) {
 834                         throw new AlreadyConnectedException();
 835                     }
 836                     n = IOUtil.write(fd, src, -1, nd);
 837                     if (blocking) {
 838                         while (IOStatus.okayToRetry(n) && isOpen()) {
 839                             park(Net.POLLOUT);
 840                             n = IOUtil.write(fd, src, -1, nd);
 841                         }
 842                     }
 843                     completed = (n > 0);
 844                 } else {
 845                     // not connected
 846                     @SuppressWarnings("removal")
 847                     SecurityManager sm = System.getSecurityManager();
 848                     InetAddress ia = isa.getAddress();
 849                     if (sm != null) {
 850                         if (ia.isMulticastAddress()) {

 959         // identity rather than equals as Inet6Address.equals ignores scope_id.
 960         if (isa == previousTarget)
 961             return previousSockAddrLength;
 962         previousTarget = null;
 963         int len = targetSockAddr.encode(family, isa);
 964         previousTarget = isa;
 965         previousSockAddrLength = len;
 966         return len;
 967     }
 968 
 969     @Override
 970     public int read(ByteBuffer buf) throws IOException {
             // Reads into buf while holding readLock. beginRead is called with
             // mustBeConnected=true, so an unconnected channel is rejected there.
 971         Objects.requireNonNull(buf);
 972 
 973         readLock.lock();
 974         try {
                 // capture the blocking mode once so begin/end see the same value
 975             boolean blocking = isBlocking();
 976             int n = 0;
 977             try {
 978                 beginRead(blocking, true);
                     // on a virtual thread the socket is switched to non-blocking first
 979                 lockedConfigureNonBlockingIfNeeded();
 980                 n = IOUtil.read(fd, buf, -1, nd);
 981                 if (blocking) {
                         // blocking mode: keep retrying while the status is
                         // retryable and the channel has not been closed
 982                     while (IOStatus.okayToRetry(n) && isOpen()) {
                             // NOTE(review): presumably waits until the socket is readable
 983                         park(Net.POLLIN);
 984                         n = IOUtil.read(fd, buf, -1, nd);
 985                     }
 986                 }
 987             } finally {
                     // the operation is reported as completed only if n > 0
 988                 endRead(blocking, n > 0);
 989                 assert IOStatus.check(n);
 990             }
                 // NOTE(review): normalize presumably maps internal status codes
                 // to the value returned to callers - confirm in IOStatus
 991             return IOStatus.normalize(n);
 992         } finally {
 993             readLock.unlock();
 994         }
 995     }
 996 
 997     @Override
 998     public long read(ByteBuffer[] dsts, int offset, int length)
 999         throws IOException
1000     {
             // Scattering read into dsts[offset .. offset+length) while holding
             // readLock. beginRead is called with mustBeConnected=true, so an
             // unconnected channel is rejected there.

             // validate the sub-range against the array length before locking
1001         Objects.checkFromIndexSize(offset, length, dsts.length);
1002 
1003         readLock.lock();
1004         try {
                 // capture the blocking mode once so begin/end see the same value
1005             boolean blocking = isBlocking();
1006             long n = 0;
1007             try {
1008                 beginRead(blocking, true);
                     // on a virtual thread the socket is switched to non-blocking first
1009                 lockedConfigureNonBlockingIfNeeded();
1010                 n = IOUtil.read(fd, dsts, offset, length, nd);
1011                 if (blocking) {
                         // blocking mode: keep retrying while the status is
                         // retryable and the channel has not been closed
1012                     while (IOStatus.okayToRetry(n)  && isOpen()) {
                             // NOTE(review): presumably waits until the socket is readable
1013                         park(Net.POLLIN);
1014                         n = IOUtil.read(fd, dsts, offset, length, nd);
1015                     }
1016                 }
1017             } finally {
                     // the operation is reported as completed only if n > 0
1018                 endRead(blocking, n > 0);
1019                 assert IOStatus.check(n);
1020             }
                 // NOTE(review): normalize presumably maps internal status codes
                 // to the value returned to callers - confirm in IOStatus
1021             return IOStatus.normalize(n);
1022         } finally {
1023             readLock.unlock();
1024         }
1025     }
1026 
1027     /**
1028      * Marks the beginning of a write operation that might block.
1029      * @param blocking true if configured blocking

1072 
1073             if (interruptible) {
1074                 // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
1075                 end(completed);
1076             } else if (!completed && !isOpen()) {
1077                 throw new AsynchronousCloseException();
1078             }
1079         }
1080     }
1081 
1082     @Override
1083     public int write(ByteBuffer buf) throws IOException {
             // Writes buf to the socket while holding writeLock.
             // NOTE(review): the second beginWrite argument (true) is presumably
             // "mustBeConnected", mirroring beginRead - confirm against beginWrite.
1084         Objects.requireNonNull(buf);
1085 
1086         writeLock.lock();
1087         try {
                 // capture the blocking mode once so begin/end see the same value
1088             boolean blocking = isBlocking();
1089             int n = 0;
1090             try {
1091                 beginWrite(blocking, true);
                     // on a virtual thread the socket is switched to non-blocking first
1092                 lockedConfigureNonBlockingIfNeeded();
1093                 n = IOUtil.write(fd, buf, -1, nd);
1094                 if (blocking) {
                         // blocking mode: keep retrying while the status is
                         // retryable and the channel has not been closed
1095                     while (IOStatus.okayToRetry(n) && isOpen()) {
                             // NOTE(review): presumably waits until the socket is writable
1096                         park(Net.POLLOUT);
1097                         n = IOUtil.write(fd, buf, -1, nd);
1098                     }
1099                 }
1100             } finally {
                     // the operation is reported as completed only if n > 0
1101                 endWrite(blocking, n > 0);
1102                 assert IOStatus.check(n);
1103             }
                 // NOTE(review): normalize presumably maps internal status codes
                 // to the value returned to callers - confirm in IOStatus
1104             return IOStatus.normalize(n);
1105         } finally {
1106             writeLock.unlock();
1107         }
1108     }
1109 
1110     @Override
1111     public long write(ByteBuffer[] srcs, int offset, int length)
1112         throws IOException
1113     {
             // Gathering write of srcs[offset .. offset+length) while holding
             // writeLock.
             // NOTE(review): the second beginWrite argument (true) is presumably
             // "mustBeConnected", mirroring beginRead - confirm against beginWrite.

             // validate the sub-range against the array length before locking
1114         Objects.checkFromIndexSize(offset, length, srcs.length);
1115 
1116         writeLock.lock();
1117         try {
                 // capture the blocking mode once so begin/end see the same value
1118             boolean blocking = isBlocking();
1119             long n = 0;
1120             try {
1121                 beginWrite(blocking, true);
                     // on a virtual thread the socket is switched to non-blocking first
1122                 lockedConfigureNonBlockingIfNeeded();
1123                 n = IOUtil.write(fd, srcs, offset, length, nd);
1124                 if (blocking) {
                         // blocking mode: keep retrying while the status is
                         // retryable and the channel has not been closed
1125                     while (IOStatus.okayToRetry(n) && isOpen()) {
                             // NOTE(review): presumably waits until the socket is writable
1126                         park(Net.POLLOUT);
1127                         n = IOUtil.write(fd, srcs, offset, length, nd);
1128                     }
1129                 }
1130             } finally {
                     // the operation is reported as completed only if n > 0
1131                 endWrite(blocking, n > 0);
1132                 assert IOStatus.check(n);
1133             }
                 // NOTE(review): normalize presumably maps internal status codes
                 // to the value returned to callers - confirm in IOStatus
1134             return IOStatus.normalize(n);
1135         } finally {
1136             writeLock.unlock();
1137         }
1138     }
1139 
1140     @Override
1141     protected void implConfigureBlocking(boolean block) throws IOException {
             // Changes the blocking mode while holding both readLock and writeLock.
             // The locks are taken in the order readLock, writeLock and released
             // in the reverse order.
1142         readLock.lock();
1143         try {
1144             writeLock.lock();
1145             try {
1146                 lockedConfigureBlocking(block);
1147             } finally {
1148                 writeLock.unlock();
1149             }
1150         } finally {
1151             readLock.unlock();
1152         }
1153     }
1154 
1155     /**
1156      * Adjusts the blocking mode. readLock or writeLock must already be held.
          *
          * The change is made under stateLock after checking that the channel is
          * open. If the nonBlocking flag is set, the file descriptor is left
          * untouched.
          *
          * @param block true to configure the file descriptor blocking, false for
          *        non-blocking
          * @throws IOException if ensureOpen or the mode change fails
1157      */
1158     private void lockedConfigureBlocking(boolean block) throws IOException {
1159         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
1160         synchronized (stateLock) {
                 // NOTE(review): presumably throws ClosedChannelException when closed
1161             ensureOpen();
1162             // do nothing if virtual thread has forced the socket to be non-blocking
1163             if (!nonBlocking) {
1164                 IOUtil.configureBlocking(fd, block);
1165             }
1166         }
1167     }
1168 
1169     /**
1170      * Attempts to adjust the blocking mode if the channel is open.
          * readLock or writeLock must already be held (asserted).
          *
          * Unlike lockedConfigureBlocking, a closed channel is not an error here:
          * the method just returns {@code false}. It also returns {@code false},
          * without touching the file descriptor, when the nonBlocking flag is set.
          *
          * @param block true to configure the file descriptor blocking, false for
          *        non-blocking
1171      * @return {@code true} if the blocking mode was adjusted
          * @throws IOException if the mode change fails
1172      */
1173     private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
1174         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
1175         synchronized (stateLock) {
1176             if (!nonBlocking && isOpen()) {
1177                 IOUtil.configureBlocking(fd, block);
1178                 return true;
1179             } else {
1180                 return false;
1181             }
1182         }
1183     }
1184 
1185     /**
1186      * Ensures that the socket is configured non-blocking when the current
1187      * thread is a virtual thread. readLock or writeLock must already be held.
          *
          * Does nothing if the nonBlocking flag is already set or the current
          * thread is not virtual. Otherwise, under stateLock, the channel is
          * checked to be open, the file descriptor is configured non-blocking and
          * the nonBlocking flag is set.
          *
          * NOTE(review): an earlier wording also mentioned "a timeout is
          * specified"; no timeout is consulted by this method.
          *
1188      * @throws IOException if there is an I/O error changing the blocking mode
1189      */
1190     private void lockedConfigureNonBlockingIfNeeded() throws IOException {
1191         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
             // the nonBlocking flag is tested here before stateLock is taken
1192         if (!nonBlocking && Thread.currentThread().isVirtual()) {
1193             synchronized (stateLock) {
1194                 ensureOpen();
1195                 IOUtil.configureBlocking(fd, false);
                     // remember the forced mode so later calls skip this path
1196                 nonBlocking = true;
1197             }
1198         }
1199     }
1200 
1201     InetSocketAddress localAddress() {
             // Returns the current value of the localAddress field, read under
             // stateLock.
1202         synchronized (stateLock) {
1203             return localAddress;
1204         }
1205     }
1206 
1207     InetSocketAddress remoteAddress() {
             // Returns the current value of the remoteAddress field, read under
             // stateLock.
1208         synchronized (stateLock) {
1209             return remoteAddress;
1210         }
1211     }
1212 
1213     @Override
1214     public DatagramChannel bind(SocketAddress local) throws IOException {
1215         readLock.lock();
1216         try {
1217             writeLock.lock();
1218             try {
1219                 synchronized (stateLock) {
1220                     ensureOpen();

1305                     // capture local address before connect
1306                     initialLocalAddress = localAddress;
1307 
1308                     int n = Net.connect(family,
1309                                         fd,
1310                                         isa.getAddress(),
1311                                         isa.getPort());
1312                     if (n <= 0)
1313                         throw new Error();      // Can't happen
1314 
1315                     // connected
1316                     remoteAddress = isa;
1317                     state = ST_CONNECTED;
1318 
1319                     // refresh local address
1320                     localAddress = Net.localAddress(fd);
1321 
1322                     // flush any packets already received.
1323                     boolean blocking = isBlocking();
1324                     if (blocking) {
1325                         lockedConfigureBlocking(false);
1326                     }
1327                     try {
1328                         ByteBuffer buf = ByteBuffer.allocate(100);
1329                         while (receive(buf, false) >= 0) {
1330                             buf.clear();
1331                         }
1332                     } finally {
1333                         if (blocking) {
1334                             tryLockedConfigureBlocking(true);
1335                         }
1336                     }
1337                 }
1338             } finally {
1339                 writeLock.unlock();
1340             }
1341         } finally {
1342             readLock.unlock();
1343         }
1344         return this;
1345     }
1346 
1347     @Override
1348     public DatagramChannel disconnect() throws IOException {
1349         readLock.lock();
1350         try {
1351             writeLock.lock();
1352             try {
1353                 synchronized (stateLock) {
1354                     if (!isOpen() || (state != ST_CONNECTED))

1774     /**
1775      * Closes this channel when configured in blocking mode.
1776      *
1777      * If there is an I/O operation in progress then the socket is pre-closed
1778      * and the I/O threads signalled, in which case the final close is deferred
1779      * until all I/O operations complete.
          *
          * The whole sequence runs under stateLock, and the state is moved to
          * ST_CLOSING before anything else is done.
          *
          * @throws IOException propagated from the close helpers called here
1780      */
1781     private void implCloseBlockingMode() throws IOException {
1782         synchronized (stateLock) {
1783             assert state < ST_CLOSING;
1784             state = ST_CLOSING;
1785 
1786             // if member of any multicast groups then invalidate the keys
1787             if (registry != null)
1788                 registry.invalidateAll();
1789 
                 // NOTE(review): tryClose() returning false presumably means I/O
                 // is still in progress, so the close could not complete here
1790             if (!tryClose()) {
                     // snapshot the reader/writer thread values; 0 is treated as
                     // "no thread" by the check below
1791                 long reader = readerThread;
1792                 long writer = writerThread;
1793                 if (reader != 0 || writer != 0) {
                         // NOTE(review): presumably wakes virtual threads that are
                         // parked polling this file descriptor
1794                     if (NativeThread.isVirtualThread(reader)
1795                             || NativeThread.isVirtualThread(writer)) {
1796                         Poller.stopPoll(fdVal);
1797                     }
                         // kernel threads: pre-close the socket once, then signal
                         // each kernel thread that is reading or writing
1798                     if (NativeThread.isKernelThread(reader)
1799                             || NativeThread.isKernelThread(writer)) {
1800                         nd.preClose(fd);
1801                         if (NativeThread.isKernelThread(reader))
1802                             NativeThread.signal(reader);
1803                         if (NativeThread.isKernelThread(writer))
1804                             NativeThread.signal(writer);
1805                     }
1806                 }
1807             }
1808         }
1809     }
1810 
1811     /**
1812      * Closes this channel when configured in non-blocking mode.
1813      *
1814      * If the channel is registered with a Selector then the close is deferred
1815      * until the channel is flushed from all Selectors.
1816      */
1817     private void implCloseNonBlockingMode() throws IOException {
1818         synchronized (stateLock) {
1819             assert state < ST_CLOSING;
1820             state = ST_CLOSING;
1821 
1822             // if member of any multicast groups then invalidate the keys
1823             if (registry != null)
1824                 registry.invalidateAll();
1825         }
< prev index next >