1 /* 2 * Copyright (c) 2001, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 26 package sun.nio.ch; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.io.InterruptedIOException; 31 import java.io.UncheckedIOException; 32 import java.lang.invoke.MethodHandles; 33 import java.lang.invoke.VarHandle; 34 import java.lang.ref.Cleaner.Cleanable; 35 import java.lang.reflect.Method; 36 import java.net.DatagramPacket; 37 import java.net.DatagramSocket; 38 import java.net.Inet4Address; 39 import java.net.Inet6Address; 40 import java.net.InetAddress; 41 import java.net.InetSocketAddress; 42 import java.net.NetworkInterface; 43 import java.net.PortUnreachableException; 44 import java.net.ProtocolFamily; 45 import java.net.SocketAddress; 46 import java.net.SocketException; 47 import java.net.SocketOption; 48 import java.net.SocketTimeoutException; 49 import java.net.StandardProtocolFamily; 50 import java.net.StandardSocketOptions; 51 import java.nio.ByteBuffer; 52 import java.nio.channels.AlreadyBoundException; 53 import java.nio.channels.AlreadyConnectedException; 54 import java.nio.channels.AsynchronousCloseException; 55 import java.nio.channels.ClosedChannelException; 56 import java.nio.channels.DatagramChannel; 57 import java.nio.channels.IllegalBlockingModeException; 58 import java.nio.channels.MembershipKey; 59 import java.nio.channels.NotYetConnectedException; 60 import java.nio.channels.SelectionKey; 61 import java.nio.channels.spi.AbstractSelectableChannel; 62 import java.nio.channels.spi.SelectorProvider; 63 import java.util.Collections; 64 import java.util.HashMap; 65 import java.util.HashSet; 66 import java.util.Map; 67 import java.util.Objects; 68 import java.util.Set; 69 import java.util.concurrent.locks.ReentrantLock; 70 import java.util.function.Consumer; 71 72 import jdk.internal.access.JavaNioAccess; 73 import jdk.internal.access.SharedSecrets; 74 import jdk.internal.ref.CleanerFactory; 75 import jdk.internal.invoke.MhUtil; 76 import sun.net.ext.ExtendedSocketOptions; 77 import 
sun.net.util.IPAddressUtil; 78 79 import static java.util.concurrent.TimeUnit.MILLISECONDS; 80 import static java.util.concurrent.TimeUnit.NANOSECONDS; 81 82 /** 83 * An implementation of DatagramChannels. 84 */ 85 86 class DatagramChannelImpl 87 extends DatagramChannel 88 implements SelChImpl 89 { 90 // Used to make native read and write calls 91 private static final NativeDispatcher nd = new DatagramDispatcher(); 92 93 private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess(); 94 95 // true if interruptible (can be false to emulate legacy DatagramSocket) 96 private final boolean interruptible; 97 98 // The protocol family of the socket 99 private final ProtocolFamily family; 100 101 // Our file descriptor 102 private final FileDescriptor fd; 103 private final int fdVal; 104 105 // Native sockaddrs and cached InetSocketAddress for receive, protected by readLock 106 private NativeSocketAddress sourceSockAddr; 107 private NativeSocketAddress cachedSockAddr; 108 private InetSocketAddress cachedInetSocketAddress; 109 110 // Native sockaddr and cached objects for send, protected by writeLock 111 private final NativeSocketAddress targetSockAddr; 112 private InetSocketAddress previousTarget; 113 private int previousSockAddrLength; 114 115 // Cleaner to close file descriptor and free native socket address 116 private final Cleanable cleaner; 117 118 // Lock held by current reading or connecting thread 119 private final ReentrantLock readLock = new ReentrantLock(); 120 121 // Lock held by current writing or connecting thread 122 private final ReentrantLock writeLock = new ReentrantLock(); 123 124 // Lock held by any thread that modifies the state fields declared below 125 // DO NOT invoke a blocking I/O operation while holding this lock! 
private final Object stateLock = new Object();

// -- The following fields are protected by stateLock

// State (does not necessarily increase monotonically)
private static final int ST_UNCONNECTED = 0;
private static final int ST_CONNECTED = 1;
private static final int ST_CLOSING = 2;
private static final int ST_CLOSED = 3;
private int state;

// IDs of native threads doing reads and writes, for signalling
private long readerThread;
private long writerThread;

// Local and remote (connected) address
private InetSocketAddress localAddress;
private InetSocketAddress remoteAddress;

// Local address prior to connecting
private InetSocketAddress initialLocalAddress;

// Socket adaptor, created lazily by socket(). SOCKET is the VarHandle for
// the "socket" field and is used there to install the adaptor with a
// compare-and-set.
private static final VarHandle SOCKET = MhUtil.findVarHandle(
        MethodHandles.lookup(), "socket", DatagramSocket.class);
private volatile DatagramSocket socket;

// Multicast support
private MembershipRegistry registry;

// set true when socket is bound and SO_REUSEADDR is emulated
private boolean reuseAddressEmulated;

// set true/false when socket is already bound and SO_REUSEADDR is emulated
private boolean isReuseAddress;

// True if the channel's socket has been forced into non-blocking mode
// by a virtual thread. It cannot be reset. When the channel is in
// blocking mode and the channel's socket is in non-blocking mode then
// operations that don't complete immediately will poll the socket and
// preserve the semantics of blocking operations.
private volatile boolean forcedNonBlocking;

// -- End of fields protected by stateLock


/**
 * Creates a channel for the default protocol family: INET6 when
 * {@code Net.isIPv6Available()} returns true, otherwise INET. Delegates to
 * the three-argument constructor.
 *
 * @param sp the provider passed on to the delegated constructor
 * @param interruptible value stored in the {@code interruptible} field
 * @throws IOException if the delegated constructor throws it
 */
DatagramChannelImpl(SelectorProvider sp, boolean interruptible) throws IOException {
    this(sp, (Net.isIPv6Available()
            ? StandardProtocolFamily.INET6
            : StandardProtocolFamily.INET),
            interruptible);
}

/**
 * Creates a channel for the given protocol family. The file descriptor is
 * obtained from {@code Net.socket(family, false)} and three native socket
 * addresses are allocated (receive source, receive cache, send target).
 * If initialization does not complete, the native socket addresses are
 * freed and the file descriptor is closed before the exception propagates.
 *
 * @param sp the provider passed to the superclass constructor
 * @param family the protocol family, must be INET or INET6
 * @param interruptible value stored in the {@code interruptible} field
 * @throws NullPointerException if {@code family} is null
 * @throws UnsupportedOperationException if the family is neither INET nor
 *         INET6, or is INET6 and {@code Net.isIPv6Available()} is false
 * @throws IOException if an I/O error occurs during initialization
 */
DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family, boolean interruptible)
    throws IOException
{
    super(sp);

    Objects.requireNonNull(family, "'family' is null");
    if ((family != StandardProtocolFamily.INET) &&
            (family != StandardProtocolFamily.INET6)) {
        throw new UnsupportedOperationException("Protocol family not supported");
    }
    if (family == StandardProtocolFamily.INET6 && !Net.isIPv6Available()) {
        throw new UnsupportedOperationException("IPv6 not available");
    }

    // locals mirror the fields so the finally block can release whatever
    // was acquired before a failure
    FileDescriptor fd = null;
    NativeSocketAddress[] sockAddrs = null;

    boolean initialized = false;
    try {
        this.interruptible = interruptible;
        this.family = family;
        this.fd = fd = Net.socket(family, false);
        this.fdVal = IOUtil.fdVal(fd);

        sockAddrs = NativeSocketAddress.allocate(3);
        // the receive-side address fields are documented as protected by readLock
        readLock.lock();
        try {
            this.sourceSockAddr = sockAddrs[0];
            this.cachedSockAddr = sockAddrs[1];
        } finally {
            readLock.unlock();
        }
        this.targetSockAddr = sockAddrs[2];

        initialized = true;
    } finally {
        if (!initialized) {
            // undo partial initialization
            if (sockAddrs != null) NativeSocketAddress.freeAll(sockAddrs);
            if (fd != null) nd.close(fd);
        }
    }

    // register the action that releases the fd and native socket addresses
    Runnable releaser = releaserFor(fd, sockAddrs);
    this.cleaner = CleanerFactory.cleaner().register(this, releaser);
}

