1 /*
2 * Copyright (c) 2001, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.FileDescriptor;
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.io.UncheckedIOException;
32 import java.lang.invoke.MethodHandles;
33 import java.lang.invoke.VarHandle;
34 import java.lang.ref.Cleaner.Cleanable;
35 import java.lang.reflect.Method;
36 import java.net.DatagramPacket;
37 import java.net.DatagramSocket;
38 import java.net.Inet4Address;
39 import java.net.Inet6Address;
40 import java.net.InetAddress;
41 import java.net.InetSocketAddress;
42 import java.net.NetworkInterface;
43 import java.net.PortUnreachableException;
44 import java.net.ProtocolFamily;
45 import java.net.SocketAddress;
46 import java.net.SocketException;
47 import java.net.SocketOption;
48 import java.net.SocketTimeoutException;
49 import java.net.StandardProtocolFamily;
50 import java.net.StandardSocketOptions;
51 import java.nio.ByteBuffer;
52 import java.nio.channels.AlreadyBoundException;
53 import java.nio.channels.AlreadyConnectedException;
54 import java.nio.channels.AsynchronousCloseException;
55 import java.nio.channels.ClosedChannelException;
56 import java.nio.channels.DatagramChannel;
57 import java.nio.channels.IllegalBlockingModeException;
58 import java.nio.channels.MembershipKey;
59 import java.nio.channels.NotYetConnectedException;
60 import java.nio.channels.SelectionKey;
61 import java.nio.channels.spi.AbstractSelectableChannel;
62 import java.nio.channels.spi.SelectorProvider;
63 import java.util.Collections;
64 import java.util.HashMap;
65 import java.util.HashSet;
66 import java.util.Map;
67 import java.util.Objects;
68 import java.util.Set;
69 import java.util.concurrent.locks.ReentrantLock;
70 import java.util.function.Consumer;
71
72 import jdk.internal.access.JavaNioAccess;
73 import jdk.internal.access.SharedSecrets;
74 import jdk.internal.ref.CleanerFactory;
75 import jdk.internal.invoke.MhUtil;
76 import sun.net.ext.ExtendedSocketOptions;
77 import sun.net.util.IPAddressUtil;
78
79 import static java.util.concurrent.TimeUnit.MILLISECONDS;
80 import static java.util.concurrent.TimeUnit.NANOSECONDS;
81
82 /**
83 * An implementation of DatagramChannels.
84 */
85
86 class DatagramChannelImpl
87 extends DatagramChannel
88 implements SelChImpl
89 {
// Used to make native read and write calls
private static final NativeDispatcher nd = new DatagramDispatcher();

// Used to acquire/release buffer sessions and obtain buffer addresses for native I/O
private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess();

// true if interruptible (can be false to emulate legacy DatagramSocket)
private final boolean interruptible;

// The protocol family of the socket
private final ProtocolFamily family;

// Our file descriptor, and its value as returned by IOUtil.fdVal
private final FileDescriptor fd;
private final int fdVal;

// Native sockaddrs and cached InetSocketAddress for receive, protected by readLock.
// The address of sourceSockAddr is passed to receive0; sourceSocketAddress() swaps
// sourceSockAddr and cachedSockAddr after decoding a new sender address.
private NativeSocketAddress sourceSockAddr;
private NativeSocketAddress cachedSockAddr;
private InetSocketAddress cachedInetSocketAddress;

// Native sockaddr and cached objects for send, protected by writeLock.
// previousTarget is compared by identity to decide if targetSockAddr
// already holds the encoded target address.
private final NativeSocketAddress targetSockAddr;
private InetSocketAddress previousTarget;
private int previousSockAddrLength;

// Cleaner to close file descriptor and free native socket address
private final Cleanable cleaner;

// Lock held by current reading or connecting thread
private final ReentrantLock readLock = new ReentrantLock();

// Lock held by current writing or connecting thread
private final ReentrantLock writeLock = new ReentrantLock();

// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
private final Object stateLock = new Object();

// -- The following fields are protected by stateLock

// State (does not necessarily increase monotonically)
private static final int ST_UNCONNECTED = 0;
private static final int ST_CONNECTED = 1;
private static final int ST_CLOSING = 2;
private static final int ST_CLOSED = 3;
private int state;

// IDs of native threads doing reads and writes, for signalling
private long readerThread;
private long writerThread;

// Local and remote (connected) address
private InetSocketAddress localAddress;
private InetSocketAddress remoteAddress;

// Local address prior to connecting
private InetSocketAddress initialLocalAddress;

// Socket adaptor, created lazily. The SOCKET VarHandle is used by socket()
// to install the adaptor with a compare-and-set.
private static final VarHandle SOCKET = MhUtil.findVarHandle(
        MethodHandles.lookup(), "socket", DatagramSocket.class);
private volatile DatagramSocket socket;

// Multicast support
private MembershipRegistry registry;

// set true when socket is bound and SO_REUSEADDR is emulated
private boolean reuseAddressEmulated;

// set true/false when socket is already bound and SO_REUSEADDR is emulated;
// holds the emulated option value that getOption reports
private boolean isReuseAddress;

// True if the channel's socket has been forced into non-blocking mode
// by a virtual thread. It cannot be reset. When the channel is in
// blocking mode and the channel's socket is in non-blocking mode then
// operations that don't complete immediately will poll the socket and
// preserve the semantics of blocking operations.
private volatile boolean forcedNonBlocking;

// -- End of fields protected by stateLock
170
171
/**
 * Creates a datagram channel with the default protocol family: INET6 when
 * Net.isIPv6Available() returns true, otherwise INET. All work is delegated
 * to the constructor that takes an explicit protocol family.
 *
 * @param sp the selector provider
 * @param interruptible true if I/O operations on the channel are interruptible
 * @throws IOException if the delegated constructor fails
 */
DatagramChannelImpl(SelectorProvider sp, boolean interruptible) throws IOException {
    this(sp, (Net.isIPv6Available()
            ? StandardProtocolFamily.INET6
            : StandardProtocolFamily.INET),
         interruptible);
}
178
/**
 * Creates a datagram channel for the given protocol family. A new socket is
 * created with Net.socket and three native socket addresses are allocated
 * (receive source, receive cache, send target). If initialization does not
 * complete, the resources obtained so far are released before the exception
 * propagates.
 *
 * @param sp the selector provider
 * @param family the protocol family, must be INET or INET6
 * @param interruptible true if I/O operations on the channel are interruptible
 * @throws NullPointerException if family is null
 * @throws UnsupportedOperationException if the family is not INET or INET6,
 *         or if it is INET6 and Net.isIPv6Available() returns false
 * @throws IOException if the socket cannot be created
 */
DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family, boolean interruptible)
    throws IOException
{
    super(sp);

    Objects.requireNonNull(family, "'family' is null");
    if ((family != StandardProtocolFamily.INET) &&
            (family != StandardProtocolFamily.INET6)) {
        throw new UnsupportedOperationException("Protocol family not supported");
    }
    if (family == StandardProtocolFamily.INET6 && !Net.isIPv6Available()) {
        throw new UnsupportedOperationException("IPv6 not available");
    }

    // locals mirror the final fields so that the cleanup code in the finally
    // block, and the releaser below, can refer to them
    FileDescriptor fd = null;
    NativeSocketAddress[] sockAddrs = null;

    boolean initialized = false;
    try {
        this.interruptible = interruptible;
        this.family = family;
        this.fd = fd = Net.socket(family, false);
        this.fdVal = IOUtil.fdVal(fd);

        sockAddrs = NativeSocketAddress.allocate(3);
        // the receive-side socket address fields are protected by readLock
        readLock.lock();
        try {
            this.sourceSockAddr = sockAddrs[0];
            this.cachedSockAddr = sockAddrs[1];
        } finally {
            readLock.unlock();
        }
        this.targetSockAddr = sockAddrs[2];

        initialized = true;
    } finally {
        if (!initialized) {
            // failed part way through: free whatever was obtained
            if (sockAddrs != null) NativeSocketAddress.freeAll(sockAddrs);
            if (fd != null) nd.close(fd);
        }
    }

    // register the action that releases the file descriptor and the native
    // socket addresses (releaserFor is defined elsewhere in this file)
    Runnable releaser = releaserFor(fd, sockAddrs);
    this.cleaner = CleanerFactory.cleaner().register(this, releaser);
}
224
/**
 * Creates a datagram channel around an existing file descriptor. The
 * channel is interruptible and its protocol family is INET6 when
 * Net.isIPv6Available() returns true, otherwise INET. The local address is
 * read from the file descriptor once construction has succeeded.
 *
 * Note that if initialization does not complete, the supplied file
 * descriptor is closed.
 *
 * @param sp the selector provider
 * @param fd the file descriptor of the socket to wrap
 * @throws IOException if an I/O error occurs
 */
