< prev index next >

src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java

Print this page

  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;

  30 import java.io.UncheckedIOException;
  31 import java.lang.invoke.MethodHandles;
  32 import java.lang.invoke.VarHandle;
  33 import java.lang.ref.Cleaner.Cleanable;
  34 import java.lang.reflect.Method;
  35 import java.net.DatagramSocket;
  36 import java.net.Inet4Address;
  37 import java.net.Inet6Address;
  38 import java.net.InetAddress;
  39 import java.net.InetSocketAddress;
  40 import java.net.NetworkInterface;
  41 import java.net.PortUnreachableException;
  42 import java.net.ProtocolFamily;
  43 import java.net.SocketAddress;
  44 import java.net.SocketException;
  45 import java.net.SocketOption;
  46 import java.net.SocketTimeoutException;
  47 import java.net.StandardProtocolFamily;
  48 import java.net.StandardSocketOptions;
  49 import java.nio.ByteBuffer;
  50 import java.nio.channels.AlreadyBoundException;
  51 import java.nio.channels.AlreadyConnectedException;
  52 import java.nio.channels.AsynchronousCloseException;
  53 import java.nio.channels.ClosedChannelException;
  54 import java.nio.channels.DatagramChannel;
  55 import java.nio.channels.IllegalBlockingModeException;
  56 import java.nio.channels.MembershipKey;
  57 import java.nio.channels.NotYetConnectedException;
  58 import java.nio.channels.SelectionKey;
  59 import java.nio.channels.spi.AbstractSelectableChannel;
  60 import java.nio.channels.spi.SelectorProvider;
  61 import java.security.AccessController;
  62 import java.security.PrivilegedExceptionAction;
  63 import java.util.Collections;
  64 import java.util.HashMap;
  65 import java.util.HashSet;
  66 import java.util.Map;
  67 import java.util.Objects;
  68 import java.util.Set;

  69 import java.util.concurrent.locks.ReentrantLock;
  70 import java.util.function.Consumer;
  71 
  72 import jdk.internal.ref.CleanerFactory;
  73 import sun.net.ResourceManager;
  74 import sun.net.ext.ExtendedSocketOptions;
  75 import sun.net.util.IPAddressUtil;
  76 
  77 /**
  78  * An implementation of DatagramChannels.
  79  */
  80 
  81 class DatagramChannelImpl
  82     extends DatagramChannel
  83     implements SelChImpl
  84 {
  85     // Used to make native read and write calls
  86     private static final NativeDispatcher nd = new DatagramDispatcher();
  87 
  88     // true if interruptible (can be false to emulate legacy DatagramSocket)

 142     private static final VarHandle SOCKET;
 143     static {
 144         try {
 145             MethodHandles.Lookup l = MethodHandles.lookup();
 146             SOCKET = l.findVarHandle(DatagramChannelImpl.class, "socket", DatagramSocket.class);
 147         } catch (Exception e) {
 148             throw new InternalError(e);
 149         }
 150     }
 151     private volatile DatagramSocket socket;
 152 
 153     // Multicast support
 154     private MembershipRegistry registry;
 155 
 156     // set true when socket is bound and SO_REUSEADDRESS is emulated
 157     private boolean reuseAddressEmulated;
 158 
 159     // set true/false when socket is already bound and SO_REUSEADDR is emulated
 160     private boolean isReuseAddress;
 161 



 162     // -- End of fields protected by stateLock
 163 
 164 
 165     DatagramChannelImpl(SelectorProvider sp, boolean interruptible) throws IOException {
 166         this(sp, (Net.isIPv6Available()
 167                 ? StandardProtocolFamily.INET6
 168                 : StandardProtocolFamily.INET),
 169                 interruptible);
 170     }
 171 
 172     DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family, boolean interruptible)
 173         throws IOException
 174     {
 175         super(sp);
 176 
 177         Objects.requireNonNull(family, "'family' is null");
 178         if ((family != StandardProtocolFamily.INET) &&
 179                 (family != StandardProtocolFamily.INET6)) {
 180             throw new UnsupportedOperationException("Protocol family not supported");
 181         }

 453             set.add(StandardSocketOptions.SO_RCVBUF);
 454             set.add(StandardSocketOptions.SO_REUSEADDR);
 455             if (Net.isReusePortAvailable()) {
 456                 set.add(StandardSocketOptions.SO_REUSEPORT);
 457             }
 458             set.add(StandardSocketOptions.SO_BROADCAST);
 459             set.add(StandardSocketOptions.IP_TOS);
 460             set.add(StandardSocketOptions.IP_MULTICAST_IF);
 461             set.add(StandardSocketOptions.IP_MULTICAST_TTL);
 462             set.add(StandardSocketOptions.IP_MULTICAST_LOOP);
 463             set.addAll(ExtendedSocketOptions.datagramSocketOptions());
 464             return Collections.unmodifiableSet(set);
 465         }
 466     }
 467 
 468     @Override
 469     public final Set<SocketOption<?>> supportedOptions() {
 470         return DefaultOptionsHolder.defaultOptions;
 471     }
 472 




















 473     /**
 474      * Marks the beginning of a read operation that might block.
 475      *
 476      * @param blocking true if configured blocking
 477      * @param mustBeConnected true if the socket must be connected
 478      * @return remote address if connected
 479      * @throws ClosedChannelException if the channel is closed
 480      * @throws NotYetConnectedException if mustBeConnected and not connected
 481      * @throws IOException if socket not bound and cannot be bound
 482      */
 483     private SocketAddress beginRead(boolean blocking, boolean mustBeConnected)
 484         throws IOException
 485     {
 486         if (blocking && interruptible) {
 487             // set hook for Thread.interrupt
 488             begin();
 489         }
 490         SocketAddress remote;
 491         synchronized (stateLock) {
 492             ensureOpen();

 518             }
 519             if (interruptible) {
 520                 // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
 521                 end(completed);
 522             } else if (!completed && !isOpen()) {
 523                 throw new AsynchronousCloseException();
 524             }
 525         }
 526     }
 527 
 528     @Override
 529     public SocketAddress receive(ByteBuffer dst) throws IOException {
 530         if (dst.isReadOnly())
 531             throw new IllegalArgumentException("Read-only buffer");
 532         readLock.lock();
 533         try {
 534             boolean blocking = isBlocking();
 535             SocketAddress sender = null;
 536             try {
 537                 SocketAddress remote = beginRead(blocking, false);

 538                 boolean connected = (remote != null);
 539                 @SuppressWarnings("removal")
 540                 SecurityManager sm = System.getSecurityManager();
 541                 if (connected || (sm == null)) {
 542                     // connected or no security manager
 543                     int n = receive(dst, connected);
 544                     if (blocking) {
 545                         while (IOStatus.okayToRetry(n) && isOpen()) {
 546                             park(Net.POLLIN);
 547                             n = receive(dst, connected);
 548                         }
 549                     }
 550                     if (n >= 0) {
 551                         // sender address is in socket address buffer
 552                         sender = sourceSocketAddress();
 553                     }
 554                 } else {
 555                     // security manager and unconnected
 556                     sender = untrustedReceive(dst);
 557                 }

 646             } while (sender == null);
 647             return sender;
 648         } finally {
 649             readLock.unlock();
 650         }
 651     }
 652 
 653     /**
 654      * Receives a datagram into given buffer. This method is used to support
 655      * the socket adaptor. The buffer is assumed to be trusted.
 656      * @throws SocketTimeoutException if the timeout elapses
 657      */
 658     private SocketAddress trustedBlockingReceive(ByteBuffer dst)
 659         throws IOException
 660     {
 661         assert readLock.isHeldByCurrentThread() && isBlocking();
 662         SocketAddress sender = null;
 663         try {
 664             SocketAddress remote = beginRead(true, false);
 665             boolean connected = (remote != null);

 666             int n = receive(dst, connected);
 667             while (IOStatus.okayToRetry(n) && isOpen()) {
 668                 park(Net.POLLIN);
 669                 n = receive(dst, connected);
 670             }
 671             if (n >= 0) {
 672                 // sender address is in socket address buffer
 673                 sender = sourceSocketAddress();
 674             }
 675             return sender;
 676         } finally {
 677             endRead(true, (sender != null));
 678         }
 679     }
 680 
 681     /**
 682      * Receives a datagram into given buffer with a timeout. This method is
 683      * used to support the socket adaptor. The buffer is assumed to be trusted.
 684      * @throws SocketTimeoutException if the timeout elapses
 685      */

 772         cachedSockAddr = sourceSockAddr;
 773         sourceSockAddr = tmp;
 774         cachedInetSocketAddress = isa;
 775         return isa;
 776     }
 777 
 778     @Override
 779     public int send(ByteBuffer src, SocketAddress target)
 780         throws IOException
 781     {
 782         Objects.requireNonNull(src);
 783         InetSocketAddress isa = Net.checkAddress(target, family);
 784 
 785         writeLock.lock();
 786         try {
 787             boolean blocking = isBlocking();
 788             int n;
 789             boolean completed = false;
 790             try {
 791                 SocketAddress remote = beginWrite(blocking, false);

 792                 if (remote != null) {
 793                     // connected
 794                     if (!target.equals(remote)) {
 795                         throw new AlreadyConnectedException();
 796                     }
 797                     n = IOUtil.write(fd, src, -1, nd);
 798                     if (blocking) {
 799                         while (IOStatus.okayToRetry(n) && isOpen()) {
 800                             park(Net.POLLOUT);
 801                             n = IOUtil.write(fd, src, -1, nd);
 802                         }
 803                     }
 804                     completed = (n > 0);
 805                 } else {
 806                     // not connected
 807                     @SuppressWarnings("removal")
 808                     SecurityManager sm = System.getSecurityManager();
 809                     InetAddress ia = isa.getAddress();
 810                     if (sm != null) {
 811                         if (ia.isMulticastAddress()) {

 920         // identity rather than equals as Inet6Address.equals ignores scope_id.
 921         if (isa == previousTarget)
 922             return previousSockAddrLength;
 923         previousTarget = null;
 924         int len = targetSockAddr.encode(family, isa);
 925         previousTarget = isa;
 926         previousSockAddrLength = len;
 927         return len;
 928     }
 929 
 930     @Override
 931     public int read(ByteBuffer buf) throws IOException {
 932         Objects.requireNonNull(buf);
 933 
 934         readLock.lock();
 935         try {
 936             boolean blocking = isBlocking();
 937             int n = 0;
 938             try {
 939                 beginRead(blocking, true);

 940                 n = IOUtil.read(fd, buf, -1, nd);
 941                 if (blocking) {
 942                     while (IOStatus.okayToRetry(n) && isOpen()) {
 943                         park(Net.POLLIN);
 944                         n = IOUtil.read(fd, buf, -1, nd);
 945                     }
 946                 }
 947             } finally {
 948                 endRead(blocking, n > 0);
 949                 assert IOStatus.check(n);
 950             }
 951             return IOStatus.normalize(n);
 952         } finally {
 953             readLock.unlock();
 954         }
 955     }
 956 
 957     @Override
 958     public long read(ByteBuffer[] dsts, int offset, int length)
 959         throws IOException
 960     {
 961         Objects.checkFromIndexSize(offset, length, dsts.length);
 962 
 963         readLock.lock();
 964         try {
 965             boolean blocking = isBlocking();
 966             long n = 0;
 967             try {
 968                 beginRead(blocking, true);

 969                 n = IOUtil.read(fd, dsts, offset, length, nd);
 970                 if (blocking) {
 971                     while (IOStatus.okayToRetry(n)  && isOpen()) {
 972                         park(Net.POLLIN);
 973                         n = IOUtil.read(fd, dsts, offset, length, nd);
 974                     }
 975                 }
 976             } finally {
 977                 endRead(blocking, n > 0);
 978                 assert IOStatus.check(n);
 979             }
 980             return IOStatus.normalize(n);
 981         } finally {
 982             readLock.unlock();
 983         }
 984     }
 985 
 986     /**
 987      * Marks the beginning of a write operation that might block.
 988      * @param blocking true if configured blocking

1031 
1032             if (interruptible) {
1033                 // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
1034                 end(completed);
1035             } else if (!completed && !isOpen()) {
1036                 throw new AsynchronousCloseException();
1037             }
1038         }
1039     }
1040 
1041     @Override
1042     public int write(ByteBuffer buf) throws IOException {
1043         Objects.requireNonNull(buf);
1044 
1045         writeLock.lock();
1046         try {
1047             boolean blocking = isBlocking();
1048             int n = 0;
1049             try {
1050                 beginWrite(blocking, true);

1051                 n = IOUtil.write(fd, buf, -1, nd);
1052                 if (blocking) {
1053                     while (IOStatus.okayToRetry(n) && isOpen()) {
1054                         park(Net.POLLOUT);
1055                         n = IOUtil.write(fd, buf, -1, nd);
1056                     }
1057                 }
1058             } finally {
1059                 endWrite(blocking, n > 0);
1060                 assert IOStatus.check(n);
1061             }
1062             return IOStatus.normalize(n);
1063         } finally {
1064             writeLock.unlock();
1065         }
1066     }
1067 
1068     @Override
1069     public long write(ByteBuffer[] srcs, int offset, int length)
1070         throws IOException
1071     {
1072         Objects.checkFromIndexSize(offset, length, srcs.length);
1073 
1074         writeLock.lock();
1075         try {
1076             boolean blocking = isBlocking();
1077             long n = 0;
1078             try {
1079                 beginWrite(blocking, true);

1080                 n = IOUtil.write(fd, srcs, offset, length, nd);
1081                 if (blocking) {
1082                     while (IOStatus.okayToRetry(n) && isOpen()) {
1083                         park(Net.POLLOUT);
1084                         n = IOUtil.write(fd, srcs, offset, length, nd);
1085                     }
1086                 }
1087             } finally {
1088                 endWrite(blocking, n > 0);
1089                 assert IOStatus.check(n);
1090             }
1091             return IOStatus.normalize(n);
1092         } finally {
1093             writeLock.unlock();
1094         }
1095     }
1096 
1097     @Override
1098     protected void implConfigureBlocking(boolean block) throws IOException {
1099         readLock.lock();
1100         try {
1101             writeLock.lock();
1102             try {
1103                 lockedConfigureBlocking(block);
1104             } finally {
1105                 writeLock.unlock();
1106             }
1107         } finally {
1108             readLock.unlock();
1109         }
1110     }
1111 
1112     /**
1113      * Adjusts the blocking mode. readLock or writeLock must already be held.
1114      */
1115     private void lockedConfigureBlocking(boolean block) throws IOException {
1116         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
1117         synchronized (stateLock) {
1118             ensureOpen();
1119             IOUtil.configureBlocking(fd, block);



1120         }
1121     }
1122 
1123     /**
1124      * Adjusts the blocking mode if the channel is open. readLock or writeLock
1125      * must already be held.
1126      *
1127      * @return {@code true} if the blocking mode was adjusted, {@code false} if
1128      *         the blocking mode was not adjusted because the channel is closed
1129      */
1130     private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
1131         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
1132         synchronized (stateLock) {
1133             if (isOpen()) {
1134                 IOUtil.configureBlocking(fd, block);
1135                 return true;
1136             } else {
1137                 return false;
1138             }
1139         }
1140     }
1141 
















1142     InetSocketAddress localAddress() {
1143         synchronized (stateLock) {
1144             return localAddress;
1145         }
1146     }
1147 
1148     InetSocketAddress remoteAddress() {
1149         synchronized (stateLock) {
1150             return remoteAddress;
1151         }
1152     }
1153 
1154     @Override
1155     public DatagramChannel bind(SocketAddress local) throws IOException {
1156         readLock.lock();
1157         try {
1158             writeLock.lock();
1159             try {
1160                 synchronized (stateLock) {
1161                     ensureOpen();

1246                     // capture local address before connect
1247                     initialLocalAddress = localAddress;
1248 
1249                     int n = Net.connect(family,
1250                                         fd,
1251                                         isa.getAddress(),
1252                                         isa.getPort());
1253                     if (n <= 0)
1254                         throw new Error();      // Can't happen
1255 
1256                     // connected
1257                     remoteAddress = isa;
1258                     state = ST_CONNECTED;
1259 
1260                     // refresh local address
1261                     localAddress = Net.localAddress(fd);
1262 
1263                     // flush any packets already received.
1264                     boolean blocking = isBlocking();
1265                     if (blocking) {
1266                         IOUtil.configureBlocking(fd, false);
1267                     }
1268                     try {
1269                         ByteBuffer buf = ByteBuffer.allocate(100);
1270                         while (receive(buf, false) >= 0) {
1271                             buf.clear();
1272                         }
1273                     } finally {
1274                         if (blocking) {
1275                             IOUtil.configureBlocking(fd, true);
1276                         }
1277                     }
1278                 }
1279             } finally {
1280                 writeLock.unlock();
1281             }
1282         } finally {
1283             readLock.unlock();
1284         }
1285         return this;
1286     }
1287 
1288     @Override
1289     public DatagramChannel disconnect() throws IOException {
1290         readLock.lock();
1291         try {
1292             writeLock.lock();
1293             try {
1294                 synchronized (stateLock) {
1295                     if (!isOpen() || (state != ST_CONNECTED))

1358             if (value != null) {
1359                 map.put(option, value);
1360             }
1361         }
1362 
1363         // macOS: re-create the socket.
1364         FileDescriptor newfd = Net.socket(family, false);
1365         try {
1366             // copy the socket options that are protocol family agnostic
1367             for (Map.Entry<SocketOption<?>, Object> e : map.entrySet()) {
1368                 SocketOption<?> option = e.getKey();
1369                 if (SocketOptionRegistry.findOption(option, Net.UNSPEC) != null) {
1370                     Object value = e.getValue();
1371                     try {
1372                         Net.setSocketOption(newfd, Net.UNSPEC, option, value);
1373                     } catch (IOException ignore) { }
1374                 }
1375             }
1376 
1377             // copy the blocking mode
1378             if (!isBlocking()) {
1379                 IOUtil.configureBlocking(newfd, false);
1380             }
1381 
1382             // dup this channel's socket to the new socket. If this succeeds then
1383             // fd will reference the new socket. If it fails then it will still
1384             // reference the old socket.
1385             nd.dup(newfd, fd);
1386         } finally {
1387             // release the file descriptor
1388             nd.close(newfd);
1389         }
1390 
1391         // bind to the original local address
1392         try {
1393             Net.bind(family, fd, target.getAddress(), target.getPort());
1394         } catch (IOException ioe) {
1395             // bind failed, socket is left unbound
1396             localAddress = null;
1397             throw ioe;
1398         }

1715     /**
1716      * Closes this channel when configured in blocking mode.
1717      *
1718      * If there is an I/O operation in progress then the socket is pre-closed
1719      * and the I/O threads signalled, in which case the final close is deferred
1720      * until all I/O operations complete.
1721      */
1722     private void implCloseBlockingMode() throws IOException {
1723         synchronized (stateLock) {
1724             assert state < ST_CLOSING;
1725             state = ST_CLOSING;
1726 
1727             // if member of any multicast groups then invalidate the keys
1728             if (registry != null)
1729                 registry.invalidateAll();
1730 
1731             if (!tryClose()) {
1732                 long reader = readerThread;
1733                 long writer = writerThread;
1734                 if (reader != 0 || writer != 0) {
1735                     nd.preClose(fd);
1736                     if (reader != 0)
1737                         NativeThread.signal(reader);
1738                     if (writer != 0)
1739                         NativeThread.signal(writer);







1740                 }
1741             }
1742         }
1743     }
1744 
1745     /**
1746      * Closes this channel when configured in non-blocking mode.
1747      *
1748      * If the channel is registered with a Selector then the close is deferred
1749      * until the channel is flushed from all Selectors.
1750      */
1751     private void implCloseNonBlockingMode() throws IOException {
1752         synchronized (stateLock) {
1753             assert state < ST_CLOSING;
1754             state = ST_CLOSING;
1755 
1756             // if member of any multicast groups then invalidate the keys
1757             if (registry != null)
1758                 registry.invalidateAll();
1759         }

  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.io.InterruptedIOException;
  31 import java.io.UncheckedIOException;
  32 import java.lang.invoke.MethodHandles;
  33 import java.lang.invoke.VarHandle;
  34 import java.lang.ref.Cleaner.Cleanable;
  35 import java.lang.reflect.Method;
  36 import java.net.DatagramSocket;
  37 import java.net.Inet4Address;
  38 import java.net.Inet6Address;
  39 import java.net.InetAddress;
  40 import java.net.InetSocketAddress;
  41 import java.net.NetworkInterface;
  42 import java.net.PortUnreachableException;
  43 import java.net.ProtocolFamily;
  44 import java.net.SocketAddress;
  45 import java.net.SocketException;
  46 import java.net.SocketOption;
  47 import java.net.SocketTimeoutException;
  48 import java.net.StandardProtocolFamily;
  49 import java.net.StandardSocketOptions;
  50 import java.nio.ByteBuffer;
  51 import java.nio.channels.AlreadyBoundException;
  52 import java.nio.channels.AlreadyConnectedException;
  53 import java.nio.channels.AsynchronousCloseException;
  54 import java.nio.channels.ClosedChannelException;
  55 import java.nio.channels.DatagramChannel;
  56 import java.nio.channels.IllegalBlockingModeException;
  57 import java.nio.channels.MembershipKey;
  58 import java.nio.channels.NotYetConnectedException;
  59 import java.nio.channels.SelectionKey;
  60 import java.nio.channels.spi.AbstractSelectableChannel;
  61 import java.nio.channels.spi.SelectorProvider;
  62 import java.security.AccessController;
  63 import java.security.PrivilegedExceptionAction;
  64 import java.util.Collections;
  65 import java.util.HashMap;
  66 import java.util.HashSet;
  67 import java.util.Map;
  68 import java.util.Objects;
  69 import java.util.Set;
  70 import java.util.concurrent.TimeUnit;
  71 import java.util.concurrent.locks.ReentrantLock;
  72 import java.util.function.Consumer;
  73 
  74 import jdk.internal.ref.CleanerFactory;
  75 import sun.net.ResourceManager;
  76 import sun.net.ext.ExtendedSocketOptions;
  77 import sun.net.util.IPAddressUtil;
  78 
  79 /**
  80  * An implementation of DatagramChannels.
  81  */
  82 
  83 class DatagramChannelImpl
  84     extends DatagramChannel
  85     implements SelChImpl
  86 {
  87     // Used to make native read and write calls
  88     private static final NativeDispatcher nd = new DatagramDispatcher();
  89 
  90     // true if interruptible (can be false to emulate legacy DatagramSocket)

 144     private static final VarHandle SOCKET;
 145     static {
 146         try {
 147             MethodHandles.Lookup l = MethodHandles.lookup();
 148             SOCKET = l.findVarHandle(DatagramChannelImpl.class, "socket", DatagramSocket.class);
 149         } catch (Exception e) {
 150             throw new InternalError(e);
 151         }
 152     }
 153     private volatile DatagramSocket socket;
 154 
 155     // Multicast support
 156     private MembershipRegistry registry;
 157 
 158     // set true when socket is bound and SO_REUSEADDRESS is emulated
 159     private boolean reuseAddressEmulated;
 160 
 161     // set true/false when socket is already bound and SO_REUSEADDR is emulated
 162     private boolean isReuseAddress;
 163 
 164     // lazily set to true when the socket is configured non-blocking
 165     private volatile boolean nonBlocking;
 166 
 167     // -- End of fields protected by stateLock
 168 
 169 
 170     DatagramChannelImpl(SelectorProvider sp, boolean interruptible) throws IOException {
 171         this(sp, (Net.isIPv6Available()
 172                 ? StandardProtocolFamily.INET6
 173                 : StandardProtocolFamily.INET),
 174                 interruptible);
 175     }
 176 
 177     DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family, boolean interruptible)
 178         throws IOException
 179     {
 180         super(sp);
 181 
 182         Objects.requireNonNull(family, "'family' is null");
 183         if ((family != StandardProtocolFamily.INET) &&
 184                 (family != StandardProtocolFamily.INET6)) {
 185             throw new UnsupportedOperationException("Protocol family not supported");
 186         }

 458             set.add(StandardSocketOptions.SO_RCVBUF);
 459             set.add(StandardSocketOptions.SO_REUSEADDR);
 460             if (Net.isReusePortAvailable()) {
 461                 set.add(StandardSocketOptions.SO_REUSEPORT);
 462             }
 463             set.add(StandardSocketOptions.SO_BROADCAST);
 464             set.add(StandardSocketOptions.IP_TOS);
 465             set.add(StandardSocketOptions.IP_MULTICAST_IF);
 466             set.add(StandardSocketOptions.IP_MULTICAST_TTL);
 467             set.add(StandardSocketOptions.IP_MULTICAST_LOOP);
 468             set.addAll(ExtendedSocketOptions.datagramSocketOptions());
 469             return Collections.unmodifiableSet(set);
 470         }
 471     }
 472 
 473     @Override
 474     public final Set<SocketOption<?>> supportedOptions() {
 475         return DefaultOptionsHolder.defaultOptions;
 476     }
 477 
 478     @Override
 479     public void park(int event, long nanos) throws IOException {
 480         Thread thread = Thread.currentThread();
 481         if (thread.isVirtual()) {
 482             Poller.poll(getFDVal(), event, nanos, this::isOpen);
 483             // DatagramSocket throws when virtual thread interrupted
 484             if (!interruptible && thread.isInterrupted()) {
 485                 throw new InterruptedIOException();
 486             }
 487         } else {
 488             long millis;
 489             if (nanos == 0) {
 490                 millis = -1;
 491             } else {
 492                 millis = TimeUnit.NANOSECONDS.toMillis(nanos);
 493             }
 494             Net.poll(getFD(), event, millis);
 495         }
 496     }
 497 
 498     /**
 499      * Marks the beginning of a read operation that might block.
 500      *
 501      * @param blocking true if configured blocking
 502      * @param mustBeConnected true if the socket must be connected
 503      * @return remote address if connected
 504      * @throws ClosedChannelException if the channel is closed
 505      * @throws NotYetConnectedException if mustBeConnected and not connected
 506      * @throws IOException if socket not bound and cannot be bound
 507      */
 508     private SocketAddress beginRead(boolean blocking, boolean mustBeConnected)
 509         throws IOException
 510     {
 511         if (blocking && interruptible) {
 512             // set hook for Thread.interrupt
 513             begin();
 514         }
 515         SocketAddress remote;
 516         synchronized (stateLock) {
 517             ensureOpen();

 543             }
 544             if (interruptible) {
 545                 // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
 546                 end(completed);
 547             } else if (!completed && !isOpen()) {
 548                 throw new AsynchronousCloseException();
 549             }
 550         }
 551     }
 552 
 553     @Override
 554     public SocketAddress receive(ByteBuffer dst) throws IOException {
 555         if (dst.isReadOnly())
 556             throw new IllegalArgumentException("Read-only buffer");
 557         readLock.lock();
 558         try {
 559             boolean blocking = isBlocking();
 560             SocketAddress sender = null;
 561             try {
 562                 SocketAddress remote = beginRead(blocking, false);
 563                 lockedConfigureNonBlockingIfNeeded();
 564                 boolean connected = (remote != null);
 565                 @SuppressWarnings("removal")
 566                 SecurityManager sm = System.getSecurityManager();
 567                 if (connected || (sm == null)) {
 568                     // connected or no security manager
 569                     int n = receive(dst, connected);
 570                     if (blocking) {
 571                         while (IOStatus.okayToRetry(n) && isOpen()) {
 572                             park(Net.POLLIN);
 573                             n = receive(dst, connected);
 574                         }
 575                     }
 576                     if (n >= 0) {
 577                         // sender address is in socket address buffer
 578                         sender = sourceSocketAddress();
 579                     }
 580                 } else {
 581                     // security manager and unconnected
 582                     sender = untrustedReceive(dst);
 583                 }

 672             } while (sender == null);
 673             return sender;
 674         } finally {
 675             readLock.unlock();
 676         }
 677     }
 678 
 679     /**
 680      * Receives a datagram into given buffer. This method is used to support
 681      * the socket adaptor. The buffer is assumed to be trusted.
 682      * @throws SocketTimeoutException if the timeout elapses
 683      */
 684     private SocketAddress trustedBlockingReceive(ByteBuffer dst)
 685         throws IOException
 686     {
 687         assert readLock.isHeldByCurrentThread() && isBlocking();
 688         SocketAddress sender = null;
 689         try {
 690             SocketAddress remote = beginRead(true, false);
 691             boolean connected = (remote != null);
 692             lockedConfigureNonBlockingIfNeeded();
 693             int n = receive(dst, connected);
 694             while (IOStatus.okayToRetry(n) && isOpen()) {
 695                 park(Net.POLLIN);
 696                 n = receive(dst, connected);
 697             }
 698             if (n >= 0) {
 699                 // sender address is in socket address buffer
 700                 sender = sourceSocketAddress();
 701             }
 702             return sender;
 703         } finally {
 704             endRead(true, (sender != null));
 705         }
 706     }
 707 
 708     /**
 709      * Receives a datagram into given buffer with a timeout. This method is
 710      * used to support the socket adaptor. The buffer is assumed to be trusted.
 711      * @throws SocketTimeoutException if the timeout elapses
 712      */

 799         cachedSockAddr = sourceSockAddr;
 800         sourceSockAddr = tmp;
 801         cachedInetSocketAddress = isa;
 802         return isa;
 803     }
 804 
 805     @Override
 806     public int send(ByteBuffer src, SocketAddress target)
 807         throws IOException
 808     {
 809         Objects.requireNonNull(src);
 810         InetSocketAddress isa = Net.checkAddress(target, family);
 811 
 812         writeLock.lock();
 813         try {
 814             boolean blocking = isBlocking();
 815             int n;
 816             boolean completed = false;
 817             try {
 818                 SocketAddress remote = beginWrite(blocking, false);
 819                 lockedConfigureNonBlockingIfNeeded();
 820                 if (remote != null) {
 821                     // connected
 822                     if (!target.equals(remote)) {
 823                         throw new AlreadyConnectedException();
 824                     }
 825                     n = IOUtil.write(fd, src, -1, nd);
 826                     if (blocking) {
 827                         while (IOStatus.okayToRetry(n) && isOpen()) {
 828                             park(Net.POLLOUT);
 829                             n = IOUtil.write(fd, src, -1, nd);
 830                         }
 831                     }
 832                     completed = (n > 0);
 833                 } else {
 834                     // not connected
 835                     @SuppressWarnings("removal")
 836                     SecurityManager sm = System.getSecurityManager();
 837                     InetAddress ia = isa.getAddress();
 838                     if (sm != null) {
 839                         if (ia.isMulticastAddress()) {

 948         // identity rather than equals as Inet6Address.equals ignores scope_id.
 949         if (isa == previousTarget)
 950             return previousSockAddrLength;
 951         previousTarget = null;
 952         int len = targetSockAddr.encode(family, isa);
 953         previousTarget = isa;
 954         previousSockAddrLength = len;
 955         return len;
 956     }
 957 
 958     @Override
 959     public int read(ByteBuffer buf) throws IOException {
 960         Objects.requireNonNull(buf);
 961 
 962         readLock.lock();
 963         try {
 964             boolean blocking = isBlocking();
 965             int n = 0;
 966             try {
 967                 beginRead(blocking, true);
 968                 lockedConfigureNonBlockingIfNeeded();
 969                 n = IOUtil.read(fd, buf, -1, nd);
 970                 if (blocking) {
 971                     while (IOStatus.okayToRetry(n) && isOpen()) {
 972                         park(Net.POLLIN);
 973                         n = IOUtil.read(fd, buf, -1, nd);
 974                     }
 975                 }
 976             } finally {
 977                 endRead(blocking, n > 0);
 978                 assert IOStatus.check(n);
 979             }
 980             return IOStatus.normalize(n);
 981         } finally {
 982             readLock.unlock();
 983         }
 984     }
 985 
 986     @Override
 987     public long read(ByteBuffer[] dsts, int offset, int length)
 988         throws IOException
 989     {
 990         Objects.checkFromIndexSize(offset, length, dsts.length);
 991 
 992         readLock.lock();
 993         try {
 994             boolean blocking = isBlocking();
 995             long n = 0;
 996             try {
 997                 beginRead(blocking, true);
 998                 lockedConfigureNonBlockingIfNeeded();
 999                 n = IOUtil.read(fd, dsts, offset, length, nd);
1000                 if (blocking) {
1001                     while (IOStatus.okayToRetry(n)  && isOpen()) {
1002                         park(Net.POLLIN);
1003                         n = IOUtil.read(fd, dsts, offset, length, nd);
1004                     }
1005                 }
1006             } finally {
1007                 endRead(blocking, n > 0);
1008                 assert IOStatus.check(n);
1009             }
1010             return IOStatus.normalize(n);
1011         } finally {
1012             readLock.unlock();
1013         }
1014     }
1015 
1016     /**
1017      * Marks the beginning of a write operation that might block.
1018      * @param blocking true if configured blocking

1061 
1062             if (interruptible) {
1063                 // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
1064                 end(completed);
1065             } else if (!completed && !isOpen()) {
1066                 throw new AsynchronousCloseException();
1067             }
1068         }
1069     }
1070 
1071     @Override
1072     public int write(ByteBuffer buf) throws IOException {
1073         Objects.requireNonNull(buf);
1074 
1075         writeLock.lock();
1076         try {
1077             boolean blocking = isBlocking();
1078             int n = 0;
1079             try {
1080                 beginWrite(blocking, true);
1081                 lockedConfigureNonBlockingIfNeeded();
1082                 n = IOUtil.write(fd, buf, -1, nd);
1083                 if (blocking) {
1084                     while (IOStatus.okayToRetry(n) && isOpen()) {
1085                         park(Net.POLLOUT);
1086                         n = IOUtil.write(fd, buf, -1, nd);
1087                     }
1088                 }
1089             } finally {
1090                 endWrite(blocking, n > 0);
1091                 assert IOStatus.check(n);
1092             }
1093             return IOStatus.normalize(n);
1094         } finally {
1095             writeLock.unlock();
1096         }
1097     }
1098 
1099     @Override
1100     public long write(ByteBuffer[] srcs, int offset, int length)
1101         throws IOException
1102     {
1103         Objects.checkFromIndexSize(offset, length, srcs.length);
1104 
1105         writeLock.lock();
1106         try {
1107             boolean blocking = isBlocking();
1108             long n = 0;
1109             try {
1110                 beginWrite(blocking, true);
1111                 lockedConfigureNonBlockingIfNeeded();
1112                 n = IOUtil.write(fd, srcs, offset, length, nd);
1113                 if (blocking) {
1114                     while (IOStatus.okayToRetry(n) && isOpen()) {
1115                         park(Net.POLLOUT);
1116                         n = IOUtil.write(fd, srcs, offset, length, nd);
1117                     }
1118                 }
1119             } finally {
1120                 endWrite(blocking, n > 0);
1121                 assert IOStatus.check(n);
1122             }
1123             return IOStatus.normalize(n);
1124         } finally {
1125             writeLock.unlock();
1126         }
1127     }
1128 
1129     @Override
1130     protected void implConfigureBlocking(boolean block) throws IOException {
1131         readLock.lock();
1132         try {
1133             writeLock.lock();
1134             try {
1135                 lockedConfigureBlocking(block);
1136             } finally {
1137                 writeLock.unlock();
1138             }
1139         } finally {
1140             readLock.unlock();
1141         }
1142     }
1143 
1144     /**
1145      * Adjusts the blocking mode. readLock or writeLock must already be held.
1146      */
1147     private void lockedConfigureBlocking(boolean block) throws IOException {
1148         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
1149         synchronized (stateLock) {
1150             ensureOpen();
1151             // do nothing if virtual thread has forced the socket to be non-blocking
1152             if (!nonBlocking) {
1153                 IOUtil.configureBlocking(fd, block);
1154             }
1155         }
1156     }
1157 
1158     /**
1159      * Attempts to adjust the blocking mode if the channel is open.
1160      * @return {@code true} if the blocking mode was adjusted



1161      */
1162     private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
1163         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
1164         synchronized (stateLock) {
1165             if (!nonBlocking && isOpen()) {
1166                 IOUtil.configureBlocking(fd, block);
1167                 return true;
1168             } else {
1169                 return false;
1170             }
1171         }
1172     }
1173 
1174     /**
1175      * Ensures that the socket is configured non-blocking when on a virtual
1176      * thread or a timeout is specified.
1177      * @throws IOException if there is an I/O error changing the blocking mode
1178      */
1179     private void lockedConfigureNonBlockingIfNeeded() throws IOException {
1180         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
1181         if (!nonBlocking && Thread.currentThread().isVirtual()) {
1182             synchronized (stateLock) {
1183                 ensureOpen();
1184                 IOUtil.configureBlocking(fd, false);
1185                 nonBlocking = true;
1186             }
1187         }
1188     }
1189 
1190     InetSocketAddress localAddress() {
1191         synchronized (stateLock) {
1192             return localAddress;
1193         }
1194     }
1195 
1196     InetSocketAddress remoteAddress() {
1197         synchronized (stateLock) {
1198             return remoteAddress;
1199         }
1200     }
1201 
1202     @Override
1203     public DatagramChannel bind(SocketAddress local) throws IOException {
1204         readLock.lock();
1205         try {
1206             writeLock.lock();
1207             try {
1208                 synchronized (stateLock) {
1209                     ensureOpen();

1294                     // capture local address before connect
1295                     initialLocalAddress = localAddress;
1296 
1297                     int n = Net.connect(family,
1298                                         fd,
1299                                         isa.getAddress(),
1300                                         isa.getPort());
1301                     if (n <= 0)
1302                         throw new Error();      // Can't happen
1303 
1304                     // connected
1305                     remoteAddress = isa;
1306                     state = ST_CONNECTED;
1307 
1308                     // refresh local address
1309                     localAddress = Net.localAddress(fd);
1310 
1311                     // flush any packets already received.
1312                     boolean blocking = isBlocking();
1313                     if (blocking) {
1314                         lockedConfigureBlocking(false);
1315                     }
1316                     try {
1317                         ByteBuffer buf = ByteBuffer.allocate(100);
1318                         while (receive(buf, false) >= 0) {
1319                             buf.clear();
1320                         }
1321                     } finally {
1322                         if (blocking) {
1323                             tryLockedConfigureBlocking(true);
1324                         }
1325                     }
1326                 }
1327             } finally {
1328                 writeLock.unlock();
1329             }
1330         } finally {
1331             readLock.unlock();
1332         }
1333         return this;
1334     }
1335 
1336     @Override
1337     public DatagramChannel disconnect() throws IOException {
1338         readLock.lock();
1339         try {
1340             writeLock.lock();
1341             try {
1342                 synchronized (stateLock) {
1343                     if (!isOpen() || (state != ST_CONNECTED))

1406             if (value != null) {
1407                 map.put(option, value);
1408             }
1409         }
1410 
1411         // macOS: re-create the socket.
1412         FileDescriptor newfd = Net.socket(family, false);
1413         try {
1414             // copy the socket options that are protocol family agnostic
1415             for (Map.Entry<SocketOption<?>, Object> e : map.entrySet()) {
1416                 SocketOption<?> option = e.getKey();
1417                 if (SocketOptionRegistry.findOption(option, Net.UNSPEC) != null) {
1418                     Object value = e.getValue();
1419                     try {
1420                         Net.setSocketOption(newfd, Net.UNSPEC, option, value);
1421                     } catch (IOException ignore) { }
1422                 }
1423             }
1424 
1425             // copy the blocking mode
1426             if (!isBlocking() || nonBlocking) {
1427                 IOUtil.configureBlocking(newfd, false);
1428             }
1429 
1430             // dup this channel's socket to the new socket. If this succeeds then
1431             // fd will reference the new socket. If it fails then it will still
1432             // reference the old socket.
1433             nd.dup(newfd, fd);
1434         } finally {
1435             // release the file descriptor
1436             nd.close(newfd);
1437         }
1438 
1439         // bind to the original local address
1440         try {
1441             Net.bind(family, fd, target.getAddress(), target.getPort());
1442         } catch (IOException ioe) {
1443             // bind failed, socket is left unbound
1444             localAddress = null;
1445             throw ioe;
1446         }

1763     /**
1764      * Closes this channel when configured in blocking mode.
1765      *
1766      * If there is an I/O operation in progress then the socket is pre-closed
1767      * and the I/O threads signalled, in which case the final close is deferred
1768      * until all I/O operations complete.
1769      */
1770     private void implCloseBlockingMode() throws IOException {
1771         synchronized (stateLock) {
1772             assert state < ST_CLOSING;
1773             state = ST_CLOSING;
1774 
1775             // if member of any multicast groups then invalidate the keys
1776             if (registry != null)
1777                 registry.invalidateAll();
1778 
1779             if (!tryClose()) {
1780                 long reader = readerThread;
1781                 long writer = writerThread;
1782                 if (reader != 0 || writer != 0) {
1783                     if (NativeThread.isVirtualThread(reader)
1784                             || NativeThread.isVirtualThread(writer)) {
1785                         Poller.stopPoll(fdVal);
1786                     }
1787                     if (NativeThread.isKernelThread(reader)
1788                             || NativeThread.isKernelThread(writer)) {
1789                         nd.preClose(fd);
1790                         if (NativeThread.isKernelThread(reader))
1791                             NativeThread.signal(reader);
1792                         if (NativeThread.isKernelThread(writer))
1793                             NativeThread.signal(writer);
1794                     }
1795                 }
1796             }
1797         }
1798     }
1799 
1800     /**
1801      * Closes this channel when configured in non-blocking mode.
1802      *
1803      * If the channel is registered with a Selector then the close is deferred
1804      * until the channel is flushed from all Selectors.
1805      */
1806     private void implCloseNonBlockingMode() throws IOException {
1807         synchronized (stateLock) {
1808             assert state < ST_CLOSING;
1809             state = ST_CLOSING;
1810 
1811             // if member of any multicast groups then invalidate the keys
1812             if (registry != null)
1813                 registry.invalidateAll();
1814         }
< prev index next >