1 /* 2 * Copyright (c) 2000, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.nio.ByteBuffer; 31 import java.nio.channels.AsynchronousCloseException; 32 import java.nio.channels.ClosedChannelException; 33 import java.nio.channels.NotYetConnectedException; 34 import java.nio.channels.Pipe; 35 import java.nio.channels.SelectionKey; 36 import java.nio.channels.spi.SelectorProvider; 37 import java.util.Objects; 38 import java.util.concurrent.locks.ReentrantLock; 39 40 class SourceChannelImpl 41 extends Pipe.SourceChannel 42 implements SelChImpl 43 { 44 // Used to make native read and write calls 45 private static final NativeDispatcher nd = new SocketDispatcher(); 46 47 // The file descriptor associated with this channel 48 private final FileDescriptor fd; 49 private final int fdVal; 50 51 // Lock held by current reading thread 52 private final ReentrantLock readLock = new ReentrantLock(); 53 54 // Lock held by any thread that modifies the state fields declared below 55 // DO NOT invoke a blocking I/O operation while holding this lock! 56 private final Object stateLock = new Object(); 57 58 // -- The following fields are protected by stateLock 59 60 // Channel state 61 private static final int ST_INUSE = 0; 62 private static final int ST_CLOSING = 1; 63 private static final int ST_CLOSED = 2; 64 private int state; 65 66 // ID of native thread doing read, for signalling 67 private NativeThread reader; 68 69 // True if the channel's socket has been forced into non-blocking mode 70 // by a virtual thread. It cannot be reset. When the channel is in 71 // blocking mode and the channel's socket is in non-blocking mode then 72 // operations that don't complete immediately will poll the socket and 73 // preserve the semantics of blocking operations. 74 private volatile boolean forcedNonBlocking; 75 76 // -- End of fields protected by stateLock 77 78 79 public FileDescriptor getFD() { 80 return fd; 81 } 82 83 public int getFDVal() { 84 return fdVal; 85 } 86 87 SourceChannelImpl(SelectorProvider sp, FileDescriptor fd) throws IOException { 88 super(sp); 89 this.fd = fd; 90 this.fdVal = IOUtil.fdVal(fd); 91 } 92 93 /** 94 * Checks that the channel is open. 95 * 96 * @throws ClosedChannelException if channel is closed (or closing) 97 */ 98 private void ensureOpen() throws ClosedChannelException { 99 if (!isOpen()) 100 throw new ClosedChannelException(); 101 } 102 103 /** 104 * Ensures that the socket is configured non-blocking when on a virtual thread. 105 */ 106 private void configureSocketNonBlockingIfVirtualThread() throws IOException { 107 assert readLock.isHeldByCurrentThread(); 108 if (!forcedNonBlocking && Thread.currentThread().isVirtual()) { 109 synchronized (stateLock) { 110 ensureOpen(); 111 IOUtil.configureBlocking(fd, false); 112 forcedNonBlocking = true; 113 } 114 } 115 } 116 117 /** 118 * Closes the read end of the pipe if there are no read operation in 119 * progress and the channel is not registered with a Selector. 120 */ 121 private boolean tryClose() throws IOException { 122 assert Thread.holdsLock(stateLock) && state == ST_CLOSING; 123 if (reader == null && !isRegistered()) { 124 state = ST_CLOSED; 125 nd.close(fd); 126 return true; 127 } else { 128 return false; 129 } 130 } 131 132 /** 133 * Invokes tryClose to attempt to close the read end of the pipe. 134 * 135 * This method is used for deferred closing by I/O and Selector operations. 136 */ 137 private void tryFinishClose() { 138 try { 139 tryClose(); 140 } catch (IOException ignore) { } 141 } 142 143 /** 144 * Closes this channel when configured in blocking mode. 145 * 146 * If there is a read operation in progress then the read-end of the pipe 147 * is pre-closed and the reader is signalled, in which case the final close 148 * is deferred until the reader aborts. 149 */ 150 private void implCloseBlockingMode() throws IOException { 151 synchronized (stateLock) { 152 assert state < ST_CLOSING; 153 state = ST_CLOSING; 154 if (!tryClose()) { 155 nd.preClose(fd, reader, null); 156 } 157 } 158 } 159 160 /** 161 * Closes this channel when configured in non-blocking mode. 162 * 163 * If the channel is registered with a Selector then the close is deferred 164 * until the channel is flushed from all Selectors. 165 */ 166 private void implCloseNonBlockingMode() throws IOException { 167 synchronized (stateLock) { 168 assert state < ST_CLOSING; 169 state = ST_CLOSING; 170 } 171 // wait for any read operation to complete before trying to close 172 readLock.lock(); 173 readLock.unlock(); 174 synchronized (stateLock) { 175 if (state == ST_CLOSING) { 176 tryClose(); 177 } 178 } 179 } 180 181 /** 182 * Invoked by implCloseChannel to close the channel. 183 */ 184 @Override 185 protected void implCloseSelectableChannel() throws IOException { 186 assert !isOpen(); 187 if (isBlocking()) { 188 implCloseBlockingMode(); 189 } else { 190 implCloseNonBlockingMode(); 191 } 192 } 193 @Override 194 public void kill() { 195 // wait for any read operation to complete before trying to close 196 readLock.lock(); 197 readLock.unlock(); 198 synchronized (stateLock) { 199 assert !isOpen(); 200 if (state == ST_CLOSING) { 201 tryFinishClose(); 202 } 203 } 204 } 205 206 @Override 207 protected void implConfigureBlocking(boolean block) throws IOException { 208 readLock.lock(); 209 try { 210 synchronized (stateLock) { 211 ensureOpen(); 212 // do nothing if virtual thread has forced the socket to be non-blocking 213 if (!forcedNonBlocking) { 214 IOUtil.configureBlocking(fd, block); 215 } 216 } 217 } finally { 218 readLock.unlock(); 219 } 220 } 221 222 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { 223 int intOps = ski.nioInterestOps(); 224 int oldOps = ski.nioReadyOps(); 225 int newOps = initialOps; 226 227 if ((ops & Net.POLLNVAL) != 0) 228 throw new Error("POLLNVAL detected"); 229 230 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 231 newOps = intOps; 232 ski.nioReadyOps(newOps); 233 return (newOps & ~oldOps) != 0; 234 } 235 236 if (((ops & Net.POLLIN) != 0) && 237 ((intOps & SelectionKey.OP_READ) != 0)) 238 newOps |= SelectionKey.OP_READ; 239 240 ski.nioReadyOps(newOps); 241 return (newOps & ~oldOps) != 0; 242 } 243 244 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 245 return translateReadyOps(ops, ski.nioReadyOps(), ski); 246 } 247 248 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 249 return translateReadyOps(ops, 0, ski); 250 } 251 252 public int translateInterestOps(int ops) { 253 int newOps = 0; 254 if (ops == SelectionKey.OP_READ) 255 newOps |= Net.POLLIN; 256 return newOps; 257 } 258 259 /** 260 * Marks the beginning of a read operation that might block. 261 * 262 * @throws ClosedChannelException if the channel is closed 263 * @throws NotYetConnectedException if the channel is not yet connected 264 */ 265 private void beginRead(boolean blocking) throws ClosedChannelException { 266 if (blocking) { 267 // set hook for Thread.interrupt 268 begin(); 269 } 270 synchronized (stateLock) { 271 ensureOpen(); 272 if (blocking) { 273 reader = NativeThread.current(); 274 } 275 } 276 } 277 278 /** 279 * Marks the end of a read operation that may have blocked. 280 * 281 * @throws AsynchronousCloseException if the channel was closed due to this 282 * thread being interrupted on a blocking read operation. 283 */ 284 private void endRead(boolean blocking, boolean completed) 285 throws AsynchronousCloseException 286 { 287 if (blocking) { 288 synchronized (stateLock) { 289 reader = null; 290 if (state == ST_CLOSING) { 291 tryFinishClose(); 292 } 293 } 294 // remove hook for Thread.interrupt 295 end(completed); 296 } 297 } 298 299 @Override 300 public int read(ByteBuffer dst) throws IOException { 301 Objects.requireNonNull(dst); 302 303 readLock.lock(); 304 try { 305 ensureOpen(); 306 boolean blocking = isBlocking(); 307 int n = 0; 308 try { 309 beginRead(blocking); 310 configureSocketNonBlockingIfVirtualThread(); 311 n = IOUtil.read(fd, dst, -1, nd); 312 if (blocking) { 313 while (IOStatus.okayToRetry(n) && isOpen()) { 314 park(Net.POLLIN); 315 n = IOUtil.read(fd, dst, -1, nd); 316 } 317 } 318 } finally { 319 endRead(blocking, n > 0); 320 assert IOStatus.check(n); 321 } 322 return IOStatus.normalize(n); 323 } finally { 324 readLock.unlock(); 325 } 326 } 327 328 @Override 329 public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { 330 Objects.checkFromIndexSize(offset, length, dsts.length); 331 332 readLock.lock(); 333 try { 334 ensureOpen(); 335 boolean blocking = isBlocking(); 336 long n = 0; 337 try { 338 beginRead(blocking); 339 configureSocketNonBlockingIfVirtualThread(); 340 n = IOUtil.read(fd, dsts, offset, length, nd); 341 if (blocking) { 342 while (IOStatus.okayToRetry(n) && isOpen()) { 343 park(Net.POLLIN); 344 n = IOUtil.read(fd, dsts, offset, length, nd); 345 } 346 } 347 } finally { 348 endRead(blocking, n > 0); 349 assert IOStatus.check(n); 350 } 351 return IOStatus.normalize(n); 352 } finally { 353 readLock.unlock(); 354 } 355 } 356 357 @Override 358 public long read(ByteBuffer[] dsts) throws IOException { 359 return read(dsts, 0, dsts.length); 360 } 361 }