DatagramChannelImpl(SelectorProvider sp, FileDescriptor fd)
    throws IOException
{
    super(sp);

    NativeSocketAddress[] sockAddrs = null;

    boolean initialized = false;
    try {
        this.interruptible = true;
        this.family = Net.isIPv6Available()
                ? StandardProtocolFamily.INET6
                : StandardProtocolFamily.INET;
        this.fd = fd;
        this.fdVal = IOUtil.fdVal(fd);

        sockAddrs = NativeSocketAddress.allocate(3);
        // the receive-side socket address fields are protected by readLock
        readLock.lock();
        try {
            this.sourceSockAddr = sockAddrs[0];
            this.cachedSockAddr = sockAddrs[1];
        } finally {
            readLock.unlock();
        }
        this.targetSockAddr = sockAddrs[2];

        initialized = true;
    } finally {
        if (!initialized) {
            // failed part way through: free the native addresses and close fd
            if (sockAddrs != null) NativeSocketAddress.freeAll(sockAddrs);
            nd.close(fd);
        }
    }

    // register the action that releases the file descriptor and the native
    // socket addresses (releaserFor is defined elsewhere in this file)
    Runnable releaser = releaserFor(fd, sockAddrs);
    this.cleaner = CleanerFactory.cleaner().register(this, releaser);

    // localAddress is one of the fields protected by stateLock
    synchronized (stateLock) {
        this.localAddress = Net.localAddress(fd);
    }
}
266
267 // @throws ClosedChannelException if channel is closed
268 private void ensureOpen() throws ClosedChannelException {
269 if (!isOpen())
270 throw new ClosedChannelException();
271 }
272
273 @Override
274 public DatagramSocket socket() {
275 DatagramSocket socket = this.socket;
276 if (socket == null) {
277 socket = DatagramSocketAdaptor.create(this);
278 if (!SOCKET.compareAndSet(this, null, socket)) {
279 socket = this.socket;
280 }
281 }
282 return socket;
283 }
284
285 @Override
286 public SocketAddress getLocalAddress() throws IOException {
287 synchronized (stateLock) {
288 ensureOpen();
289 return localAddress;
290 }
291 }
292
293 @Override
294 public SocketAddress getRemoteAddress() throws IOException {
295 synchronized (stateLock) {
296 ensureOpen();
297 return remoteAddress;
298 }
299 }
300
301 /**
302 * Returns the protocol family to specify to set/getSocketOption for the
303 * given socket option.
304 */
305 private ProtocolFamily familyFor(SocketOption<?> name) {
306 assert Thread.holdsLock(stateLock);
307
308 // unspecified (most options)
309 if (SocketOptionRegistry.findOption(name, Net.UNSPEC) != null)
310 return Net.UNSPEC;
311
312 // IPv4 socket
313 if (family == StandardProtocolFamily.INET)
314 return StandardProtocolFamily.INET;
315
316 // IPv6 socket that is unbound
317 if (localAddress == null)
318 return StandardProtocolFamily.INET6;
319
320 // IPv6 socket bound to wildcard or IPv6 address
321 InetAddress address = localAddress.getAddress();
322 if (address.isAnyLocalAddress() || (address instanceof Inet6Address))
323 return StandardProtocolFamily.INET6;
324
325 // IPv6 socket bound to IPv4 address
326 if (Net.canUseIPv6OptionsWithIPv4LocalAddress()) {
327 // IPV6_XXX options can be used
328 return StandardProtocolFamily.INET6;
329 } else {
330 // IPV6_XXX options cannot be used
331 return StandardProtocolFamily.INET;
332 }
333 }
334
/**
 * Sets the value of a socket option.
 *
 * IP_MULTICAST_IF is handled here directly through Net.setInterface6 and
 * Net.setInterface4. SO_REUSEADDR additionally records the requested value
 * when exclusive bind is in use and the socket is already bound. All other
 * options go straight to Net.setSocketOption.
 *
 * @param name the socket option
 * @param value the value of the socket option
 * @return this channel
 * @throws NullPointerException if name is null
 * @throws UnsupportedOperationException if the option is not in supportedOptions()
 * @throws IllegalArgumentException if value is not an instance of the option's type
 * @throws ClosedChannelException if the channel is closed
 * @throws IOException if an I/O error occurs
 */
@Override
public <T> DatagramChannel setOption(SocketOption<T> name, T value)
    throws IOException
{
    Objects.requireNonNull(name);
    if (!supportedOptions().contains(name))
        throw new UnsupportedOperationException("'" + name + "' not supported");
    if (!name.type().isInstance(value))
        throw new IllegalArgumentException("Invalid value '" + value + "'");

    synchronized (stateLock) {
        ensureOpen();

        // note: this local shadows the 'family' field, which is referenced
        // below as this.family
        ProtocolFamily family = familyFor(name);

        // Some platforms require both IPV6_XXX and IP_XXX socket options to
        // be set when the channel's socket is IPv6 and it is used to send
        // IPv4 multicast datagrams. The IP_XXX socket options are set on a
        // best effort basis.
        boolean needToSetIPv4Option = (family != Net.UNSPEC)
                && (this.family == StandardProtocolFamily.INET6)
                && Net.shouldSetBothIPv4AndIPv6Options();

        // outgoing multicast interface
        if (name == StandardSocketOptions.IP_MULTICAST_IF) {
            assert family != Net.UNSPEC;
            NetworkInterface interf = (NetworkInterface) value;
            if (family == StandardProtocolFamily.INET6) {
                // the IPv6 option identifies the interface by index
                int index = interf.getIndex();
                if (index == -1)
                    throw new IOException("Network interface cannot be identified");
                Net.setInterface6(fd, index);
            }
            if (family == StandardProtocolFamily.INET || needToSetIPv4Option) {
                // need IPv4 address to identify interface
                Inet4Address target = Net.anyInet4Address(interf);
                if (target != null) {
                    try {
                        Net.setInterface4(fd, Net.inet4AsInt(target));
                    } catch (IOException ioe) {
                        // a failure is only reported when the IPv4 option is
                        // the primary one; otherwise it was best effort
                        if (family == StandardProtocolFamily.INET) throw ioe;
                    }
                } else if (family == StandardProtocolFamily.INET) {
                    throw new IOException("Network interface not configured for IPv4");
                }
            }
            return this;
        }

        // SO_REUSEADDR needs special handling as it may be emulated
        if (name == StandardSocketOptions.SO_REUSEADDR
            && Net.useExclusiveBind() && localAddress != null) {
            // remember the requested value so that getOption can report it
            reuseAddressEmulated = true;
            this.isReuseAddress = (Boolean)value;
        }

        // remaining options don't need any special handling
        Net.setSocketOption(fd, family, name, value);
        if (needToSetIPv4Option && family != StandardProtocolFamily.INET) {
            // best effort: a failure to set the IP_XXX option is ignored
            try {
                Net.setSocketOption(fd, StandardProtocolFamily.INET, name, value);
            } catch (IOException ignore) { }
        }

        return this;
    }
}
402
403 @Override
404 @SuppressWarnings("unchecked")
405 public <T> T getOption(SocketOption<T> name)
406 throws IOException
407 {
408 Objects.requireNonNull(name);
409 if (!supportedOptions().contains(name))
410 throw new UnsupportedOperationException("'" + name + "' not supported");
411
412 synchronized (stateLock) {
413 ensureOpen();
414
415 ProtocolFamily family = familyFor(name);
416
417 if (name == StandardSocketOptions.IP_MULTICAST_IF) {
418 if (family == StandardProtocolFamily.INET) {
419 int address = Net.getInterface4(fd);
420 if (address == 0)
421 return null; // default interface
422
423 InetAddress ia = Net.inet4FromInt(address);
424 NetworkInterface ni = NetworkInterface.getByInetAddress(ia);
425 if (ni == null)
426 throw new IOException("Unable to map address to interface");
427 return (T) ni;
428 } else {
429 int index = Net.getInterface6(fd);
430 if (index == 0)
431 return null; // default interface
432
433 NetworkInterface ni = NetworkInterface.getByIndex(index);
434 if (ni == null)
435 throw new IOException("Unable to map index to interface");
436 return (T) ni;
437 }
438 }
439
440 if (name == StandardSocketOptions.SO_REUSEADDR && reuseAddressEmulated) {
441 return (T) Boolean.valueOf(isReuseAddress);
442 }
443
444 // no special handling
445 return (T) Net.getSocketOption(fd, family, name);
446 }
447 }
448
449 private static class DefaultOptionsHolder {
450 static final Set<SocketOption<?>> defaultOptions = defaultOptions();
451
452 private static Set<SocketOption<?>> defaultOptions() {
453 HashSet<SocketOption<?>> set = new HashSet<>();
454 set.add(StandardSocketOptions.SO_SNDBUF);
455 set.add(StandardSocketOptions.SO_RCVBUF);
456 set.add(StandardSocketOptions.SO_REUSEADDR);
457 if (Net.isReusePortAvailable()) {
458 set.add(StandardSocketOptions.SO_REUSEPORT);
459 }
460 set.add(StandardSocketOptions.SO_BROADCAST);
461 set.add(StandardSocketOptions.IP_TOS);
462 set.add(StandardSocketOptions.IP_MULTICAST_IF);
463 set.add(StandardSocketOptions.IP_MULTICAST_TTL);
464 set.add(StandardSocketOptions.IP_MULTICAST_LOOP);
465 set.addAll(ExtendedSocketOptions.datagramSocketOptions());
466 return Collections.unmodifiableSet(set);
467 }
468 }
469
/**
 * Returns the set of socket options supported by this channel. The set is
 * the unmodifiable set held by DefaultOptionsHolder, so every call returns
 * the same instance.
 */