/**
 * Creates a channel around an existing file descriptor. The channel is
 * always interruptible, the family is INET6 when
 * {@code Net.isIPv6Available()} returns true (otherwise INET), and the
 * local address is read from the descriptor with {@code Net.localAddress}.
 * If initialization does not complete, any allocated native socket
 * addresses are freed and the given file descriptor is closed.
 *
 * @param sp the provider passed to the superclass constructor
 * @param fd the file descriptor to use
 * @throws IOException if an I/O error occurs during initialization
 */
DatagramChannelImpl(SelectorProvider sp, FileDescriptor fd)
    throws IOException
{
    super(sp);

    NativeSocketAddress[] sockAddrs = null;

    boolean initialized = false;
    try {
        this.interruptible = true;
        this.family = Net.isIPv6Available()
                ? StandardProtocolFamily.INET6
                : StandardProtocolFamily.INET;
        this.fd = fd;
        this.fdVal = IOUtil.fdVal(fd);

        sockAddrs = NativeSocketAddress.allocate(3);
        // the receive-side address fields are documented as protected by readLock
        readLock.lock();
        try {
            this.sourceSockAddr = sockAddrs[0];
            this.cachedSockAddr = sockAddrs[1];
        } finally {
            readLock.unlock();
        }
        this.targetSockAddr = sockAddrs[2];

        initialized = true;
    } finally {
        if (!initialized) {
            // undo partial initialization; note the caller's fd is closed too
            if (sockAddrs != null) NativeSocketAddress.freeAll(sockAddrs);
            nd.close(fd);
        }
    }

    // register the action that releases the fd and native socket addresses
    Runnable releaser = releaserFor(fd, sockAddrs);
    this.cleaner = CleanerFactory.cleaner().register(this, releaser);

    synchronized (stateLock) {
        this.localAddress = Net.localAddress(fd);
    }
}

// @throws ClosedChannelException if channel is closed
private void ensureOpen() throws ClosedChannelException {
    if (!isOpen())
        throw new ClosedChannelException();
}

/**
 * Returns the socket adaptor for this channel, creating it on first use.
 * If another thread installs an adaptor first (the compare-and-set fails),
 * the adaptor created here is discarded and the installed one is returned.
 */
@Override
public DatagramSocket socket() {
    DatagramSocket socket = this.socket;
    if (socket == null) {
        socket = DatagramSocketAdaptor.create(this);
        if (!SOCKET.compareAndSet(this, null, socket)) {
            // lost the race, use the adaptor that was installed
            socket = this.socket;
        }
    }
    return socket;
}

/**
 * Returns the value of the {@code localAddress} field, read under stateLock.
 *
 * @throws ClosedChannelException if the channel is closed
 */
@Override
public SocketAddress getLocalAddress() throws IOException {
    synchronized (stateLock) {
        ensureOpen();
        return localAddress;
    }
}

/**
 * Returns the value of the {@code remoteAddress} field, read under stateLock.
 *
 * @throws ClosedChannelException if the channel is closed
 */
@Override
public SocketAddress getRemoteAddress() throws IOException {
    synchronized (stateLock) {
        ensureOpen();
        return remoteAddress;
    }
}

