< prev index next >

src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java

Print this page

        

*** 53,62 **** --- 53,63 ---- import java.util.HashSet; import java.util.Objects; import java.util.Set; import java.util.concurrent.locks.ReentrantLock; + import jdk.internal.misc.Strands; import sun.net.ResourceManager; import sun.net.ext.ExtendedSocketOptions; import sun.net.util.IPAddressUtil; /**
*** 119,128 **** --- 120,132 ---- private boolean reuseAddressEmulated; // set true/false when socket is already bound and SO_REUSEADDR is emulated private boolean isReuseAddress; + // lazily set to true when the socket is configured non-blocking + private volatile boolean nonBlocking; + // -- End of fields protected by stateLock public DatagramChannelImpl(SelectorProvider sp) throws IOException {
*** 407,416 **** --- 411,421 ---- boolean blocking = isBlocking(); int n = 0; ByteBuffer bb = null; try { SocketAddress remote = beginRead(blocking, false); + lockedConfigureNonBlockingIfFiber(); boolean connected = (remote != null); SecurityManager sm = System.getSecurityManager(); if (connected || (sm == null)) { // connected or no security manager n = receive(fd, dst, connected);
*** 511,520 **** --- 516,526 ---- try { boolean blocking = isBlocking(); int n = 0; try { SocketAddress remote = beginWrite(blocking, false); + lockedConfigureNonBlockingIfFiber(); if (remote != null) { // connected if (!target.equals(remote)) { throw new AlreadyConnectedException(); }
*** 618,627 **** --- 624,634 ---- try { boolean blocking = isBlocking(); int n = 0; try { beginRead(blocking, true); + lockedConfigureNonBlockingIfFiber(); n = IOUtil.read(fd, buf, -1, nd); if (blocking) { while (IOStatus.okayToRetry(n) && isOpen()) { park(Net.POLLIN); n = IOUtil.read(fd, buf, -1, nd);
*** 647,656 **** --- 654,664 ---- try { boolean blocking = isBlocking(); long n = 0; try { beginRead(blocking, true); + lockedConfigureNonBlockingIfFiber(); n = IOUtil.read(fd, dsts, offset, length, nd); if (blocking) { while (IOStatus.okayToRetry(n) && isOpen()) { park(Net.POLLIN); n = IOUtil.read(fd, dsts, offset, length, nd);
*** 724,733 **** --- 732,742 ---- try { boolean blocking = isBlocking(); int n = 0; try { beginWrite(blocking, true); + lockedConfigureNonBlockingIfFiber(); n = IOUtil.write(fd, buf, -1, nd); if (blocking) { while (IOStatus.okayToRetry(n) && isOpen()) { park(Net.POLLOUT); n = IOUtil.write(fd, buf, -1, nd);
*** 753,762 **** --- 762,772 ---- try { boolean blocking = isBlocking(); long n = 0; try { beginWrite(blocking, true); + lockedConfigureNonBlockingIfFiber(); n = IOUtil.write(fd, srcs, offset, length, nd); if (blocking) { while (IOStatus.okayToRetry(n) && isOpen()) { park(Net.POLLOUT); n = IOUtil.write(fd, srcs, offset, length, nd);
*** 776,797 **** protected void implConfigureBlocking(boolean block) throws IOException { readLock.lock(); try { writeLock.lock(); try { ! synchronized (stateLock) { ! ensureOpen(); ! IOUtil.configureBlocking(fd, block); ! } } finally { writeLock.unlock(); } } finally { readLock.unlock(); } } InetSocketAddress localAddress() { synchronized (stateLock) { return localAddress; } } --- 786,834 ---- protected void implConfigureBlocking(boolean block) throws IOException { readLock.lock(); try { writeLock.lock(); try { ! lockedConfigureBlocking(block); } finally { writeLock.unlock(); } } finally { readLock.unlock(); } } + /** + * Adjust the blocking mode while holding readLock or writeLock. + */ + private void lockedConfigureBlocking(boolean block) throws IOException { + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); + synchronized (stateLock) { + ensureOpen(); + // do nothing if fiber has forced the socket to be non-blocking + if (!nonBlocking) { + IOUtil.configureBlocking(fd, block); + } + } + } + + /** + * Ensures that the socket is configured non-blocking when the current + * strand is a fiber or a timeout is specified. + * @throws IOException if there is an I/O error changing the blocking mode + */ + private void lockedConfigureNonBlockingIfFiber() throws IOException { + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); + if (!nonBlocking && (Strands.currentStrand() instanceof Fiber)) { + synchronized (stateLock) { + ensureOpen(); + IOUtil.configureBlocking(fd, false); + nonBlocking = true; + } + } + } + InetSocketAddress localAddress() { synchronized (stateLock) { return localAddress; } }
*** 890,909 **** localAddress = Net.localAddress(fd); // flush any packets already received. boolean blocking = isBlocking(); if (blocking) { ! IOUtil.configureBlocking(fd, false); } try { ByteBuffer buf = ByteBuffer.allocate(100); while (receive(fd, buf, false) > 0) { buf.clear(); } } finally { if (blocking) { ! IOUtil.configureBlocking(fd, true); } } } } finally { writeLock.unlock(); --- 927,946 ---- localAddress = Net.localAddress(fd); // flush any packets already received. boolean blocking = isBlocking(); if (blocking) { ! lockedConfigureBlocking(false); } try { ByteBuffer buf = ByteBuffer.allocate(100); while (receive(fd, buf, false) > 0) { buf.clear(); } } finally { if (blocking) { ! lockedConfigureBlocking(true); } } } } finally { writeLock.unlock();
*** 1203,1216 **** if (!tryClose()) { long reader = readerThread; long writer = writerThread; if (reader != 0 || writer != 0) { nd.preClose(fd); ! if (reader != 0) NativeThread.signal(reader); ! if (writer != 0) NativeThread.signal(writer); } } } } --- 1240,1256 ---- if (!tryClose()) { long reader = readerThread; long writer = writerThread; if (reader != 0 || writer != 0) { + if (NativeThread.isFiber(reader) || NativeThread.isFiber(writer)) { + Poller.stopPoll(fdVal); + } nd.preClose(fd); ! if (NativeThread.isKernelThread(reader)) NativeThread.signal(reader); ! if (NativeThread.isKernelThread(writer)) NativeThread.signal(writer); } } } }
< prev index next >