@Override
public final Set<SocketOption<?>> supportedOptions() {
    return DefaultOptionsHolder.defaultOptions;
}
474
475 @Override
476 public void park(int event, long nanos) throws IOException {
477 Thread thread = Thread.currentThread();
478 if (thread.isVirtual()) {
479 Poller.poll(getFDVal(), event, nanos, this::isOpen);
480 // DatagramSocket throws when virtual thread interrupted
481 if (!interruptible && thread.isInterrupted()) {
482 throw new InterruptedIOException();
483 }
484 } else {
485 long millis;
486 if (nanos == 0) {
487 millis = -1;
488 } else {
489 millis = NANOSECONDS.toMillis(nanos);
490 if (nanos > MILLISECONDS.toNanos(millis)) {
491 // Round up any excess nanos to the nearest millisecond to
492 // avoid parking for less than requested.
493 millis++;
494 }
495 }
496 Net.poll(getFD(), event, millis);
497 }
498 }
499
/**
 * Marks the beginning of a read operation that might block.
 *
 * Under stateLock this checks that the channel is open, binds the socket
 * with bindInternal(null) when no local address has been recorded, and, for
 * a blocking read, records the current native thread as the reader.
 *
 * @param blocking true if configured blocking
 * @param mustBeConnected true if the socket must be connected
 * @return remote address if connected
 * @throws ClosedChannelException if the channel is closed
 * @throws NotYetConnectedException if mustBeConnected and not connected
 * @throws IOException if socket not bound and cannot be bound
 */
private SocketAddress beginRead(boolean blocking, boolean mustBeConnected)
    throws IOException
{
    if (blocking && interruptible) {
        // set hook for Thread.interrupt
        begin();
    }
    SocketAddress remote;
    synchronized (stateLock) {
        ensureOpen();
        remote = remoteAddress;
        if ((remote == null) && mustBeConnected)
            throw new NotYetConnectedException();
        if (localAddress == null)
            bindInternal(null);
        if (blocking)
            readerThread = NativeThread.current();
    }
    return remote;
}
530
/**
 * Marks the end of a read operation that may have blocked. Does nothing
 * for a non-blocking read.
 *
 * @param blocking true if the read was started as a blocking read
 * @param completed true if the read completed
 * @throws AsynchronousCloseException if the channel was closed asynchronously
 */
private void endRead(boolean blocking, boolean completed)
    throws AsynchronousCloseException
{
    if (blocking) {
        synchronized (stateLock) {
            // this thread is no longer the reader
            readerThread = 0;
            if (state == ST_CLOSING) {
                // a close is in progress (tryFinishClose is defined
                // elsewhere in this file)
                tryFinishClose();
            }
        }
        if (interruptible) {
            // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
            end(completed);
        } else if (!completed && !isOpen()) {
            // not interruptible, so no hook was set by beginRead; report
            // the asynchronous close here instead
            throw new AsynchronousCloseException();
        }
    }
}
554
/**
 * Receives a datagram into the given buffer.
 *
 * In blocking mode the receive is retried, parking for POLLIN in between,
 * while the result is retryable and the channel is open.
 *
 * @param dst the buffer to receive the datagram into
 * @return the sender's address, or null if no datagram was received
 * @throws IllegalArgumentException if the buffer is read-only
 * @throws ClosedChannelException if the channel is closed
 * @throws IOException if an I/O error occurs
 */
@Override
public SocketAddress receive(ByteBuffer dst) throws IOException {
    if (dst.isReadOnly())
        throw new IllegalArgumentException("Read-only buffer");
    readLock.lock();
    try {
        ensureOpen();
        boolean blocking = isBlocking();
        SocketAddress sender = null;
        try {
            // the socket does not need to be connected to receive
            SocketAddress remote = beginRead(blocking, false);
            configureSocketNonBlockingIfVirtualThread();
            boolean connected = (remote != null);
            int n = receive(dst, connected);
            if (blocking) {
                while (IOStatus.okayToRetry(n) && isOpen()) {
                    park(Net.POLLIN);
                    n = receive(dst, connected);
                }
            }
            // a result of zero only counts as a received datagram while
            // the channel is still open
            if (n > 0 || (n == 0 && isOpen())) {
                // sender address is in socket address buffer
                sender = sourceSocketAddress();
            }
            return sender;
        } finally {
            // the read completed if a sender address was obtained
            endRead(blocking, (sender != null));
        }
    } finally {
        readLock.unlock();
    }
}
587
/**
 * Receives a datagram.
 *
 * The datagram is received into a temporary direct buffer and then copied
 * into the packet's data array at the packet's offset. The packet's length
 * and socket address are set from the received datagram.
 *
 * @apiNote This method is for use by the socket adaptor.
 *
 * @param p the packet to receive into; the caller must hold its monitor
 * @param nanos the timeout in nanoseconds, 0 to receive without a timeout
 * @throws IllegalBlockingModeException if the channel is non-blocking
 * @throws SocketTimeoutException if the timeout elapses
 */
void blockingReceive(DatagramPacket p, long nanos) throws IOException {
    assert Thread.holdsLock(p) && nanos >= 0;

    readLock.lock();
    try {
        ensureOpen();
        if (!isBlocking())
            throw new IllegalBlockingModeException();

        // underlying socket needs to be non-blocking if timed receive or virtual thread
        if (nanos > 0) {
            configureSocketNonBlocking();
        } else {
            configureSocketNonBlockingIfVirtualThread();
        }

        boolean completed = false;
        try {
            SocketAddress remote = beginRead(true, false);
            boolean connected = (remote != null);

            // p.bufLength is the maximum size of the datagram that can be received
            int bufLength = DatagramPackets.getBufLength(p);
            ByteBuffer dst = tryBlockingReceive(connected, bufLength, nanos);
            if (dst != null) {
                // copy to DatagramPacket, set length and sender
                try {
                    // dst was flipped by tryBlockingReceive, so its limit
                    // is the end of the received data
                    int len = dst.limit();
                    dst.get(p.getData(), p.getOffset(), len);
                    DatagramPackets.setLength(p, len);
                    p.setSocketAddress(sourceSocketAddress());
                } finally {
                    Util.offerFirstTemporaryDirectBuffer(dst);
                }
                completed = true;
            }

        } finally {
            endRead(true, completed);
        }

    } finally {
        readLock.unlock();
    }
}
641
/**
 * Attempt to receive a datagram.
 *
 * The receive is retried, parking for POLLIN in between, while the result
 * is IOStatus.UNAVAILABLE and the channel is open. With a timeout, the
 * remaining time is recomputed from the start time before each park.
 *
 * @param connected if the channel's socket is connected
 * @param len the maximum size of the datagram to receive
 * @param nanos the timeout in nanoseconds, 0 to wait without a timeout
 * @return a direct buffer containing the datagram (flipped, ready to be
 *         read) or null if channel is closed
 * @throws SocketTimeoutException if the timeout elapses
 */