/**
 * Returns the protocol family to specify to set/getSocketOption for the
 * given socket option.
304 */ 305 private ProtocolFamily familyFor(SocketOption<?> name) { 306 assert Thread.holdsLock(stateLock); 307 308 // unspecified (most options) 309 if (SocketOptionRegistry.findOption(name, Net.UNSPEC) != null) 310 return Net.UNSPEC; 311 312 // IPv4 socket 313 if (family == StandardProtocolFamily.INET) 314 return StandardProtocolFamily.INET; 315 316 // IPv6 socket that is unbound 317 if (localAddress == null) 318 return StandardProtocolFamily.INET6; 319 320 // IPv6 socket bound to wildcard or IPv6 address 321 InetAddress address = localAddress.getAddress(); 322 if (address.isAnyLocalAddress() || (address instanceof Inet6Address)) 323 return StandardProtocolFamily.INET6; 324 325 // IPv6 socket bound to IPv4 address 326 if (Net.canUseIPv6OptionsWithIPv4LocalAddress()) { 327 // IPV6_XXX options can be used 328 return StandardProtocolFamily.INET6; 329 } else { 330 // IPV6_XXX options cannot be used 331 return StandardProtocolFamily.INET; 332 } 333 } 334 335 @Override 336 public <T> DatagramChannel setOption(SocketOption<T> name, T value) 337 throws IOException 338 { 339 Objects.requireNonNull(name); 340 if (!supportedOptions().contains(name)) 341 throw new UnsupportedOperationException("'" + name + "' not supported"); 342 if (!name.type().isInstance(value)) 343 throw new IllegalArgumentException("Invalid value '" + value + "'"); 344 345 synchronized (stateLock) { 346 ensureOpen(); 347 348 ProtocolFamily family = familyFor(name); 349 350 // Some platforms require both IPV6_XXX and IP_XXX socket options to 351 // be set when the channel's socket is IPv6 and it is used to send 352 // IPv4 multicast datagrams. The IP_XXX socket options are set on a 353 // best effort basis. 
354 boolean needToSetIPv4Option = (family != Net.UNSPEC) 355 && (this.family == StandardProtocolFamily.INET6) 356 && Net.shouldSetBothIPv4AndIPv6Options(); 357 358 // outgoing multicast interface 359 if (name == StandardSocketOptions.IP_MULTICAST_IF) { 360 assert family != Net.UNSPEC; 361 NetworkInterface interf = (NetworkInterface) value; 362 if (family == StandardProtocolFamily.INET6) { 363 int index = interf.getIndex(); 364 if (index == -1) 365 throw new IOException("Network interface cannot be identified"); 366 Net.setInterface6(fd, index); 367 } 368 if (family == StandardProtocolFamily.INET || needToSetIPv4Option) { 369 // need IPv4 address to identify interface 370 Inet4Address target = Net.anyInet4Address(interf); 371 if (target != null) { 372 try { 373 Net.setInterface4(fd, Net.inet4AsInt(target)); 374 } catch (IOException ioe) { 375 if (family == StandardProtocolFamily.INET) throw ioe; 376 } 377 } else if (family == StandardProtocolFamily.INET) { 378 throw new IOException("Network interface not configured for IPv4"); 379 } 380 } 381 return this; 382 } 383 384 // SO_REUSEADDR needs special handling as it may be emulated 385 if (name == StandardSocketOptions.SO_REUSEADDR 386 && Net.useExclusiveBind() && localAddress != null) { 387 reuseAddressEmulated = true; 388 this.isReuseAddress = (Boolean)value; 389 } 390 391 // remaining options don't need any special handling 392 Net.setSocketOption(fd, family, name, value); 393 if (needToSetIPv4Option && family != StandardProtocolFamily.INET) { 394 try { 395 Net.setSocketOption(fd, StandardProtocolFamily.INET, name, value); 396 } catch (IOException ignore) { } 397 } 398 399 return this; 400 } 401 } 402 403 @Override 404 @SuppressWarnings("unchecked") 405 public <T> T getOption(SocketOption<T> name) 406 throws IOException 407 { 408 Objects.requireNonNull(name); 409 if (!supportedOptions().contains(name)) 410 throw new UnsupportedOperationException("'" + name + "' not supported"); 411 412 synchronized (stateLock) { 413 
ensureOpen(); 414 415 ProtocolFamily family = familyFor(name); 416 417 if (name == StandardSocketOptions.IP_MULTICAST_IF) { 418 if (family == StandardProtocolFamily.INET) { 419 int address = Net.getInterface4(fd); 420 if (address == 0) 421 return null; // default interface 422 423 InetAddress ia = Net.inet4FromInt(address); 424 NetworkInterface ni = NetworkInterface.getByInetAddress(ia); 425 if (ni == null) 426 throw new IOException("Unable to map address to interface"); 427 return (T) ni; 428 } else { 429 int index = Net.getInterface6(fd); 430 if (index == 0) 431 return null; // default interface 432 433 NetworkInterface ni = NetworkInterface.getByIndex(index); 434 if (ni == null) 435 throw new IOException("Unable to map index to interface"); 436 return (T) ni; 437 } 438 } 439 440 if (name == StandardSocketOptions.SO_REUSEADDR && reuseAddressEmulated) { 441 return (T) Boolean.valueOf(isReuseAddress); 442 } 443 444 // no special handling 445 return (T) Net.getSocketOption(fd, family, name); 446 } 447 } 448 449 private static class DefaultOptionsHolder { 450 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); 451 452 private static Set<SocketOption<?>> defaultOptions() { 453 HashSet<SocketOption<?>> set = new HashSet<>(); 454 set.add(StandardSocketOptions.SO_SNDBUF); 455 set.add(StandardSocketOptions.SO_RCVBUF); 456 set.add(StandardSocketOptions.SO_REUSEADDR); 457 if (Net.isReusePortAvailable()) { 458 set.add(StandardSocketOptions.SO_REUSEPORT); 459 } 460 set.add(StandardSocketOptions.SO_BROADCAST); 461 set.add(StandardSocketOptions.IP_TOS); 462 set.add(StandardSocketOptions.IP_MULTICAST_IF); 463 set.add(StandardSocketOptions.IP_MULTICAST_TTL); 464 set.add(StandardSocketOptions.IP_MULTICAST_LOOP); 465 set.addAll(ExtendedSocketOptions.datagramSocketOptions()); 466 return Collections.unmodifiableSet(set); 467 } 468 } 469 470 @Override 471 public final Set<SocketOption<?>> supportedOptions() { 472 return DefaultOptionsHolder.defaultOptions; 473 } 474 
475 @Override 476 public void park(int event, long nanos) throws IOException { 477 Thread thread = Thread.currentThread(); 478 if (thread.isVirtual()) { 479 Poller.poll(getFDVal(), event, nanos, this::isOpen); 480 // DatagramSocket throws when virtual thread interrupted 481 if (!interruptible && thread.isInterrupted()) { 482 throw new InterruptedIOException(); 483 } 484 } else { 485 long millis; 486 if (nanos == 0) { 487 millis = -1; 488 } else { 489 millis = NANOSECONDS.toMillis(nanos); 490 if (nanos > MILLISECONDS.toNanos(millis)) { 491 // Round up any excess nanos to the nearest millisecond to 492 // avoid parking for less than requested. 493 millis++; 494 } 495 } 496 Net.poll(getFD(), event, millis); 497 } 498 } 499 500 /** 501 * Marks the beginning of a read operation that might block. 502 * 503 * @param blocking true if configured blocking 504 * @param mustBeConnected true if the socket must be connected 505 * @return remote address if connected 506 * @throws ClosedChannelException if the channel is closed 507 * @throws NotYetConnectedException if mustBeConnected and not connected 508 * @throws IOException if socket not bound and cannot be bound 509 */ 510 private SocketAddress beginRead(boolean blocking, boolean mustBeConnected) 511 throws IOException 512 { 513 if (blocking && interruptible) { 514 // set hook for Thread.interrupt 515 begin(); 516 } 517 SocketAddress remote; 518 synchronized (stateLock) { 519 ensureOpen(); 520 remote = remoteAddress; 521 if ((remote == null) && mustBeConnected) 522 throw new NotYetConnectedException(); 523 if (localAddress == null) 524 bindInternal(null); 525 if (blocking) 526 readerThread = NativeThread.current(); 527 } 528 return remote; 529 } 530 531 /** 532 * Marks the end of a read operation that may have blocked. 
533 * 534 * @throws AsynchronousCloseException if the channel was closed asynchronously 535 */ 536 private void endRead(boolean blocking, boolean completed) 537 throws AsynchronousCloseException 538 { 539 if (blocking) { 540 synchronized (stateLock) { 541 readerThread = 0; 542 if (state == ST_CLOSING) { 543 tryFinishClose(); 544 } 545 } 546 if (interruptible) { 547 // remove hook for Thread.interrupt (may throw AsynchronousCloseException) 548 end(completed); 549 } else if (!completed && !isOpen()) { 550 throw new AsynchronousCloseException(); 551 } 552 } 553 } 554 555 @Override 556 public SocketAddress receive(ByteBuffer dst) throws IOException { 557 if (dst.isReadOnly()) 558 throw new IllegalArgumentException("Read-only buffer"); 559 readLock.lock(); 560 try { 561 ensureOpen(); 562 boolean blocking = isBlocking(); 563 SocketAddress sender = null; 564 try { 565 SocketAddress remote = beginRead(blocking, false); 566 configureSocketNonBlockingIfVirtualThread(); 567 boolean connected = (remote != null); 568 int n = receive(dst, connected); 569 if (blocking) { 570 while (IOStatus.okayToRetry(n) && isOpen()) { 571 park(Net.POLLIN); 572 n = receive(dst, connected); 573 } 574 } 575 if (n > 0 || (n == 0 && isOpen())) { 576 // sender address is in socket address buffer 577 sender = sourceSocketAddress(); 578 } 579 return sender; 580 } finally { 581 endRead(blocking, (sender != null)); 582 } 583 } finally { 584 readLock.unlock(); 585 } 586 } 587 588 /** 589 * Receives a datagram. 590 * 591 * @apiNote This method is for use by the socket adaptor. 
592 * 593 * @throws IllegalBlockingModeException if the channel is non-blocking 594 * @throws SocketTimeoutException if the timeout elapses 595 */ 596 void blockingReceive(DatagramPacket p, long nanos) throws IOException { 597 assert Thread.holdsLock(p) && nanos >= 0; 598 599 readLock.lock(); 600 try { 601 ensureOpen(); 602 if (!isBlocking()) 603 throw new IllegalBlockingModeException(); 604 605 // underlying socket needs to be non-blocking if timed receive or virtual thread 606 if (nanos > 0) { 607 configureSocketNonBlocking(); 608 } else { 609 configureSocketNonBlockingIfVirtualThread(); 610 } 611 612 boolean completed = false; 613 try { 614 SocketAddress remote = beginRead(true, false); 615 boolean connected = (remote != null); 616 617 // p.bufLength is the maximum size of the datagram that can be received 618 int bufLength = DatagramPackets.getBufLength(p); 619 ByteBuffer dst = tryBlockingReceive(connected, bufLength, nanos); 620 if (dst != null) { 621 // copy to DatagramPacket, set length and sender 622 try { 623 int len = dst.limit(); 624 dst.get(p.getData(), p.getOffset(), len); 625 DatagramPackets.setLength(p, len); 626 p.setSocketAddress(sourceSocketAddress()); 627 } finally { 628 Util.offerFirstTemporaryDirectBuffer(dst); 629 } 630 completed = true; 631 } 632 633 } finally { 634 endRead(true, completed); 635 } 636 637 } finally { 638 readLock.unlock(); 639 } 640 } 641 642 /** 643 * Attempt to receive a datagram. 
644 * 645 * @param connected if the channel's socket is connected 646 * @param len the maximum size of the datagram to receive 647 * @param nanos the timeout, should be Long.MAX_VALUE for untimed 648 * @return a direct buffer containing the datagram or null if channel is closed 649 * @throws SocketTimeoutException if the timeout elapses 650 */ 651 private ByteBuffer tryBlockingReceive(boolean connected, int len, long nanos) 652 throws IOException 653 { 654 assert nanos >= 0; 655 long startNanos = System.nanoTime(); 656 ByteBuffer dst = Util.getTemporaryDirectBuffer(len); 657 int n = -1; 658 try { 659 n = receive(dst, connected); 660 while (n == IOStatus.UNAVAILABLE && isOpen()) { 661 // virtual thread needs to release temporary direct buffer before parking 662 if (Thread.currentThread().isVirtual()) { 663 Util.offerFirstTemporaryDirectBuffer(dst); 664 dst = null; 665 } 666 if (nanos > 0) { 667 long remainingNanos = nanos - (System.nanoTime() - startNanos); 668 if (remainingNanos <= 0) { 669 throw new SocketTimeoutException("Receive timed out"); 670 } 671 park(Net.POLLIN, remainingNanos); 672 } else { 673 park(Net.POLLIN); 674 } 675 // virtual thread needs to re-allocate temporary direct buffer after parking 676 if (Thread.currentThread().isVirtual()) { 677 dst = Util.getTemporaryDirectBuffer(len); 678 } 679 n = receive(dst, connected); 680 } 681 dst.flip(); 682 } finally { 683 // release buffer if no datagram received 684 if (dst != null && (n < 0 || (n == 0 && !isOpen()))) { 685 Util.offerFirstTemporaryDirectBuffer(dst); 686 dst = null; 687 } 688 } 689 return dst; 690 } 691 692 /** 693 * Receives a datagram into the buffer. 694 * @param connected true if the channel is connected 695 */ 696 private int receive(ByteBuffer dst, boolean connected) throws IOException { 697 int pos = dst.position(); 698 int lim = dst.limit(); 699 assert (pos <= lim); 700 int rem = (pos <= lim ? 
lim - pos : 0); 701 if (dst instanceof DirectBuffer && rem > 0) 702 return receiveIntoNativeBuffer(dst, rem, pos, connected); 703 704 // Substitute a native buffer. If the supplied buffer is empty 705 // we must instead use a nonempty buffer, otherwise the call 706 // will not block waiting for a datagram on some platforms. 707 int newSize = Math.max(rem, 1); 708 ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize); 709 try { 710 int n = receiveIntoNativeBuffer(bb, newSize, 0, connected); 711 bb.flip(); 712 if (n > 0 && rem > 0) 713 dst.put(bb); 714 return n; 715 } finally { 716 Util.releaseTemporaryDirectBuffer(bb); 717 } 718 } 719 720 /** 721 * Receives a datagram into a direct buffer. 722 */ 723 private int receiveIntoNativeBuffer(ByteBuffer bb, int rem, int pos, 724 boolean connected) 725 throws IOException 726 { 727 NIO_ACCESS.acquireSession(bb); 728 try { 729 long bufAddress = NIO_ACCESS.getBufferAddress(bb); 730 int n = receive0(fd, 731 bufAddress + pos, 732 rem, 733 sourceSockAddr.address(), 734 connected); 735 if (n > 0) 736 bb.position(pos + n); 737 return n; 738 } finally { 739 NIO_ACCESS.releaseSession(bb); 740 } 741 } 742 743 /** 744 * Return an InetSocketAddress to represent the source/sender socket address 745 * in sourceSockAddr. Returns the cached InetSocketAddress if the source 746 * address is the same as the cached address. 
747 */ 748 private InetSocketAddress sourceSocketAddress() throws IOException { 749 assert readLock.isHeldByCurrentThread(); 750 if (cachedInetSocketAddress != null && sourceSockAddr.equals(cachedSockAddr)) { 751 return cachedInetSocketAddress; 752 } 753 InetSocketAddress isa = sourceSockAddr.decode(); 754 // swap sourceSockAddr and cachedSockAddr 755 NativeSocketAddress tmp = cachedSockAddr; 756 cachedSockAddr = sourceSockAddr; 757 sourceSockAddr = tmp; 758 cachedInetSocketAddress = isa; 759 return isa; 760 } 761 762 @Override 763 public int send(ByteBuffer src, SocketAddress target) 764 throws IOException 765 { 766 Objects.requireNonNull(src); 767 InetSocketAddress isa = Net.checkAddress(target, family); 768 769 writeLock.lock(); 770 try { 771 ensureOpen(); 772 boolean blocking = isBlocking(); 773 int n; 774 boolean completed = false; 775 try { 776 SocketAddress remote = beginWrite(blocking, false); 777 configureSocketNonBlockingIfVirtualThread(); 778 if (remote != null) { 779 // connected 780 if (!target.equals(remote)) { 781 throw new AlreadyConnectedException(); 782 } 783 n = IOUtil.write(fd, src, -1, nd); 784 if (blocking) { 785 while (IOStatus.okayToRetry(n) && isOpen()) { 786 park(Net.POLLOUT); 787 n = IOUtil.write(fd, src, -1, nd); 788 } 789 } 790 completed = (n > 0); 791 } else { 792 // not connected 793 InetAddress ia = isa.getAddress(); 794 if (ia.isLinkLocalAddress()) 795 isa = IPAddressUtil.toScopedAddress(isa); 796 if (isa.getPort() == 0) 797 throw new SocketException("Can't send to port 0"); 798 n = send(fd, src, isa); 799 if (blocking) { 800 while (IOStatus.okayToRetry(n) && isOpen()) { 801 park(Net.POLLOUT); 802 n = send(fd, src, isa); 803 } 804 } 805 completed = (n >= 0); 806 } 807 } finally { 808 endWrite(blocking, completed); 809 } 810 assert n >= 0 || n == IOStatus.UNAVAILABLE; 811 return IOStatus.normalize(n); 812 } finally { 813 writeLock.unlock(); 814 } 815 } 816 817 /** 818 * Sends a datagram. 
819 * 820 * @apiNote This method is for use by the socket adaptor. 821 * 822 * @throws IllegalArgumentException if not connected and target address not set 823 * @throws IllegalBlockingModeException if the channel is non-blocking 824 */ 825 void blockingSend(DatagramPacket p) throws IOException { 826 assert Thread.holdsLock(p); 827 828 writeLock.lock(); 829 try { 830 ensureOpen(); 831 if (!isBlocking()) 832 throw new IllegalBlockingModeException(); 833 834 int len = p.getLength(); 835 ByteBuffer src = Util.getTemporaryDirectBuffer(len); 836 try { 837 // copy bytes to temporary direct buffer 838 src.put(p.getData(), p.getOffset(), len); 839 src.flip(); 840 841 // target address 842 InetSocketAddress target; 843 if (p.getAddress() == null) { 844 InetSocketAddress remote = remoteAddress(); 845 if (remote == null) { 846 throw new IllegalArgumentException("Address not set"); 847 } 848 // set address/port to be compatible with long-standing behavior 849 p.setAddress(remote.getAddress()); 850 p.setPort(remote.getPort()); 851 target = remote; 852 } else { 853 target = (InetSocketAddress) p.getSocketAddress(); 854 } 855 856 // send the datagram (does not block) 857 send(src, target); 858 } finally { 859 Util.offerFirstTemporaryDirectBuffer(src); 860 } 861 862 } finally { 863 writeLock.unlock(); 864 } 865 } 866 867 private int send(FileDescriptor fd, ByteBuffer src, InetSocketAddress target) 868 throws IOException 869 { 870 if (src instanceof DirectBuffer) 871 return sendFromNativeBuffer(fd, src, target); 872 873 // Substitute a native buffer 874 int pos = src.position(); 875 int lim = src.limit(); 876 assert (pos <= lim); 877 int rem = (pos <= lim ? 
lim - pos : 0); 878 879 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 880 try { 881 bb.put(src); 882 bb.flip(); 883 // Do not update src until we see how many bytes were written 884 src.position(pos); 885 886 int n = sendFromNativeBuffer(fd, bb, target); 887 if (n > 0) { 888 // now update src 889 src.position(pos + n); 890 } 891 return n; 892 } finally { 893 Util.releaseTemporaryDirectBuffer(bb); 894 } 895 } 896 897 /** 898 * Send a datagram contained in a direct buffer. 899 */ 900 private int sendFromNativeBuffer(FileDescriptor fd, ByteBuffer bb, 901 InetSocketAddress target) 902 throws IOException 903 { 904 int pos = bb.position(); 905 int lim = bb.limit(); 906 assert (pos <= lim); 907 int rem = (pos <= lim ? lim - pos : 0); 908 909 int written; 910 NIO_ACCESS.acquireSession(bb); 911 try { 912 long bufAddress = NIO_ACCESS.getBufferAddress(bb); 913 int addressLen = targetSocketAddress(target); 914 written = send0(fd, 915 bufAddress + pos, 916 rem, 917 targetSockAddr.address(), 918 addressLen); 919 } catch (PortUnreachableException pue) { 920 if (isConnected()) 921 throw pue; 922 written = rem; 923 } finally { 924 NIO_ACCESS.releaseSession(bb); 925 } 926 if (written > 0) 927 bb.position(pos + written); 928 return written; 929 } 930 931 /** 932 * Encodes the given InetSocketAddress into targetSockAddr, returning the 933 * length of the sockaddr structure (sizeof struct sockaddr or sockaddr6). 934 */ 935 private int targetSocketAddress(InetSocketAddress isa) { 936 assert writeLock.isHeldByCurrentThread(); 937 // Nothing to do if target address is already in the buffer. Use 938 // identity rather than equals as Inet6Address.equals ignores scope_id. 
939 if (isa == previousTarget) 940 return previousSockAddrLength; 941 previousTarget = null; 942 int len = targetSockAddr.encode(family, isa); 943 previousTarget = isa; 944 previousSockAddrLength = len; 945 return len; 946 } 947 948 @Override 949 public int read(ByteBuffer buf) throws IOException { 950 Objects.requireNonNull(buf); 951 952 readLock.lock(); 953 try { 954 ensureOpen(); 955 boolean blocking = isBlocking(); 956 int n = 0; 957 try { 958 beginRead(blocking, true); 959 configureSocketNonBlockingIfVirtualThread(); 960 n = IOUtil.read(fd, buf, -1, nd); 961 if (blocking) { 962 while (IOStatus.okayToRetry(n) && isOpen()) { 963 park(Net.POLLIN); 964 n = IOUtil.read(fd, buf, -1, nd); 965 } 966 } 967 } finally { 968 endRead(blocking, n > 0); 969 assert IOStatus.check(n); 970 } 971 return IOStatus.normalize(n); 972 } finally { 973 readLock.unlock(); 974 } 975 } 976 977 @Override 978 public long read(ByteBuffer[] dsts, int offset, int length) 979 throws IOException 980 { 981 Objects.checkFromIndexSize(offset, length, dsts.length); 982 983 readLock.lock(); 984 try { 985 ensureOpen(); 986 boolean blocking = isBlocking(); 987 long n = 0; 988 try { 989 beginRead(blocking, true); 990 configureSocketNonBlockingIfVirtualThread(); 991 n = IOUtil.read(fd, dsts, offset, length, nd); 992 if (blocking) { 993 while (IOStatus.okayToRetry(n) && isOpen()) { 994 park(Net.POLLIN); 995 n = IOUtil.read(fd, dsts, offset, length, nd); 996 } 997 } 998 } finally { 999 endRead(blocking, n > 0); 1000 assert IOStatus.check(n); 1001 } 1002 return IOStatus.normalize(n); 1003 } finally { 1004 readLock.unlock(); 1005 } 1006 } 1007 1008 /** 1009 * Marks the beginning of a write operation that might block. 
1010 * @param blocking true if configured blocking 1011 * @param mustBeConnected true if the socket must be connected 1012 * @return remote address if connected 1013 * @throws ClosedChannelException if the channel is closed 1014 * @throws NotYetConnectedException if mustBeConnected and not connected 1015 * @throws IOException if socket not bound and cannot be bound 1016 */ 1017 private SocketAddress beginWrite(boolean blocking, boolean mustBeConnected) 1018 throws IOException 1019 { 1020 if (blocking && interruptible) { 1021 // set hook for Thread.interrupt 1022 begin(); 1023 } 1024 SocketAddress remote; 1025 synchronized (stateLock) { 1026 ensureOpen(); 1027 remote = remoteAddress; 1028 if ((remote == null) && mustBeConnected) 1029 throw new NotYetConnectedException(); 1030 if (localAddress == null) 1031 bindInternal(null); 1032 if (blocking) 1033 writerThread = NativeThread.current(); 1034 } 1035 return remote; 1036 } 1037 1038 /** 1039 * Marks the end of a write operation that may have blocked. 
1040 * 1041 * @throws AsynchronousCloseException if the channel was closed asynchronously 1042 */ 1043 private void endWrite(boolean blocking, boolean completed) 1044 throws AsynchronousCloseException 1045 { 1046 if (blocking) { 1047 synchronized (stateLock) { 1048 writerThread = 0; 1049 if (state == ST_CLOSING) { 1050 tryFinishClose(); 1051 } 1052 } 1053 1054 if (interruptible) { 1055 // remove hook for Thread.interrupt (may throw AsynchronousCloseException) 1056 end(completed); 1057 } else if (!completed && !isOpen()) { 1058 throw new AsynchronousCloseException(); 1059 } 1060 } 1061 } 1062 1063 @Override 1064 public int write(ByteBuffer buf) throws IOException { 1065 Objects.requireNonNull(buf); 1066 1067 writeLock.lock(); 1068 try { 1069 ensureOpen(); 1070 boolean blocking = isBlocking(); 1071 int n = 0; 1072 try { 1073 beginWrite(blocking, true); 1074 configureSocketNonBlockingIfVirtualThread(); 1075 n = IOUtil.write(fd, buf, -1, nd); 1076 if (blocking) { 1077 while (IOStatus.okayToRetry(n) && isOpen()) { 1078 park(Net.POLLOUT); 1079 n = IOUtil.write(fd, buf, -1, nd); 1080 } 1081 } 1082 } finally { 1083 endWrite(blocking, n > 0); 1084 assert IOStatus.check(n); 1085 } 1086 return IOStatus.normalize(n); 1087 } finally { 1088 writeLock.unlock(); 1089 } 1090 } 1091 1092 @Override 1093 public long write(ByteBuffer[] srcs, int offset, int length) 1094 throws IOException 1095 { 1096 Objects.checkFromIndexSize(offset, length, srcs.length); 1097 1098 writeLock.lock(); 1099 try { 1100 ensureOpen(); 1101 boolean blocking = isBlocking(); 1102 long n = 0; 1103 try { 1104 beginWrite(blocking, true); 1105 configureSocketNonBlockingIfVirtualThread(); 1106 n = IOUtil.write(fd, srcs, offset, length, nd); 1107 if (blocking) { 1108 while (IOStatus.okayToRetry(n) && isOpen()) { 1109 park(Net.POLLOUT); 1110 n = IOUtil.write(fd, srcs, offset, length, nd); 1111 } 1112 } 1113 } finally { 1114 endWrite(blocking, n > 0); 1115 assert IOStatus.check(n); 1116 } 1117 return 
IOStatus.normalize(n); 1118 } finally { 1119 writeLock.unlock(); 1120 } 1121 } 1122 1123 @Override 1124 protected void implConfigureBlocking(boolean block) throws IOException { 1125 readLock.lock(); 1126 try { 1127 writeLock.lock(); 1128 try { 1129 lockedConfigureBlocking(block); 1130 } finally { 1131 writeLock.unlock(); 1132 } 1133 } finally { 1134 readLock.unlock(); 1135 } 1136 } 1137 1138 /** 1139 * Adjusts the blocking mode. readLock or writeLock must already be held. 1140 */ 1141 private void lockedConfigureBlocking(boolean block) throws IOException { 1142 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); 1143 synchronized (stateLock) { 1144 ensureOpen(); 1145 // do nothing if virtual thread has forced the socket to be non-blocking 1146 if (!forcedNonBlocking) { 1147 IOUtil.configureBlocking(fd, block); 1148 } 1149 } 1150 } 1151 1152 /** 1153 * Attempts to adjust the blocking mode if the channel is open. 1154 * @return {@code true} if the blocking mode was adjusted 1155 */ 1156 private boolean tryLockedConfigureBlocking(boolean block) throws IOException { 1157 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); 1158 synchronized (stateLock) { 1159 if (!forcedNonBlocking && isOpen()) { 1160 IOUtil.configureBlocking(fd, block); 1161 return true; 1162 } else { 1163 return false; 1164 } 1165 } 1166 } 1167 1168 /** 1169 * Ensures that the socket is configured non-blocking. 1170 * @throws IOException if there is an I/O error changing the blocking mode 1171 */ 1172 private void configureSocketNonBlocking() throws IOException { 1173 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); 1174 if (!forcedNonBlocking) { 1175 synchronized (stateLock) { 1176 ensureOpen(); 1177 IOUtil.configureBlocking(fd, false); 1178 forcedNonBlocking = true; 1179 } 1180 } 1181 } 1182 1183 /** 1184 * Ensures that the socket is configured non-blocking when on a virtual thread. 
1185 * @throws IOException if there is an I/O error changing the blocking mode 1186 */ 1187 private void configureSocketNonBlockingIfVirtualThread() throws IOException { 1188 if (Thread.currentThread().isVirtual()) { 1189 configureSocketNonBlocking(); 1190 } 1191 } 1192 1193 InetSocketAddress localAddress() { 1194 synchronized (stateLock) { 1195 return localAddress; 1196 } 1197 } 1198 1199 InetSocketAddress remoteAddress() { 1200 synchronized (stateLock) { 1201 return remoteAddress; 1202 } 1203 } 1204 1205 @Override 1206 public DatagramChannel bind(SocketAddress local) throws IOException { 1207 readLock.lock(); 1208 try { 1209 writeLock.lock(); 1210 try { 1211 synchronized (stateLock) { 1212 ensureOpen(); 1213 if (localAddress != null) 1214 throw new AlreadyBoundException(); 1215 bindInternal(local); 1216 } 1217 } finally { 1218 writeLock.unlock(); 1219 } 1220 } finally { 1221 readLock.unlock(); 1222 } 1223 return this; 1224 } 1225 1226 private void bindInternal(SocketAddress local) throws IOException { 1227 assert Thread.holdsLock(stateLock )&& (localAddress == null); 1228 1229 InetSocketAddress isa; 1230 if (local == null) { 1231 // only Inet4Address allowed with IPv4 socket 1232 if (family == StandardProtocolFamily.INET) { 1233 isa = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0); 1234 } else { 1235 isa = new InetSocketAddress(0); 1236 } 1237 } else { 1238 isa = Net.checkAddress(local, family); 1239 } 1240 1241 Net.bind(family, fd, isa.getAddress(), isa.getPort()); 1242 localAddress = Net.localAddress(fd); 1243 } 1244 1245 @Override 1246 public boolean isConnected() { 1247 synchronized (stateLock) { 1248 return (state == ST_CONNECTED); 1249 } 1250 } 1251 1252 @Override 1253 public DatagramChannel connect(SocketAddress sa) throws IOException { 1254 return connect(sa, true); 1255 } 1256 1257 /** 1258 * Connects the channel's socket. 
1259 * 1260 * @param sa the remote address to which this channel is to be connected 1261 * @param check true to check if the channel is already connected. 1262 */ 1263 DatagramChannel connect(SocketAddress sa, boolean check) throws IOException { 1264 InetSocketAddress isa = Net.checkAddress(sa, family); 1265 1266 readLock.lock(); 1267 try { 1268 writeLock.lock(); 1269 try { 1270 synchronized (stateLock) { 1271 ensureOpen(); 1272 if (check && state == ST_CONNECTED) 1273 throw new AlreadyConnectedException(); 1274 if (isa.getPort() == 0) 1275 throw new SocketException("Can't connect to port 0"); 1276 1277 // ensure that the socket is bound 1278 if (localAddress == null) { 1279 bindInternal(null); 1280 } 1281 1282 // capture local address before connect 1283 initialLocalAddress = localAddress; 1284 1285 int n = Net.connect(family, 1286 fd, 1287 isa.getAddress(), 1288 isa.getPort()); 1289 if (n <= 0) 1290 throw new Error(); // Can't happen 1291 1292 // connected 1293 remoteAddress = isa; 1294 state = ST_CONNECTED; 1295 1296 // refresh local address 1297 localAddress = Net.localAddress(fd); 1298 1299 // flush any packets already received. 
1300 boolean blocking = isBlocking(); 1301 if (blocking) { 1302 lockedConfigureBlocking(false); 1303 } 1304 try { 1305 ByteBuffer buf = ByteBuffer.allocate(100); 1306 while (receive(buf, false) >= 0) { 1307 buf.clear(); 1308 } 1309 } finally { 1310 if (blocking) { 1311 tryLockedConfigureBlocking(true); 1312 } 1313 } 1314 } 1315 } finally { 1316 writeLock.unlock(); 1317 } 1318 } finally { 1319 readLock.unlock(); 1320 } 1321 return this; 1322 } 1323 1324 @Override 1325 public DatagramChannel disconnect() throws IOException { 1326 readLock.lock(); 1327 try { 1328 writeLock.lock(); 1329 try { 1330 synchronized (stateLock) { 1331 if (!isOpen() || (state != ST_CONNECTED)) 1332 return this; 1333 1334 // disconnect socket 1335 boolean isIPv6 = (family == StandardProtocolFamily.INET6); 1336 disconnect0(fd, isIPv6); 1337 1338 // no longer connected 1339 remoteAddress = null; 1340 state = ST_UNCONNECTED; 1341 1342 // refresh localAddress, should be same as it was prior to connect 1343 localAddress = Net.localAddress(fd); 1344 try { 1345 if (!localAddress.equals(initialLocalAddress)) { 1346 // Workaround connect(2) issues on Linux and macOS 1347 repairSocket(initialLocalAddress); 1348 assert (localAddress != null) 1349 && localAddress.equals(Net.localAddress(fd)) 1350 && localAddress.equals(initialLocalAddress); 1351 } 1352 } finally { 1353 initialLocalAddress = null; 1354 } 1355 } 1356 } finally { 1357 writeLock.unlock(); 1358 } 1359 } finally { 1360 readLock.unlock(); 1361 } 1362 return this; 1363 } 1364 1365 /** 1366 * "Repair" the channel's socket after a disconnect that didn't restore the 1367 * local address. 1368 * 1369 * On Linux, connect(2) dissolves the association but changes the local port 1370 * to 0 when it was initially bound to an ephemeral port. The workaround here 1371 * is to rebind to the original port. 
1372 * 1373 * On macOS, connect(2) dissolves the association but rebinds the socket to 1374 * the wildcard address when it was initially bound to a specific address. 1375 * The workaround here is to re-create the socket. 1376 */ 1377 private void repairSocket(InetSocketAddress target) 1378 throws IOException 1379 { 1380 assert Thread.holdsLock(stateLock); 1381 1382 // Linux: try to bind the socket to the original address/port 1383 if (localAddress.getPort() == 0) { 1384 assert localAddress.getAddress().equals(target.getAddress()); 1385 Net.bind(family, fd, target.getAddress(), target.getPort()); 1386 localAddress = Net.localAddress(fd); 1387 return; 1388 } 1389 1390 // capture the value of all existing socket options 1391 Map<SocketOption<?>, Object> map = new HashMap<>(); 1392 for (SocketOption<?> option : supportedOptions()) { 1393 Object value = getOption(option); 1394 if (value != null) { 1395 map.put(option, value); 1396 } 1397 } 1398 1399 // macOS: re-create the socket. 1400 FileDescriptor newfd = Net.socket(family, false); 1401 try { 1402 // copy the socket options that are protocol family agnostic 1403 for (Map.Entry<SocketOption<?>, Object> e : map.entrySet()) { 1404 SocketOption<?> option = e.getKey(); 1405 if (SocketOptionRegistry.findOption(option, Net.UNSPEC) != null) { 1406 Object value = e.getValue(); 1407 try { 1408 Net.setSocketOption(newfd, Net.UNSPEC, option, value); 1409 } catch (IOException ignore) { } 1410 } 1411 } 1412 1413 // copy the blocking mode 1414 if (!isBlocking() || forcedNonBlocking) { 1415 IOUtil.configureBlocking(newfd, false); 1416 } 1417 1418 // dup this channel's socket to the new socket. If this succeeds then 1419 // fd will reference the new socket. If it fails then it will still 1420 // reference the old socket. 
1421 nd.dup(newfd, fd); 1422 } finally { 1423 // release the file descriptor 1424 nd.close(newfd); 1425 } 1426 1427 // bind to the original local address 1428 try { 1429 Net.bind(family, fd, target.getAddress(), target.getPort()); 1430 } catch (IOException ioe) { 1431 // bind failed, socket is left unbound 1432 localAddress = null; 1433 throw ioe; 1434 } 1435 1436 // restore local address 1437 localAddress = Net.localAddress(fd); 1438 1439 // restore all socket options (including those set in first pass) 1440 for (Map.Entry<SocketOption<?>, Object> e : map.entrySet()) { 1441 @SuppressWarnings("unchecked") 1442 SocketOption<Object> option = (SocketOption<Object>) e.getKey(); 1443 Object value = e.getValue(); 1444 try { 1445 setOption(option, value); 1446 } catch (IOException ignore) { } 1447 } 1448 1449 // restore multicast group membership 1450 MembershipRegistry registry = this.registry; 1451 if (registry != null) { 1452 registry.forEach(k -> { 1453 if (k instanceof MembershipKeyImpl.Type6) { 1454 MembershipKeyImpl.Type6 key6 = (MembershipKeyImpl.Type6) k; 1455 Net.join6(fd, key6.groupAddress(), key6.index(), key6.source()); 1456 } else { 1457 MembershipKeyImpl.Type4 key4 = (MembershipKeyImpl.Type4) k; 1458 Net.join4(fd, key4.groupAddress(), key4.interfaceAddress(), key4.source()); 1459 } 1460 }); 1461 } 1462 1463 // reset registration in all Selectors that this channel is registered with 1464 AbstractSelectableChannels.forEach(this, SelectionKeyImpl::reset); 1465 } 1466 1467 /** 1468 * Defines static methods to access AbstractSelectableChannel non-public members. 
1469 */ 1470 private static class AbstractSelectableChannels { 1471 private static final Method FOREACH; 1472 static { 1473 try { 1474 Method m = AbstractSelectableChannel.class.getDeclaredMethod("forEach", Consumer.class); 1475 m.setAccessible(true); 1476 FOREACH = m; 1477 } catch (Exception e) { 1478 throw new InternalError(e); 1479 } 1480 } 1481 static void forEach(AbstractSelectableChannel ch, Consumer<SelectionKeyImpl> action) { 1482 try { 1483 FOREACH.invoke(ch, action); 1484 } catch (Exception e) { 1485 throw new InternalError(e); 1486 } 1487 } 1488 } 1489 1490 /** 1491 * Joins channel's socket to the given group/interface and 1492 * optional source address. 1493 */ 1494 private MembershipKey innerJoin(InetAddress group, 1495 NetworkInterface interf, 1496 InetAddress source) 1497 throws IOException 1498 { 1499 if (!group.isMulticastAddress()) 1500 throw new IllegalArgumentException("Group not a multicast address"); 1501 1502 // check multicast address is compatible with this socket 1503 if (group instanceof Inet4Address) { 1504 if (family == StandardProtocolFamily.INET6 && !Net.canIPv6SocketJoinIPv4Group()) 1505 throw new IllegalArgumentException("IPv6 socket cannot join IPv4 multicast group"); 1506 } else if (group instanceof Inet6Address) { 1507 if (family != StandardProtocolFamily.INET6) 1508 throw new IllegalArgumentException("Only IPv6 sockets can join IPv6 multicast group"); 1509 } else { 1510 throw new IllegalArgumentException("Address type not supported"); 1511 } 1512 1513 // check source address 1514 if (source != null) { 1515 if (source.isAnyLocalAddress()) 1516 throw new IllegalArgumentException("Source address is a wildcard address"); 1517 if (source.isMulticastAddress()) 1518 throw new IllegalArgumentException("Source address is multicast address"); 1519 if (source.getClass() != group.getClass()) 1520 throw new IllegalArgumentException("Source address is different type to group"); 1521 } 1522 1523 synchronized (stateLock) { 1524 ensureOpen(); 
1525 1526 // check the registry to see if we are already a member of the group 1527 if (registry == null) { 1528 registry = new MembershipRegistry(); 1529 } else { 1530 // return existing membership key 1531 MembershipKey key = registry.checkMembership(group, interf, source); 1532 if (key != null) 1533 return key; 1534 } 1535 1536 MembershipKeyImpl key; 1537 if ((family == StandardProtocolFamily.INET6) && 1538 ((group instanceof Inet6Address) || Net.canJoin6WithIPv4Group())) 1539 { 1540 int index = interf.getIndex(); 1541 if (index == -1) 1542 throw new IOException("Network interface cannot be identified"); 1543 1544 // need multicast and source address as byte arrays 1545 byte[] groupAddress = Net.inet6AsByteArray(group); 1546 byte[] sourceAddress = (source == null) ? null : 1547 Net.inet6AsByteArray(source); 1548 1549 // join the group 1550 int n = Net.join6(fd, groupAddress, index, sourceAddress); 1551 if (n == IOStatus.UNAVAILABLE) 1552 throw new UnsupportedOperationException(); 1553 1554 key = new MembershipKeyImpl.Type6(this, group, interf, source, 1555 groupAddress, index, sourceAddress); 1556 1557 } else { 1558 // need IPv4 address to identify interface 1559 Inet4Address target = Net.anyInet4Address(interf); 1560 if (target == null) 1561 throw new IOException("Network interface not configured for IPv4"); 1562 1563 int groupAddress = Net.inet4AsInt(group); 1564 int targetAddress = Net.inet4AsInt(target); 1565 int sourceAddress = (source == null) ? 
0 : Net.inet4AsInt(source); 1566 1567 // join the group 1568 int n = Net.join4(fd, groupAddress, targetAddress, sourceAddress); 1569 if (n == IOStatus.UNAVAILABLE) 1570 throw new UnsupportedOperationException(); 1571 1572 key = new MembershipKeyImpl.Type4(this, group, interf, source, 1573 groupAddress, targetAddress, sourceAddress); 1574 } 1575 1576 registry.add(key); 1577 return key; 1578 } 1579 } 1580 1581 @Override 1582 public MembershipKey join(InetAddress group, 1583 NetworkInterface interf) 1584 throws IOException 1585 { 1586 return innerJoin(group, interf, null); 1587 } 1588 1589 @Override 1590 public MembershipKey join(InetAddress group, 1591 NetworkInterface interf, 1592 InetAddress source) 1593 throws IOException 1594 { 1595 Objects.requireNonNull(source); 1596 return innerJoin(group, interf, source); 1597 } 1598 1599 // package-private 1600 void drop(MembershipKeyImpl key) { 1601 assert key.channel() == this; 1602 1603 synchronized (stateLock) { 1604 if (!key.isValid()) 1605 return; 1606 1607 try { 1608 if (key instanceof MembershipKeyImpl.Type6) { 1609 MembershipKeyImpl.Type6 key6 = 1610 (MembershipKeyImpl.Type6)key; 1611 Net.drop6(fd, key6.groupAddress(), key6.index(), key6.source()); 1612 } else { 1613 MembershipKeyImpl.Type4 key4 = (MembershipKeyImpl.Type4)key; 1614 Net.drop4(fd, key4.groupAddress(), key4.interfaceAddress(), 1615 key4.source()); 1616 } 1617 } catch (IOException ioe) { 1618 // should not happen 1619 throw new AssertionError(ioe); 1620 } 1621 1622 key.invalidate(); 1623 registry.remove(key); 1624 } 1625 } 1626 1627 /** 1628 * Finds an existing membership of a multicast group. Returns null if this 1629 * channel's socket is not a member of the group. 
1630 * 1631 * @apiNote This method is for use by the socket adaptor 1632 */ 1633 MembershipKey findMembership(InetAddress group, NetworkInterface interf) { 1634 synchronized (stateLock) { 1635 if (registry != null) { 1636 return registry.checkMembership(group, interf, null); 1637 } else { 1638 return null; 1639 } 1640 } 1641 } 1642 1643 /** 1644 * Block datagrams from the given source. 1645 */ 1646 void block(MembershipKeyImpl key, InetAddress source) 1647 throws IOException 1648 { 1649 assert key.channel() == this; 1650 assert key.sourceAddress() == null; 1651 1652 synchronized (stateLock) { 1653 if (!key.isValid()) 1654 throw new IllegalStateException("key is no longer valid"); 1655 if (source.isAnyLocalAddress()) 1656 throw new IllegalArgumentException("Source address is a wildcard address"); 1657 if (source.isMulticastAddress()) 1658 throw new IllegalArgumentException("Source address is multicast address"); 1659 if (source.getClass() != key.group().getClass()) 1660 throw new IllegalArgumentException("Source address is different type to group"); 1661 1662 int n; 1663 if (key instanceof MembershipKeyImpl.Type6) { 1664 MembershipKeyImpl.Type6 key6 = 1665 (MembershipKeyImpl.Type6)key; 1666 n = Net.block6(fd, key6.groupAddress(), key6.index(), 1667 Net.inet6AsByteArray(source)); 1668 } else { 1669 MembershipKeyImpl.Type4 key4 = 1670 (MembershipKeyImpl.Type4)key; 1671 n = Net.block4(fd, key4.groupAddress(), key4.interfaceAddress(), 1672 Net.inet4AsInt(source)); 1673 } 1674 if (n == IOStatus.UNAVAILABLE) { 1675 // ancient kernel 1676 throw new UnsupportedOperationException(); 1677 } 1678 } 1679 } 1680 1681 /** 1682 * Unblock the given source. 
1683 */ 1684 void unblock(MembershipKeyImpl key, InetAddress source) { 1685 assert key.channel() == this; 1686 assert key.sourceAddress() == null; 1687 1688 synchronized (stateLock) { 1689 if (!key.isValid()) 1690 throw new IllegalStateException("key is no longer valid"); 1691 1692 try { 1693 if (key instanceof MembershipKeyImpl.Type6) { 1694 MembershipKeyImpl.Type6 key6 = 1695 (MembershipKeyImpl.Type6)key; 1696 Net.unblock6(fd, key6.groupAddress(), key6.index(), 1697 Net.inet6AsByteArray(source)); 1698 } else { 1699 MembershipKeyImpl.Type4 key4 = 1700 (MembershipKeyImpl.Type4)key; 1701 Net.unblock4(fd, key4.groupAddress(), key4.interfaceAddress(), 1702 Net.inet4AsInt(source)); 1703 } 1704 } catch (IOException ioe) { 1705 // should not happen 1706 throw new AssertionError(ioe); 1707 } 1708 } 1709 } 1710 1711 /** 1712 * Closes the socket if there are no I/O operations in progress and the 1713 * channel is not registered with a Selector. 1714 */ 1715 private boolean tryClose() throws IOException { 1716 assert Thread.holdsLock(stateLock) && state == ST_CLOSING; 1717 if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) { 1718 state = ST_CLOSED; 1719 try { 1720 // close socket 1721 cleaner.clean(); 1722 } catch (UncheckedIOException ioe) { 1723 throw ioe.getCause(); 1724 } 1725 return true; 1726 } else { 1727 return false; 1728 } 1729 } 1730 1731 /** 1732 * Invokes tryClose to attempt to close the socket. 1733 * 1734 * This method is used for deferred closing by I/O and Selector operations. 1735 */ 1736 private void tryFinishClose() { 1737 try { 1738 tryClose(); 1739 } catch (IOException ignore) { } 1740 } 1741 1742 /** 1743 * Closes this channel when configured in blocking mode. 1744 * 1745 * If there is an I/O operation in progress then the socket is pre-closed 1746 * and the I/O threads signalled, in which case the final close is deferred 1747 * until all I/O operations complete. 
1748 */ 1749 private void implCloseBlockingMode() throws IOException { 1750 synchronized (stateLock) { 1751 assert state < ST_CLOSING; 1752 state = ST_CLOSING; 1753 1754 // if member of any multicast groups then invalidate the keys 1755 if (registry != null) 1756 registry.invalidateAll(); 1757 1758 if (!tryClose()) { 1759 nd.preClose(fd, readerThread, writerThread); 1760 } 1761 } 1762 } 1763 1764 /** 1765 * Closes this channel when configured in non-blocking mode. 1766 * 1767 * If the channel is registered with a Selector then the close is deferred 1768 * until the channel is flushed from all Selectors. 1769 */ 1770 private void implCloseNonBlockingMode() throws IOException { 1771 synchronized (stateLock) { 1772 assert state < ST_CLOSING; 1773 state = ST_CLOSING; 1774 1775 // if member of any multicast groups then invalidate the keys 1776 if (registry != null) 1777 registry.invalidateAll(); 1778 } 1779 1780 // wait for any read/write operations to complete before trying to close 1781 readLock.lock(); 1782 readLock.unlock(); 1783 writeLock.lock(); 1784 writeLock.unlock(); 1785 synchronized (stateLock) { 1786 if (state == ST_CLOSING) { 1787 tryClose(); 1788 } 1789 } 1790 } 1791 1792 /** 1793 * Invoked by implCloseChannel to close the channel. 
1794 */ 1795 @Override 1796 protected void implCloseSelectableChannel() throws IOException { 1797 assert !isOpen(); 1798 if (isBlocking()) { 1799 implCloseBlockingMode(); 1800 } else { 1801 implCloseNonBlockingMode(); 1802 } 1803 } 1804 1805 @Override 1806 public void kill() { 1807 // wait for any read/write operations to complete before trying to close 1808 readLock.lock(); 1809 readLock.unlock(); 1810 writeLock.lock(); 1811 writeLock.unlock(); 1812 synchronized (stateLock) { 1813 if (state == ST_CLOSING) { 1814 tryFinishClose(); 1815 } 1816 } 1817 } 1818 1819 /** 1820 * Translates native poll revent set into a ready operation set 1821 */ 1822 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { 1823 int intOps = ski.nioInterestOps(); 1824 int oldOps = ski.nioReadyOps(); 1825 int newOps = initialOps; 1826 1827 if ((ops & Net.POLLNVAL) != 0) { 1828 // This should only happen if this channel is pre-closed while a 1829 // selection operation is in progress 1830 // ## Throw an error if this channel has not been pre-closed 1831 return false; 1832 } 1833 1834 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 1835 newOps = intOps; 1836 ski.nioReadyOps(newOps); 1837 return (newOps & ~oldOps) != 0; 1838 } 1839 1840 if (((ops & Net.POLLIN) != 0) && 1841 ((intOps & SelectionKey.OP_READ) != 0)) 1842 newOps |= SelectionKey.OP_READ; 1843 1844 if (((ops & Net.POLLOUT) != 0) && 1845 ((intOps & SelectionKey.OP_WRITE) != 0)) 1846 newOps |= SelectionKey.OP_WRITE; 1847 1848 ski.nioReadyOps(newOps); 1849 return (newOps & ~oldOps) != 0; 1850 } 1851 1852 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 1853 return translateReadyOps(ops, ski.nioReadyOps(), ski); 1854 } 1855 1856 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 1857 return translateReadyOps(ops, 0, ski); 1858 } 1859 1860 /** 1861 * Translates an interest operation set into a native poll event set 1862 */ 1863 public int translateInterestOps(int 
ops) { 1864 int newOps = 0; 1865 if ((ops & SelectionKey.OP_READ) != 0) 1866 newOps |= Net.POLLIN; 1867 if ((ops & SelectionKey.OP_WRITE) != 0) 1868 newOps |= Net.POLLOUT; 1869 if ((ops & SelectionKey.OP_CONNECT) != 0) 1870 newOps |= Net.POLLIN; 1871 return newOps; 1872 } 1873 1874 public FileDescriptor getFD() { 1875 return fd; 1876 } 1877 1878 public int getFDVal() { 1879 return fdVal; 1880 } 1881 1882 /** 1883 * Returns an action to release the given file descriptor and socket addresses. 1884 */ 1885 private static Runnable releaserFor(FileDescriptor fd, NativeSocketAddress... sockAddrs) { 1886 return () -> { 1887 try { 1888 nd.close(fd); 1889 } catch (IOException ioe) { 1890 throw new UncheckedIOException(ioe); 1891 } finally { 1892 // release memory 1893 NativeSocketAddress.freeAll(sockAddrs); 1894 } 1895 }; 1896 } 1897 1898 /** 1899 * Defines static methods to get/set DatagramPacket fields and workaround 1900 * DatagramPacket deficiencies. 1901 */ 1902 private static class DatagramPackets { 1903 private static final VarHandle LENGTH; 1904 private static final VarHandle BUF_LENGTH; 1905 static { 1906 try { 1907 MethodHandles.Lookup l = MethodHandles.privateLookupIn(DatagramPacket.class, MethodHandles.lookup()); 1908 LENGTH = l.findVarHandle(DatagramPacket.class, "length", int.class); 1909 BUF_LENGTH = l.findVarHandle(DatagramPacket.class, "bufLength", int.class); 1910 } catch (Exception e) { 1911 throw new ExceptionInInitializerError(e); 1912 } 1913 } 1914 1915 /** 1916 * Sets the DatagramPacket.length field. DatagramPacket.setLength cannot be 1917 * used at this time because it sets both the length and bufLength fields. 1918 */ 1919 static void setLength(DatagramPacket p, int value) { 1920 assert Thread.holdsLock(p); 1921 LENGTH.set(p, value); 1922 } 1923 1924 /** 1925 * Returns the value of the DatagramPacket.bufLength field. 
1926 */ 1927 static int getBufLength(DatagramPacket p) { 1928 assert Thread.holdsLock(p); 1929 return (int) BUF_LENGTH.get(p); 1930 } 1931 } 1932 1933 // -- Native methods -- 1934 1935 private static native void disconnect0(FileDescriptor fd, boolean isIPv6) 1936 throws IOException; 1937 1938 private static native int receive0(FileDescriptor fd, long address, int len, 1939 long senderAddress, boolean connected) 1940 throws IOException; 1941 1942 private static native int send0(FileDescriptor fd, long address, int len, 1943 long targetAddress, int targetAddressLen) 1944 throws IOException; 1945 1946 static { 1947 IOUtil.load(); 1948 } 1949 }