1 /*
2 * Copyright (c) 2000, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.FileDescriptor;
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.nio.channels.AsynchronousCloseException;
32 import java.nio.channels.ClosedChannelException;
33 import java.nio.channels.NotYetConnectedException;
34 import java.nio.channels.Pipe;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.spi.SelectorProvider;
37 import java.util.Objects;
38 import java.util.concurrent.locks.ReentrantLock;
39
40 class SourceChannelImpl
41 extends Pipe.SourceChannel
42 implements SelChImpl
43 {
44 // Used to make native read and write calls
45 private static final NativeDispatcher nd = new SocketDispatcher();
46
47 // The file descriptor associated with this channel
48 private final FileDescriptor fd;
49 private final int fdVal;
50
51 // Lock held by current reading thread
52 private final ReentrantLock readLock = new ReentrantLock();
53
54 // Lock held by any thread that modifies the state fields declared below
55 // DO NOT invoke a blocking I/O operation while holding this lock!
56 private final Object stateLock = new Object();
57
58 // -- The following fields are protected by stateLock
59
60 // Channel state
61 private static final int ST_INUSE = 0;
62 private static final int ST_CLOSING = 1;
63 private static final int ST_CLOSED = 2;
64 private int state;
65
66 // Thread doing read, for signalling
67 private Thread readerThread;
68
69 // True if the channel's socket has been forced into non-blocking mode
70 // by a virtual thread. It cannot be reset. When the channel is in
71 // blocking mode and the channel's socket is in non-blocking mode then
72 // operations that don't complete immediately will poll the socket and
73 // preserve the semantics of blocking operations.
74 private volatile boolean forcedNonBlocking;
75
76 // -- End of fields protected by stateLock
77
78
79 public FileDescriptor getFD() {
80 return fd;
81 }
82
83 public int getFDVal() {
84 return fdVal;
85 }
86
87 SourceChannelImpl(SelectorProvider sp, FileDescriptor fd) throws IOException {
88 super(sp);
89 this.fd = fd;
90 this.fdVal = IOUtil.fdVal(fd);
91 }
92
93 /**
94 * Checks that the channel is open.
95 *
96 * @throws ClosedChannelException if channel is closed (or closing)
97 */
98 private void ensureOpen() throws ClosedChannelException {
99 if (!isOpen())
100 throw new ClosedChannelException();
101 }
102
103 /**
104 * Ensures that the socket is configured non-blocking when on a virtual thread.
105 */
106 private void configureSocketNonBlockingIfVirtualThread() throws IOException {
107 assert readLock.isHeldByCurrentThread();
108 if (!forcedNonBlocking && Thread.currentThread().isVirtual()) {
109 synchronized (stateLock) {
110 ensureOpen();
111 IOUtil.configureBlocking(fd, false);
112 forcedNonBlocking = true;
113 }
114 }
115 }
116
117 /**
118 * Closes the read end of the pipe if there are no read operation in
119 * progress and the channel is not registered with a Selector.
120 */
121 private boolean tryClose() throws IOException {
122 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
123 if (readerThread == null && !isRegistered()) {
124 state = ST_CLOSED;
125 nd.close(fd);
126 return true;
127 } else {
128 return false;
129 }
130 }
131
132 /**
133 * Invokes tryClose to attempt to close the read end of the pipe.
134 *
135 * This method is used for deferred closing by I/O and Selector operations.
136 */
137 private void tryFinishClose() {
138 try {
139 tryClose();
140 } catch (IOException ignore) { }
141 }
142
143 /**
144 * Closes this channel when configured in blocking mode.
145 *
146 * If there is a read operation in progress then the read-end of the pipe
147 * is pre-closed and the reader is signalled, in which case the final close
148 * is deferred until the reader aborts.
149 */
150 private void implCloseBlockingMode() throws IOException {
151 synchronized (stateLock) {
152 assert state < ST_CLOSING;
153 state = ST_CLOSING;
154 if (!tryClose()) {
155 nd.preClose(fd, readerThread, null);
156 }
157 }
158 }
159
160 /**
161 * Closes this channel when configured in non-blocking mode.
162 *
163 * If the channel is registered with a Selector then the close is deferred
164 * until the channel is flushed from all Selectors.
165 */
166 private void implCloseNonBlockingMode() throws IOException {
167 synchronized (stateLock) {
168 assert state < ST_CLOSING;
169 state = ST_CLOSING;
170 }
171 // wait for any read operation to complete before trying to close
172 readLock.lock();
173 readLock.unlock();
174 synchronized (stateLock) {
175 if (state == ST_CLOSING) {
176 tryClose();
177 }
178 }
179 }
180
181 /**
182 * Invoked by implCloseChannel to close the channel.
183 */
184 @Override
185 protected void implCloseSelectableChannel() throws IOException {
186 assert !isOpen();
187 if (isBlocking()) {
188 implCloseBlockingMode();
189 } else {
190 implCloseNonBlockingMode();
191 }
192 }
193 @Override
194 public void kill() {
195 // wait for any read operation to complete before trying to close
196 readLock.lock();
197 readLock.unlock();
198 synchronized (stateLock) {
199 assert !isOpen();
200 if (state == ST_CLOSING) {
201 tryFinishClose();
202 }
203 }
204 }
205
206 @Override
207 protected void implConfigureBlocking(boolean block) throws IOException {
208 readLock.lock();
209 try {
210 synchronized (stateLock) {
211 ensureOpen();
212 // do nothing if virtual thread has forced the socket to be non-blocking
213 if (!forcedNonBlocking) {
214 IOUtil.configureBlocking(fd, block);
215 }
216 }
217 } finally {
218 readLock.unlock();
219 }
220 }
221
222 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
223 int intOps = ski.nioInterestOps();
224 int oldOps = ski.nioReadyOps();
225 int newOps = initialOps;
226
227 if ((ops & Net.POLLNVAL) != 0)
228 throw new Error("POLLNVAL detected");
229
230 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
231 newOps = intOps;
232 ski.nioReadyOps(newOps);
233 return (newOps & ~oldOps) != 0;
234 }
235
236 if (((ops & Net.POLLIN) != 0) &&
237 ((intOps & SelectionKey.OP_READ) != 0))
238 newOps |= SelectionKey.OP_READ;
239
240 ski.nioReadyOps(newOps);
241 return (newOps & ~oldOps) != 0;
242 }
243
244 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
245 return translateReadyOps(ops, ski.nioReadyOps(), ski);
246 }
247
248 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
249 return translateReadyOps(ops, 0, ski);
250 }
251
252 public int translateInterestOps(int ops) {
253 int newOps = 0;
254 if (ops == SelectionKey.OP_READ)
255 newOps |= Net.POLLIN;
256 return newOps;
257 }
258
259 /**
260 * Marks the beginning of a read operation that might block.
261 *
262 * @throws ClosedChannelException if the channel is closed
263 * @throws NotYetConnectedException if the channel is not yet connected
264 */
265 private void beginRead(boolean blocking) throws ClosedChannelException {
266 if (blocking) {
267 // set hook for Thread.interrupt
268 begin();
269 }
270 synchronized (stateLock) {
271 ensureOpen();
272 if (blocking) {
273 readerThread = NativeThread.threadToSignal();
274 }
275 }
276 }
277
278 /**
279 * Marks the end of a read operation that may have blocked.
280 *
281 * @throws AsynchronousCloseException if the channel was closed due to this
282 * thread being interrupted on a blocking read operation.
283 */
284 private void endRead(boolean blocking, boolean completed)
285 throws AsynchronousCloseException
286 {
287 if (blocking) {
288 synchronized (stateLock) {
289 readerThread = null;
290 if (state == ST_CLOSING) {
291 tryFinishClose();
292 }
293 }
294 // remove hook for Thread.interrupt
295 end(completed);
296 }
297 }
298
299 @Override
300 public int read(ByteBuffer dst) throws IOException {
301 Objects.requireNonNull(dst);
302
303 readLock.lock();
304 try {
305 ensureOpen();
306 boolean blocking = isBlocking();
307 int n = 0;
308 try {
309 beginRead(blocking);
310 configureSocketNonBlockingIfVirtualThread();
311 n = IOUtil.read(fd, dst, -1, nd);
312 if (blocking) {
313 while (IOStatus.okayToRetry(n) && isOpen()) {
314 park(Net.POLLIN);
315 n = IOUtil.read(fd, dst, -1, nd);
316 }
317 }
318 } finally {
319 endRead(blocking, n > 0);
320 assert IOStatus.check(n);
321 }
322 return IOStatus.normalize(n);
323 } finally {
324 readLock.unlock();
325 }
326 }
327
328 @Override
329 public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
330 Objects.checkFromIndexSize(offset, length, dsts.length);
331
332 readLock.lock();
333 try {
334 ensureOpen();
335 boolean blocking = isBlocking();
336 long n = 0;
337 try {
338 beginRead(blocking);
339 configureSocketNonBlockingIfVirtualThread();
340 n = IOUtil.read(fd, dsts, offset, length, nd);
341 if (blocking) {
342 while (IOStatus.okayToRetry(n) && isOpen()) {
343 park(Net.POLLIN);
344 n = IOUtil.read(fd, dsts, offset, length, nd);
345 }
346 }
347 } finally {
348 endRead(blocking, n > 0);
349 assert IOStatus.check(n);
350 }
351 return IOStatus.normalize(n);
352 } finally {
353 readLock.unlock();
354 }
355 }
356
357 @Override
358 public long read(ByteBuffer[] dsts) throws IOException {
359 return read(dsts, 0, dsts.length);
360 }
361 }