private ByteBuffer tryBlockingReceive(boolean connected, int len, long nanos)
    throws IOException
{
    assert nanos >= 0;
    long startNanos = System.nanoTime();
    ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
    int n = -1;
    try {
        n = receive(dst, connected);
        while (n == IOStatus.UNAVAILABLE && isOpen()) {
            // virtual thread needs to release temporary direct buffer before parking
            if (Thread.currentThread().isVirtual()) {
                Util.offerFirstTemporaryDirectBuffer(dst);
                dst = null;
            }
            if (nanos > 0) {
                // timed: park for whatever is left of the timeout
                long remainingNanos = nanos - (System.nanoTime() - startNanos);
                if (remainingNanos <= 0) {
                    throw new SocketTimeoutException("Receive timed out");
                }
                park(Net.POLLIN, remainingNanos);
            } else {
                // untimed
                park(Net.POLLIN);
            }
            // virtual thread needs to re-allocate temporary direct buffer after parking
            if (Thread.currentThread().isVirtual()) {
                dst = Util.getTemporaryDirectBuffer(len);
            }
            n = receive(dst, connected);
        }
        dst.flip();
    } finally {
        // release buffer if no datagram received
        // (dst is null here if a virtual thread's park or timeout check
        // threw after the buffer had been released)
        if (dst != null && (n < 0 || (n == 0 && !isOpen()))) {
            Util.offerFirstTemporaryDirectBuffer(dst);
            dst = null;
        }
    }
    return dst;
}
691
692 /**
693 * Receives a datagram into the buffer.
694 * @param connected true if the channel is connected
695 */
696 private int receive(ByteBuffer dst, boolean connected) throws IOException {
697 int pos = dst.position();
698 int lim = dst.limit();
699 assert (pos <= lim);
700 int rem = (pos <= lim ? lim - pos : 0);
701 if (dst instanceof DirectBuffer && rem > 0)
702 return receiveIntoNativeBuffer(dst, rem, pos, connected);
703
704 // Substitute a native buffer. If the supplied buffer is empty
705 // we must instead use a nonempty buffer, otherwise the call
706 // will not block waiting for a datagram on some platforms.
707 int newSize = Math.max(rem, 1);
708 ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
709 try {
710 int n = receiveIntoNativeBuffer(bb, newSize, 0, connected);
711 bb.flip();
712 if (n > 0 && rem > 0)
713 dst.put(bb);
714 return n;
715 } finally {
716 Util.releaseTemporaryDirectBuffer(bb);
717 }
718 }
719
720 /**
721 * Receives a datagram into a direct buffer.
722 */
723 private int receiveIntoNativeBuffer(ByteBuffer bb, int rem, int pos,
724 boolean connected)
725 throws IOException
726 {
727 NIO_ACCESS.acquireSession(bb);
728 try {
729 long bufAddress = NIO_ACCESS.getBufferAddress(bb);
730 int n = receive0(fd,
731 bufAddress + pos,
732 rem,
733 sourceSockAddr.address(),
734 connected);
735 if (n > 0)
736 bb.position(pos + n);
737 return n;
738 } finally {
739 NIO_ACCESS.releaseSession(bb);
740 }
741 }
742
743 /**
744 * Return an InetSocketAddress to represent the source/sender socket address
745 * in sourceSockAddr. Returns the cached InetSocketAddress if the source
746 * address is the same as the cached address.
747 */
748 private InetSocketAddress sourceSocketAddress() throws IOException {
749 assert readLock.isHeldByCurrentThread();
750 if (cachedInetSocketAddress != null && sourceSockAddr.equals(cachedSockAddr)) {
751 return cachedInetSocketAddress;
752 }
753 InetSocketAddress isa = sourceSockAddr.decode();
754 // swap sourceSockAddr and cachedSockAddr
755 NativeSocketAddress tmp = cachedSockAddr;
756 cachedSockAddr = sourceSockAddr;
757 sourceSockAddr = tmp;
758 cachedInetSocketAddress = isa;
759 return isa;
760 }
761
/**
 * Sends a datagram to the given target.
 *
 * When connected, the target must equal the remote address and the buffer
 * is written with IOUtil.write. When not connected, the datagram is sent to
 * the target address. In blocking mode the operation is retried, parking
 * for POLLOUT in between, while the result is retryable and the channel is
 * open.
 *
 * @param src the buffer containing the datagram to send
 * @param target the address to send to
 * @return the normalized result of the write or send
 * @throws NullPointerException if src is null
 * @throws AlreadyConnectedException if connected and target is not the remote address
 * @throws SocketException if not connected and the target port is 0
 * @throws ClosedChannelException if the channel is closed
 * @throws IOException if an I/O error occurs
 */
@Override
public int send(ByteBuffer src, SocketAddress target)
    throws IOException
{
    Objects.requireNonNull(src);
    InetSocketAddress isa = Net.checkAddress(target, family);

    writeLock.lock();
    try {
        ensureOpen();
        boolean blocking = isBlocking();
        int n;
        boolean completed = false;
        try {
            // the socket does not need to be connected to send
            SocketAddress remote = beginWrite(blocking, false);
            configureSocketNonBlockingIfVirtualThread();
            if (remote != null) {
                // connected
                if (!target.equals(remote)) {
                    throw new AlreadyConnectedException();
                }
                n = IOUtil.write(fd, src, -1, nd);
                if (blocking) {
                    while (IOStatus.okayToRetry(n) && isOpen()) {
                        park(Net.POLLOUT);
                        n = IOUtil.write(fd, src, -1, nd);
                    }
                }
                completed = (n > 0);
            } else {
                // not connected
                InetAddress ia = isa.getAddress();
                if (ia.isLinkLocalAddress())
                    isa = IPAddressUtil.toScopedAddress(isa);
                if (isa.getPort() == 0)
                    throw new SocketException("Can't send to port 0");
                n = send(fd, src, isa);
                if (blocking) {
                    while (IOStatus.okayToRetry(n) && isOpen()) {
                        park(Net.POLLOUT);
                        n = send(fd, src, isa);
                    }
                }
                // unlike the connected case, a result of zero counts as completed
                completed = (n >= 0);
            }
        } finally {
            endWrite(blocking, completed);
        }
        assert n >= 0 || n == IOStatus.UNAVAILABLE;
        return IOStatus.normalize(n);
    } finally {
        writeLock.unlock();
    }
}
816
/**
 * Sends a datagram.
 *
 * The packet's data is copied to a temporary direct buffer and sent with
 * send(ByteBuffer, SocketAddress). If the packet has no address, the
 * channel's remote address is used and is also stored in the packet.
 *
 * @apiNote This method is for use by the socket adaptor.
 *
 * @param p the packet to send; the caller must hold its monitor
 * @throws IllegalArgumentException if not connected and target address not set
 * @throws IllegalBlockingModeException if the channel is non-blocking
 */
void blockingSend(DatagramPacket p) throws IOException {
    assert Thread.holdsLock(p);

    writeLock.lock();
    try {
        ensureOpen();
        if (!isBlocking())
            throw new IllegalBlockingModeException();

        int len = p.getLength();
        ByteBuffer src = Util.getTemporaryDirectBuffer(len);
        try {
            // copy bytes to temporary direct buffer
            src.put(p.getData(), p.getOffset(), len);
            src.flip();

            // target address
            InetSocketAddress target;
            if (p.getAddress() == null) {
                InetSocketAddress remote = remoteAddress();
                if (remote == null) {
                    throw new IllegalArgumentException("Address not set");
                }
                // set address/port to be compatible with long-standing behavior
                p.setAddress(remote.getAddress());
                p.setPort(remote.getPort());
                target = remote;
            } else {
                target = (InetSocketAddress) p.getSocketAddress();
            }

            // send the datagram (does not block)
            send(src, target);
        } finally {
            Util.offerFirstTemporaryDirectBuffer(src);
        }

    } finally {
        writeLock.unlock();
    }
}
866
867 private int send(FileDescriptor fd, ByteBuffer src, InetSocketAddress target)
868 throws IOException
869 {
870 if (src instanceof DirectBuffer)
871 return sendFromNativeBuffer(fd, src, target);
872
873 // Substitute a native buffer
874 int pos = src.position();
875 int lim = src.limit();
876 assert (pos <= lim);
877 int rem = (pos <= lim ? lim - pos : 0);
878
879 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
880 try {
881 bb.put(src);
882 bb.flip();
883 // Do not update src until we see how many bytes were written
884 src.position(pos);
885
886 int n = sendFromNativeBuffer(fd, bb, target);
887 if (n > 0) {
888 // now update src
889 src.position(pos + n);
890 }
891 return n;
892 } finally {
893 Util.releaseTemporaryDirectBuffer(bb);
894 }
895 }
896
897 /**
898 * Send a datagram contained in a direct buffer.
899 */
900 private int sendFromNativeBuffer(FileDescriptor fd, ByteBuffer bb,
901 InetSocketAddress target)
902 throws IOException
903 {
904 int pos = bb.position();
905 int lim = bb.limit();
906 assert (pos <= lim);
907 int rem = (pos <= lim ? lim - pos : 0);
908
909 int written;
910 NIO_ACCESS.acquireSession(bb);
911 try {
912 long bufAddress = NIO_ACCESS.getBufferAddress(bb);
913 int addressLen = targetSocketAddress(target);
914 written = send0(fd,
915 bufAddress + pos,
916 rem,
917 targetSockAddr.address(),
918 addressLen);
919 } catch (PortUnreachableException pue) {
920 if (isConnected())
921 throw pue;
922 written = rem;
923 } finally {
924 NIO_ACCESS.releaseSession(bb);
925 }
926 if (written > 0)
927 bb.position(pos + written);
928 return written;
929 }
930
931 /**
932 * Encodes the given InetSocketAddress into targetSockAddr, returning the
933 * length of the sockaddr structure (sizeof struct sockaddr or sockaddr6).
934 */
935 private int targetSocketAddress(InetSocketAddress isa) {
936 assert writeLock.isHeldByCurrentThread();
937 // Nothing to do if target address is already in the buffer. Use
938 // identity rather than equals as Inet6Address.equals ignores scope_id.
939 if (isa == previousTarget)
940 return previousSockAddrLength;
941 previousTarget = null;
942 int len = targetSockAddr.encode(family, isa);
943 previousTarget = isa;
944 previousSockAddrLength = len;
945 return len;
946 }
947
/**
 * Reads a datagram from this channel's socket into the given buffer. The
 * socket must be connected. In blocking mode the read is retried, parking
 * for POLLIN in between, while the result is retryable and the channel is
 * open.
 *
 * @param buf the buffer to read into
 * @return the normalized result of the read
 * @throws NullPointerException if buf is null
 * @throws NotYetConnectedException if the socket is not connected
 * @throws ClosedChannelException if the channel is closed
 * @throws IOException if an I/O error occurs
 */
@Override
public int read(ByteBuffer buf) throws IOException {
    Objects.requireNonNull(buf);

    readLock.lock();
    try {
        ensureOpen();
        boolean blocking = isBlocking();
        int n = 0;
        try {
            // mustBeConnected is true: read requires a connected socket
            beginRead(blocking, true);
            configureSocketNonBlockingIfVirtualThread();
            n = IOUtil.read(fd, buf, -1, nd);
            if (blocking) {
                while (IOStatus.okayToRetry(n) && isOpen()) {
                    park(Net.POLLIN);
                    n = IOUtil.read(fd, buf, -1, nd);
                }
            }
        } finally {
            // the read is treated as completed only if bytes were read
            endRead(blocking, n > 0);
            assert IOStatus.check(n);
        }
        return IOStatus.normalize(n);
    } finally {
        readLock.unlock();
    }
}
976
/**
 * Reads a datagram from this channel's socket into a subsequence of the
 * given buffers. The socket must be connected. In blocking mode the read
 * is retried, parking for POLLIN in between, while the result is retryable
 * and the channel is open.
 *
 * @param dsts the buffers to read into
 * @param offset the index of the first buffer to use
 * @param length the number of buffers to use
 * @return the normalized result of the read
 * @throws IndexOutOfBoundsException if offset and length are out of bounds for dsts
 * @throws NotYetConnectedException if the socket is not connected
 * @throws ClosedChannelException if the channel is closed
 * @throws IOException if an I/O error occurs
 */
@Override
public long read(ByteBuffer[] dsts, int offset, int length)
    throws IOException
{
    Objects.checkFromIndexSize(offset, length, dsts.length);

    readLock.lock();
    try {
        ensureOpen();
        boolean blocking = isBlocking();
        long n = 0;
        try {
            // mustBeConnected is true: read requires a connected socket
            beginRead(blocking, true);
            configureSocketNonBlockingIfVirtualThread();
            n = IOUtil.read(fd, dsts, offset, length, nd);
            if (blocking) {
                while (IOStatus.okayToRetry(n) && isOpen()) {
                    park(Net.POLLIN);
                    n = IOUtil.read(fd, dsts, offset, length, nd);
                }
            }
        } finally {
            // the read is treated as completed only if bytes were read
            endRead(blocking, n > 0);
            assert IOStatus.check(n);
        }
        return IOStatus.normalize(n);
    } finally {
        readLock.unlock();
    }
}
1007
/**
 * Marks the beginning of a write operation that might block.
 *
 * Under stateLock this checks that the channel is open, binds the socket
 * with bindInternal(null) when no local address has been recorded, and, for
 * a blocking write, records the current native thread as the writer.
 *
 * @param blocking true if configured blocking
 * @param mustBeConnected true if the socket must be connected
 * @return remote address if connected
 * @throws ClosedChannelException if the channel is closed
 * @throws NotYetConnectedException if mustBeConnected and not connected
 * @throws IOException if socket not bound and cannot be bound
 */
private SocketAddress beginWrite(boolean blocking, boolean mustBeConnected)
    throws IOException
{
    if (blocking && interruptible) {
        // set hook for Thread.interrupt
        begin();
    }
    SocketAddress remote;
    synchronized (stateLock) {
        ensureOpen();
        remote = remoteAddress;
        if ((remote == null) && mustBeConnected)
            throw new NotYetConnectedException();
        if (localAddress == null)
            bindInternal(null);
        if (blocking)
            writerThread = NativeThread.current();
    }
    return remote;
}
1037
1038 /**
1039 * Marks the end of a write operation that may have blocked.
1040 *
1041 * @throws AsynchronousCloseException if the channel was closed asynchronously
1042 */
1043 private void endWrite(boolean blocking, boolean completed)
1044 throws AsynchronousCloseException
1045 {
1046 if (blocking) {
1047 synchronized (stateLock) {
1048 writerThread = 0;
1049 if (state == ST_CLOSING) {
1050 tryFinishClose();
1051 }
1052 }
1053
1054 if (interruptible) {
1055 // remove hook for Thread.interrupt (may throw AsynchronousCloseException)
1056 end(completed);
1057 } else if (!completed && !isOpen()) {
1058 throw new AsynchronousCloseException();
1059 }
1060 }
1061 }
1062
1063 @Override
1064 public int write(ByteBuffer buf) throws IOException {
1065 Objects.requireNonNull(buf);
1066
1067 writeLock.lock();
1068 try {
1069 ensureOpen();
1070 boolean blocking = isBlocking();
1071 int n = 0;
1072 try {
1073 beginWrite(blocking, true);
1074 configureSocketNonBlockingIfVirtualThread();
1075 n = IOUtil.write(fd, buf, -1, nd);
1076 if (blocking) {
1077 while (IOStatus.okayToRetry(n) && isOpen()) {
1078 park(Net.POLLOUT);
1079 n = IOUtil.write(fd, buf, -1, nd);
1080 }
1081 }
1082 } finally {
1083 endWrite(blocking, n > 0);
1084 assert IOStatus.check(n);
1085 }
1086 return IOStatus.normalize(n);
1087 } finally {
1088 writeLock.unlock();
1089 }
1090 }
1091
1092 @Override
1093 public long write(ByteBuffer[] srcs, int offset, int length)
1094 throws IOException
1095 {
1096 Objects.checkFromIndexSize(offset, length, srcs.length);
1097
1098 writeLock.lock();
1099 try {
1100 ensureOpen();
1101 boolean blocking = isBlocking();
1102 long n = 0;
1103 try {
1104 beginWrite(blocking, true);
1105 configureSocketNonBlockingIfVirtualThread();
1106 n = IOUtil.write(fd, srcs, offset, length, nd);
1107 if (blocking) {
1108 while (IOStatus.okayToRetry(n) && isOpen()) {
1109 park(Net.POLLOUT);
1110 n = IOUtil.write(fd, srcs, offset, length, nd);
1111 }
1112 }
1113 } finally {
1114 endWrite(blocking, n > 0);
1115 assert IOStatus.check(n);
1116 }
1117 return IOStatus.normalize(n);
1118 } finally {
1119 writeLock.unlock();
1120 }
1121 }
1122
1123 @Override
1124 protected void implConfigureBlocking(boolean block) throws IOException {
1125 readLock.lock();
1126 try {
1127 writeLock.lock();
1128 try {
1129 lockedConfigureBlocking(block);
1130 } finally {
1131 writeLock.unlock();
1132 }
1133 } finally {
1134 readLock.unlock();
1135 }
1136 }
1137
1138 /**
1139 * Adjusts the blocking mode. readLock or writeLock must already be held.
1140 */
1141 private void lockedConfigureBlocking(boolean block) throws IOException {
1142 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
1143 synchronized (stateLock) {
1144 ensureOpen();
1145 // do nothing if virtual thread has forced the socket to be non-blocking
1146 if (!forcedNonBlocking) {
1147 IOUtil.configureBlocking(fd, block);
1148 }
1149 }
1150 }
1151
1152 /**
1153 * Attempts to adjust the blocking mode if the channel is open.
1154 * @return {@code true} if the blocking mode was adjusted
1155 */
1156 private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
1157 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
1158 synchronized (stateLock) {
1159 if (!forcedNonBlocking && isOpen()) {
1160 IOUtil.configureBlocking(fd, block);
1161 return true;
1162 } else {
1163 return false;
1164 }
1165 }
1166 }
1167
1168 /**
1169 * Ensures that the socket is configured non-blocking.
1170 * @throws IOException if there is an I/O error changing the blocking mode
1171 */
1172 private void configureSocketNonBlocking() throws IOException {
1173 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
1174 if (!forcedNonBlocking) {
1175 synchronized (stateLock) {
1176 ensureOpen();
1177 IOUtil.configureBlocking(fd, false);
1178 forcedNonBlocking = true;
1179 }
1180 }
1181 }
1182
1183 /**
1184 * Ensures that the socket is configured non-blocking when on a virtual thread.
1185 * @throws IOException if there is an I/O error changing the blocking mode
1186 */
1187 private void configureSocketNonBlockingIfVirtualThread() throws IOException {
1188 if (Thread.currentThread().isVirtual()) {
1189 configureSocketNonBlocking();
1190 }
1191 }
1192
1193 InetSocketAddress localAddress() {
1194 synchronized (stateLock) {
1195 return localAddress;
1196 }
1197 }
1198
1199 InetSocketAddress remoteAddress() {
1200 synchronized (stateLock) {
1201 return remoteAddress;
1202 }
1203 }
1204
1205 @Override
1206 public DatagramChannel bind(SocketAddress local) throws IOException {
1207 readLock.lock();
1208 try {
1209 writeLock.lock();
1210 try {
1211 synchronized (stateLock) {
1212 ensureOpen();
1213 if (localAddress != null)
1214 throw new AlreadyBoundException();
1215 bindInternal(local);
1216 }
1217 } finally {
1218 writeLock.unlock();
1219 }
1220 } finally {
1221 readLock.unlock();
1222 }
1223 return this;
1224 }
1225
1226 private void bindInternal(SocketAddress local) throws IOException {
1227 assert Thread.holdsLock(stateLock )&& (localAddress == null);
1228
1229 InetSocketAddress isa;
1230 if (local == null) {
1231 // only Inet4Address allowed with IPv4 socket
1232 if (family == StandardProtocolFamily.INET) {
1233 isa = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
1234 } else {
1235 isa = new InetSocketAddress(0);
1236 }
1237 } else {
1238 isa = Net.checkAddress(local, family);
1239 }
1240
1241 Net.bind(family, fd, isa.getAddress(), isa.getPort());
1242 localAddress = Net.localAddress(fd);
1243 }
1244
1245 @Override
1246 public boolean isConnected() {
1247 synchronized (stateLock) {
1248 return (state == ST_CONNECTED);
1249 }
1250 }
1251
1252 @Override
1253 public DatagramChannel connect(SocketAddress sa) throws IOException {
1254 return connect(sa, true);
1255 }
1256
1257 /**
1258 * Connects the channel's socket.
1259 *
1260 * @param sa the remote address to which this channel is to be connected
1261 * @param check true to check if the channel is already connected.
1262 */
1263 DatagramChannel connect(SocketAddress sa, boolean check) throws IOException {
1264 InetSocketAddress isa = Net.checkAddress(sa, family);
1265
1266 readLock.lock();
1267 try {
1268 writeLock.lock();
1269 try {
1270 synchronized (stateLock) {
1271 ensureOpen();
1272 if (check && state == ST_CONNECTED)
1273 throw new AlreadyConnectedException();
1274 if (isa.getPort() == 0)
1275 throw new SocketException("Can't connect to port 0");
1276
1277 // ensure that the socket is bound
1278 if (localAddress == null) {
1279 bindInternal(null);
1280 }
1281
1282 // capture local address before connect
1283 initialLocalAddress = localAddress;
1284
1285 int n = Net.connect(family,
1286 fd,
1287 isa.getAddress(),
1288 isa.getPort());
1289 if (n <= 0)
1290 throw new Error(); // Can't happen
1291
1292 // connected
1293 remoteAddress = isa;
1294 state = ST_CONNECTED;
1295
1296 // refresh local address
1297 localAddress = Net.localAddress(fd);
1298
1299 // flush any packets already received.
1300 boolean blocking = isBlocking();
1301 if (blocking) {
1302 lockedConfigureBlocking(false);
1303 }
1304 try {
1305 ByteBuffer buf = ByteBuffer.allocate(100);
1306 while (receive(buf, false) >= 0) {
1307 buf.clear();
1308 }
1309 } finally {
1310 if (blocking) {
1311 tryLockedConfigureBlocking(true);
1312 }
1313 }
1314 }
1315 } finally {
1316 writeLock.unlock();
1317 }
1318 } finally {
1319 readLock.unlock();
1320 }
1321 return this;
1322 }
1323
1324 @Override
1325 public DatagramChannel disconnect() throws IOException {
1326 readLock.lock();
1327 try {
1328 writeLock.lock();
1329 try {
1330 synchronized (stateLock) {
1331 if (!isOpen() || (state != ST_CONNECTED))
1332 return this;
1333
1334 // disconnect socket
1335 boolean isIPv6 = (family == StandardProtocolFamily.INET6);
1336 disconnect0(fd, isIPv6);
1337
1338 // no longer connected
1339 remoteAddress = null;
1340 state = ST_UNCONNECTED;
1341
1342 // refresh localAddress, should be same as it was prior to connect
1343 localAddress = Net.localAddress(fd);
1344 try {
1345 if (!localAddress.equals(initialLocalAddress)) {
1346 // Workaround connect(2) issues on Linux and macOS
1347 repairSocket(initialLocalAddress);
1348 assert (localAddress != null)
1349 && localAddress.equals(Net.localAddress(fd))
1350 && localAddress.equals(initialLocalAddress);
1351 }
1352 } finally {
1353 initialLocalAddress = null;
1354 }
1355 }
1356 } finally {
1357 writeLock.unlock();
1358 }
1359 } finally {
1360 readLock.unlock();
1361 }
1362 return this;
1363 }
1364
1365 /**
1366 * "Repair" the channel's socket after a disconnect that didn't restore the
1367 * local address.
1368 *
1369 * On Linux, connect(2) dissolves the association but changes the local port
1370 * to 0 when it was initially bound to an ephemeral port. The workaround here
1371 * is to rebind to the original port.
1372 *
1373 * On macOS, connect(2) dissolves the association but rebinds the socket to
1374 * the wildcard address when it was initially bound to a specific address.
1375 * The workaround here is to re-create the socket.
1376 */
1377 private void repairSocket(InetSocketAddress target)
1378 throws IOException
1379 {
1380 assert Thread.holdsLock(stateLock);
1381
1382 // Linux: try to bind the socket to the original address/port
1383 if (localAddress.getPort() == 0) {
1384 assert localAddress.getAddress().equals(target.getAddress());
1385 Net.bind(family, fd, target.getAddress(), target.getPort());
1386 localAddress = Net.localAddress(fd);
1387 return;
1388 }
1389
1390 // capture the value of all existing socket options
1391 Map<SocketOption<?>, Object> map = new HashMap<>();
1392 for (SocketOption<?> option : supportedOptions()) {
1393 Object value = getOption(option);
1394 if (value != null) {
1395 map.put(option, value);
1396 }
1397 }
1398
1399 // macOS: re-create the socket.
1400 FileDescriptor newfd = Net.socket(family, false);
1401 try {
1402 // copy the socket options that are protocol family agnostic
1403 for (Map.Entry<SocketOption<?>, Object> e : map.entrySet()) {
1404 SocketOption<?> option = e.getKey();
1405 if (SocketOptionRegistry.findOption(option, Net.UNSPEC) != null) {
1406 Object value = e.getValue();
1407 try {
1408 Net.setSocketOption(newfd, Net.UNSPEC, option, value);
1409 } catch (IOException ignore) { }
1410 }
1411 }
1412
1413 // copy the blocking mode
1414 if (!isBlocking() || forcedNonBlocking) {
1415 IOUtil.configureBlocking(newfd, false);
1416 }
1417
1418 // dup this channel's socket to the new socket. If this succeeds then
1419 // fd will reference the new socket. If it fails then it will still
1420 // reference the old socket.
1421 nd.dup(newfd, fd);
1422 } finally {
1423 // release the file descriptor
1424 nd.close(newfd);
1425 }
1426
1427 // bind to the original local address
1428 try {
1429 Net.bind(family, fd, target.getAddress(), target.getPort());
1430 } catch (IOException ioe) {
1431 // bind failed, socket is left unbound
1432 localAddress = null;
1433 throw ioe;
1434 }
1435
1436 // restore local address
1437 localAddress = Net.localAddress(fd);
1438
1439 // restore all socket options (including those set in first pass)
1440 for (Map.Entry<SocketOption<?>, Object> e : map.entrySet()) {
1441 @SuppressWarnings("unchecked")
1442 SocketOption<Object> option = (SocketOption<Object>) e.getKey();
1443 Object value = e.getValue();
1444 try {
1445 setOption(option, value);
1446 } catch (IOException ignore) { }
1447 }
1448
1449 // restore multicast group membership
1450 MembershipRegistry registry = this.registry;
1451 if (registry != null) {
1452 registry.forEach(k -> {
1453 if (k instanceof MembershipKeyImpl.Type6) {
1454 MembershipKeyImpl.Type6 key6 = (MembershipKeyImpl.Type6) k;
1455 Net.join6(fd, key6.groupAddress(), key6.index(), key6.source());
1456 } else {
1457 MembershipKeyImpl.Type4 key4 = (MembershipKeyImpl.Type4) k;
1458 Net.join4(fd, key4.groupAddress(), key4.interfaceAddress(), key4.source());
1459 }
1460 });
1461 }
1462
1463 // reset registration in all Selectors that this channel is registered with
1464 AbstractSelectableChannels.forEach(this, SelectionKeyImpl::reset);
1465 }
1466
1467 /**
1468 * Defines static methods to access AbstractSelectableChannel non-public members.
1469 */
1470 private static class AbstractSelectableChannels {
1471 private static final Method FOREACH;
1472 static {
1473 try {
1474 Method m = AbstractSelectableChannel.class.getDeclaredMethod("forEach", Consumer.class);
1475 m.setAccessible(true);
1476 FOREACH = m;
1477 } catch (Exception e) {
1478 throw new InternalError(e);
1479 }
1480 }
1481 static void forEach(AbstractSelectableChannel ch, Consumer<SelectionKeyImpl> action) {
1482 try {
1483 FOREACH.invoke(ch, action);
1484 } catch (Exception e) {
1485 throw new InternalError(e);
1486 }
1487 }
1488 }
1489
1490 /**
1491 * Joins channel's socket to the given group/interface and
1492 * optional source address.
1493 */
1494 private MembershipKey innerJoin(InetAddress group,
1495 NetworkInterface interf,
1496 InetAddress source)
1497 throws IOException
1498 {
1499 if (!group.isMulticastAddress())
1500 throw new IllegalArgumentException("Group not a multicast address");
1501
1502 // check multicast address is compatible with this socket
1503 if (group instanceof Inet4Address) {
1504 if (family == StandardProtocolFamily.INET6 && !Net.canIPv6SocketJoinIPv4Group())
1505 throw new IllegalArgumentException("IPv6 socket cannot join IPv4 multicast group");
1506 } else if (group instanceof Inet6Address) {
1507 if (family != StandardProtocolFamily.INET6)
1508 throw new IllegalArgumentException("Only IPv6 sockets can join IPv6 multicast group");
1509 } else {
1510 throw new IllegalArgumentException("Address type not supported");
1511 }
1512
1513 // check source address
1514 if (source != null) {
1515 if (source.isAnyLocalAddress())
1516 throw new IllegalArgumentException("Source address is a wildcard address");
1517 if (source.isMulticastAddress())
1518 throw new IllegalArgumentException("Source address is multicast address");
1519 if (source.getClass() != group.getClass())
1520 throw new IllegalArgumentException("Source address is different type to group");
1521 }
1522
1523 synchronized (stateLock) {
1524 ensureOpen();
1525
1526 // check the registry to see if we are already a member of the group
1527 if (registry == null) {
1528 registry = new MembershipRegistry();
1529 } else {
1530 // return existing membership key
1531 MembershipKey key = registry.checkMembership(group, interf, source);
1532 if (key != null)
1533 return key;
1534 }
1535
1536 MembershipKeyImpl key;
1537 if ((family == StandardProtocolFamily.INET6) &&
1538 ((group instanceof Inet6Address) || Net.canJoin6WithIPv4Group()))
1539 {
1540 int index = interf.getIndex();
1541 if (index == -1)
1542 throw new IOException("Network interface cannot be identified");
1543
1544 // need multicast and source address as byte arrays
1545 byte[] groupAddress = Net.inet6AsByteArray(group);
1546 byte[] sourceAddress = (source == null) ? null :
1547 Net.inet6AsByteArray(source);
1548
1549 // join the group
1550 int n = Net.join6(fd, groupAddress, index, sourceAddress);
1551 if (n == IOStatus.UNAVAILABLE)
1552 throw new UnsupportedOperationException();
1553
1554 key = new MembershipKeyImpl.Type6(this, group, interf, source,
1555 groupAddress, index, sourceAddress);
1556
1557 } else {
1558 // need IPv4 address to identify interface
1559 Inet4Address target = Net.anyInet4Address(interf);
1560 if (target == null)
1561 throw new IOException("Network interface not configured for IPv4");
1562
1563 int groupAddress = Net.inet4AsInt(group);
1564 int targetAddress = Net.inet4AsInt(target);
1565 int sourceAddress = (source == null) ? 0 : Net.inet4AsInt(source);
1566
1567 // join the group
1568 int n = Net.join4(fd, groupAddress, targetAddress, sourceAddress);
1569 if (n == IOStatus.UNAVAILABLE)
1570 throw new UnsupportedOperationException();
1571
1572 key = new MembershipKeyImpl.Type4(this, group, interf, source,
1573 groupAddress, targetAddress, sourceAddress);
1574 }
1575
1576 registry.add(key);
1577 return key;
1578 }
1579 }
1580
1581 @Override
1582 public MembershipKey join(InetAddress group,
1583 NetworkInterface interf)
1584 throws IOException
1585 {
1586 return innerJoin(group, interf, null);
1587 }
1588
1589 @Override
1590 public MembershipKey join(InetAddress group,
1591 NetworkInterface interf,
1592 InetAddress source)
1593 throws IOException
1594 {
1595 Objects.requireNonNull(source);
1596 return innerJoin(group, interf, source);
1597 }
1598
1599 // package-private
1600 void drop(MembershipKeyImpl key) {
1601 assert key.channel() == this;
1602
1603 synchronized (stateLock) {
1604 if (!key.isValid())
1605 return;
1606
1607 try {
1608 if (key instanceof MembershipKeyImpl.Type6) {
1609 MembershipKeyImpl.Type6 key6 =
1610 (MembershipKeyImpl.Type6)key;
1611 Net.drop6(fd, key6.groupAddress(), key6.index(), key6.source());
1612 } else {
1613 MembershipKeyImpl.Type4 key4 = (MembershipKeyImpl.Type4)key;
1614 Net.drop4(fd, key4.groupAddress(), key4.interfaceAddress(),
1615 key4.source());
1616 }
1617 } catch (IOException ioe) {
1618 // should not happen
1619 throw new AssertionError(ioe);
1620 }
1621
1622 key.invalidate();
1623 registry.remove(key);
1624 }
1625 }
1626
1627 /**
1628 * Finds an existing membership of a multicast group. Returns null if this
1629 * channel's socket is not a member of the group.
1630 *
1631 * @apiNote This method is for use by the socket adaptor
1632 */
1633 MembershipKey findMembership(InetAddress group, NetworkInterface interf) {
1634 synchronized (stateLock) {
1635 if (registry != null) {
1636 return registry.checkMembership(group, interf, null);
1637 } else {
1638 return null;
1639 }
1640 }
1641 }
1642
1643 /**
1644 * Block datagrams from the given source.
1645 */
1646 void block(MembershipKeyImpl key, InetAddress source)
1647 throws IOException
1648 {
1649 assert key.channel() == this;
1650 assert key.sourceAddress() == null;
1651
1652 synchronized (stateLock) {
1653 if (!key.isValid())
1654 throw new IllegalStateException("key is no longer valid");
1655 if (source.isAnyLocalAddress())
1656 throw new IllegalArgumentException("Source address is a wildcard address");
1657 if (source.isMulticastAddress())
1658 throw new IllegalArgumentException("Source address is multicast address");
1659 if (source.getClass() != key.group().getClass())
1660 throw new IllegalArgumentException("Source address is different type to group");
1661
1662 int n;
1663 if (key instanceof MembershipKeyImpl.Type6) {
1664 MembershipKeyImpl.Type6 key6 =
1665 (MembershipKeyImpl.Type6)key;
1666 n = Net.block6(fd, key6.groupAddress(), key6.index(),
1667 Net.inet6AsByteArray(source));
1668 } else {
1669 MembershipKeyImpl.Type4 key4 =
1670 (MembershipKeyImpl.Type4)key;
1671 n = Net.block4(fd, key4.groupAddress(), key4.interfaceAddress(),
1672 Net.inet4AsInt(source));
1673 }
1674 if (n == IOStatus.UNAVAILABLE) {
1675 // ancient kernel
1676 throw new UnsupportedOperationException();
1677 }
1678 }
1679 }
1680
1681 /**
1682 * Unblock the given source.
1683 */
1684 void unblock(MembershipKeyImpl key, InetAddress source) {
1685 assert key.channel() == this;
1686 assert key.sourceAddress() == null;
1687
1688 synchronized (stateLock) {
1689 if (!key.isValid())
1690 throw new IllegalStateException("key is no longer valid");
1691
1692 try {
1693 if (key instanceof MembershipKeyImpl.Type6) {
1694 MembershipKeyImpl.Type6 key6 =
1695 (MembershipKeyImpl.Type6)key;
1696 Net.unblock6(fd, key6.groupAddress(), key6.index(),
1697 Net.inet6AsByteArray(source));
1698 } else {
1699 MembershipKeyImpl.Type4 key4 =
1700 (MembershipKeyImpl.Type4)key;
1701 Net.unblock4(fd, key4.groupAddress(), key4.interfaceAddress(),
1702 Net.inet4AsInt(source));
1703 }
1704 } catch (IOException ioe) {
1705 // should not happen
1706 throw new AssertionError(ioe);
1707 }
1708 }
1709 }
1710
1711 /**
1712 * Closes the socket if there are no I/O operations in progress and the
1713 * channel is not registered with a Selector.
1714 */
1715 private boolean tryClose() throws IOException {
1716 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
1717 if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) {
1718 state = ST_CLOSED;
1719 try {
1720 // close socket
1721 cleaner.clean();
1722 } catch (UncheckedIOException ioe) {
1723 throw ioe.getCause();
1724 }
1725 return true;
1726 } else {
1727 return false;
1728 }
1729 }
1730
1731 /**
1732 * Invokes tryClose to attempt to close the socket.
1733 *
1734 * This method is used for deferred closing by I/O and Selector operations.
1735 */
1736 private void tryFinishClose() {
1737 try {
1738 tryClose();
1739 } catch (IOException ignore) { }
1740 }
1741
1742 /**
1743 * Closes this channel when configured in blocking mode.
1744 *
1745 * If there is an I/O operation in progress then the socket is pre-closed
1746 * and the I/O threads signalled, in which case the final close is deferred
1747 * until all I/O operations complete.
1748 */
1749 private void implCloseBlockingMode() throws IOException {
1750 synchronized (stateLock) {
1751 assert state < ST_CLOSING;
1752 state = ST_CLOSING;
1753
1754 // if member of any multicast groups then invalidate the keys
1755 if (registry != null)
1756 registry.invalidateAll();
1757
1758 if (!tryClose()) {
1759 nd.preClose(fd, readerThread, writerThread);
1760 }
1761 }
1762 }
1763
1764 /**
1765 * Closes this channel when configured in non-blocking mode.
1766 *
1767 * If the channel is registered with a Selector then the close is deferred
1768 * until the channel is flushed from all Selectors.
1769 */
1770 private void implCloseNonBlockingMode() throws IOException {
1771 synchronized (stateLock) {
1772 assert state < ST_CLOSING;
1773 state = ST_CLOSING;
1774
1775 // if member of any multicast groups then invalidate the keys
1776 if (registry != null)
1777 registry.invalidateAll();
1778 }
1779
1780 // wait for any read/write operations to complete before trying to close
1781 readLock.lock();
1782 readLock.unlock();
1783 writeLock.lock();
1784 writeLock.unlock();
1785 synchronized (stateLock) {
1786 if (state == ST_CLOSING) {
1787 tryClose();
1788 }
1789 }
1790 }
1791
1792 /**
1793 * Invoked by implCloseChannel to close the channel.
1794 */
1795 @Override
1796 protected void implCloseSelectableChannel() throws IOException {
1797 assert !isOpen();
1798 if (isBlocking()) {
1799 implCloseBlockingMode();
1800 } else {
1801 implCloseNonBlockingMode();
1802 }
1803 }
1804
1805 @Override
1806 public void kill() {
1807 // wait for any read/write operations to complete before trying to close
1808 readLock.lock();
1809 readLock.unlock();
1810 writeLock.lock();
1811 writeLock.unlock();
1812 synchronized (stateLock) {
1813 if (state == ST_CLOSING) {
1814 tryFinishClose();
1815 }
1816 }
1817 }
1818
1819 /**
1820 * Translates native poll revent set into a ready operation set
1821 */
1822 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
1823 int intOps = ski.nioInterestOps();
1824 int oldOps = ski.nioReadyOps();
1825 int newOps = initialOps;
1826
1827 if ((ops & Net.POLLNVAL) != 0) {
1828 // This should only happen if this channel is pre-closed while a
1829 // selection operation is in progress
1830 // ## Throw an error if this channel has not been pre-closed
1831 return false;
1832 }
1833
1834 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
1835 newOps = intOps;
1836 ski.nioReadyOps(newOps);
1837 return (newOps & ~oldOps) != 0;
1838 }
1839
1840 if (((ops & Net.POLLIN) != 0) &&
1841 ((intOps & SelectionKey.OP_READ) != 0))
1842 newOps |= SelectionKey.OP_READ;
1843
1844 if (((ops & Net.POLLOUT) != 0) &&
1845 ((intOps & SelectionKey.OP_WRITE) != 0))
1846 newOps |= SelectionKey.OP_WRITE;
1847
1848 ski.nioReadyOps(newOps);
1849 return (newOps & ~oldOps) != 0;
1850 }
1851
1852 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
1853 return translateReadyOps(ops, ski.nioReadyOps(), ski);
1854 }
1855
1856 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
1857 return translateReadyOps(ops, 0, ski);
1858 }
1859
1860 /**
1861 * Translates an interest operation set into a native poll event set
1862 */
1863 public int translateInterestOps(int ops) {
1864 int newOps = 0;
1865 if ((ops & SelectionKey.OP_READ) != 0)
1866 newOps |= Net.POLLIN;
1867 if ((ops & SelectionKey.OP_WRITE) != 0)
1868 newOps |= Net.POLLOUT;
1869 if ((ops & SelectionKey.OP_CONNECT) != 0)
1870 newOps |= Net.POLLIN;
1871 return newOps;
1872 }
1873
1874 public FileDescriptor getFD() {
1875 return fd;
1876 }
1877
1878 public int getFDVal() {
1879 return fdVal;
1880 }
1881
1882 /**
1883 * Returns an action to release the given file descriptor and socket addresses.
1884 */
1885 private static Runnable releaserFor(FileDescriptor fd, NativeSocketAddress... sockAddrs) {
1886 return () -> {
1887 try {
1888 nd.close(fd);
1889 } catch (IOException ioe) {
1890 throw new UncheckedIOException(ioe);
1891 } finally {
1892 // release memory
1893 NativeSocketAddress.freeAll(sockAddrs);
1894 }
1895 };
1896 }
1897
1898 /**
1899 * Defines static methods to get/set DatagramPacket fields and workaround
1900 * DatagramPacket deficiencies.
1901 */
1902 private static class DatagramPackets {
1903 private static final VarHandle LENGTH;
1904 private static final VarHandle BUF_LENGTH;
1905 static {
1906 try {
1907 MethodHandles.Lookup l = MethodHandles.privateLookupIn(DatagramPacket.class, MethodHandles.lookup());
1908 LENGTH = l.findVarHandle(DatagramPacket.class, "length", int.class);
1909 BUF_LENGTH = l.findVarHandle(DatagramPacket.class, "bufLength", int.class);
1910 } catch (Exception e) {
1911 throw new ExceptionInInitializerError(e);
1912 }
1913 }
1914
1915 /**
1916 * Sets the DatagramPacket.length field. DatagramPacket.setLength cannot be
1917 * used at this time because it sets both the length and bufLength fields.
1918 */
1919 static void setLength(DatagramPacket p, int value) {
1920 assert Thread.holdsLock(p);
1921 LENGTH.set(p, value);
1922 }
1923
1924 /**
1925 * Returns the value of the DatagramPacket.bufLength field.
1926 */
1927 static int getBufLength(DatagramPacket p) {
1928 assert Thread.holdsLock(p);
1929 return (int) BUF_LENGTH.get(p);
1930 }
1931 }
1932
1933 // -- Native methods --
1934
1935 private static native void disconnect0(FileDescriptor fd, boolean isIPv6)
1936 throws IOException;
1937
1938 private static native int receive0(FileDescriptor fd, long address, int len,
1939 long senderAddress, boolean connected)
1940 throws IOException;
1941
1942 private static native int send0(FileDescriptor fd, long address, int len,
1943 long targetAddress, int targetAddressLen)
1944 throws IOException;
1945
1946 static {
1947 IOUtil.load();
1948 }
1949 }