1 /*
  2  * Copyright (c) 2000, 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.FileDescriptor;
 29 import java.io.IOException;
 30 import java.nio.ByteBuffer;
 31 import java.nio.channels.AsynchronousCloseException;
 32 import java.nio.channels.ClosedChannelException;
 33 import java.nio.channels.NotYetConnectedException;
 34 import java.nio.channels.Pipe;
 35 import java.nio.channels.SelectionKey;
 36 import java.nio.channels.spi.SelectorProvider;
 37 import java.util.Objects;
 38 import java.util.concurrent.locks.ReentrantLock;
 39 
 40 class SinkChannelImpl
 41     extends Pipe.SinkChannel
 42     implements SelChImpl
 43 {
 44     // Used to make native read and write calls
 45     private static final NativeDispatcher nd = new SocketDispatcher();
 46 
 47     // The file descriptor associated with this channel
 48     private final FileDescriptor fd;
 49     private final int fdVal;
 50 
 51     // Lock held by current writing thread
 52     private final ReentrantLock writeLock = new ReentrantLock();
 53 
 54     // Lock held by any thread that modifies the state fields declared below
 55     // DO NOT invoke a blocking I/O operation while holding this lock!
 56     private final Object stateLock = new Object();
 57 
 58     // -- The following fields are protected by stateLock
 59 
 60     // Channel state
 61     private static final int ST_INUSE = 0;
 62     private static final int ST_CLOSING = 1;
 63     private static final int ST_CLOSED = 2;
 64     private int state;
 65 
 66     // ID of native thread doing write, for signalling
 67     private long thread;
 68 
 69     // True if the channel's socket has been forced into non-blocking mode
 70     // by a virtual thread. It cannot be reset. When the channel is in
 71     // blocking mode and the channel's socket is in non-blocking mode then
 72     // operations that don't complete immediately will poll the socket and
 73     // preserve the semantics of blocking operations.
 74     private volatile boolean forcedNonBlocking;
 75 
 76     // -- End of fields protected by stateLock
 77 
 78 
 79     public FileDescriptor getFD() {
 80         return fd;
 81     }
 82 
 83     public int getFDVal() {
 84         return fdVal;
 85     }
 86 
 87     SinkChannelImpl(SelectorProvider sp, FileDescriptor fd) throws IOException {
 88         super(sp);
 89         this.fd = fd;
 90         this.fdVal = IOUtil.fdVal(fd);
 91     }
 92 
 93     /**
 94      * Checks that the channel is open.
 95      *
 96      * @throws ClosedChannelException if channel is closed (or closing)
 97      */
 98     private void ensureOpen() throws ClosedChannelException {
 99         if (!isOpen())
100             throw new ClosedChannelException();
101     }
102 
103     /**
104      * Ensures that the socket is configured non-blocking when on a virtual thread.
105      */
106     private void configureSocketNonBlockingIfVirtualThread() throws IOException {
107         assert writeLock.isHeldByCurrentThread();
108         if (!forcedNonBlocking && Thread.currentThread().isVirtual()) {
109             synchronized (stateLock) {
110                 ensureOpen();
111                 IOUtil.configureBlocking(fd, false);
112                 forcedNonBlocking = true;
113             }
114         }
115     }
116 
117     /**
118      * Closes the write end of the pipe if there are no write operation in
119      * progress and the channel is not registered with a Selector.
120      */
121     private boolean tryClose() throws IOException {
122         assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
123         if (thread == 0 && !isRegistered()) {
124             state = ST_CLOSED;
125             nd.close(fd);
126             return true;
127         } else {
128             return false;
129         }
130     }
131 
132     /**
133      * Invokes tryClose to attempt to close the write end of the pipe.
134      *
135      * This method is used for deferred closing by I/O and Selector operations.
136      */
137     private void tryFinishClose() {
138         try {
139             tryClose();
140         } catch (IOException ignore) { }
141     }
142 
143     /**
144      * Closes this channel when configured in blocking mode.
145      *
146      * If there is a write operation in progress then the write-end of the pipe
147      * is pre-closed and the writer is signalled, in which case the final close
148      * is deferred until the writer aborts.
149      */
150     private void implCloseBlockingMode() throws IOException {
151         synchronized (stateLock) {
152             assert state < ST_CLOSING;
153             state = ST_CLOSING;
154             if (!tryClose()) {
155                 nd.preClose(fd, thread, 0);
156             }
157         }
158     }
159 
160     /**
161      * Closes this channel when configured in non-blocking mode.
162      *
163      * If the channel is registered with a Selector then the close is deferred
164      * until the channel is flushed from all Selectors.
165      */
166     private void implCloseNonBlockingMode() throws IOException {
167         synchronized (stateLock) {
168             assert state < ST_CLOSING;
169             state = ST_CLOSING;
170         }
171         // wait for any write operation to complete before trying to close
172         writeLock.lock();
173         writeLock.unlock();
174         synchronized (stateLock) {
175             if (state == ST_CLOSING) {
176                 tryClose();
177             }
178         }
179     }
180 
181     /**
182      * Invoked by implCloseChannel to close the channel.
183      */
184     @Override
185     protected void implCloseSelectableChannel() throws IOException {
186         assert !isOpen();
187         if (isBlocking()) {
188             implCloseBlockingMode();
189         } else {
190             implCloseNonBlockingMode();
191         }
192     }
193 
194     @Override
195     public void kill() {
196         // wait for any write operation to complete before trying to close
197         writeLock.lock();
198         writeLock.unlock();
199         synchronized (stateLock) {
200             if (state == ST_CLOSING) {
201                 tryFinishClose();
202             }
203         }
204     }
205 
206     @Override
207     protected void implConfigureBlocking(boolean block) throws IOException {
208         writeLock.lock();
209         try {
210             synchronized (stateLock) {
211                 ensureOpen();
212                 // do nothing if virtual thread has forced the socket to be non-blocking
213                 if (!forcedNonBlocking) {
214                     IOUtil.configureBlocking(fd, block);
215                 }
216             }
217         } finally {
218             writeLock.unlock();
219         }
220     }
221 
222     public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
223         int intOps = ski.nioInterestOps();
224         int oldOps = ski.nioReadyOps();
225         int newOps = initialOps;
226 
227         if ((ops & Net.POLLNVAL) != 0)
228             throw new Error("POLLNVAL detected");
229 
230         if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
231             newOps = intOps;
232             ski.nioReadyOps(newOps);
233             return (newOps & ~oldOps) != 0;
234         }
235 
236         if (((ops & Net.POLLOUT) != 0) &&
237             ((intOps & SelectionKey.OP_WRITE) != 0))
238             newOps |= SelectionKey.OP_WRITE;
239 
240         ski.nioReadyOps(newOps);
241         return (newOps & ~oldOps) != 0;
242     }
243 
244     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
245         return translateReadyOps(ops, ski.nioReadyOps(), ski);
246     }
247 
248     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
249         return translateReadyOps(ops, 0, ski);
250     }
251 
252     public int translateInterestOps(int ops) {
253         int newOps = 0;
254         if (ops == SelectionKey.OP_WRITE)
255             newOps |= Net.POLLOUT;
256         return newOps;
257     }
258 
259     /**
260      * Marks the beginning of a write operation that might block.
261      *
262      * @throws ClosedChannelException if the channel is closed
263      * @throws NotYetConnectedException if the channel is not yet connected
264      */
265     private void beginWrite(boolean blocking) throws ClosedChannelException {
266         if (blocking) {
267             // set hook for Thread.interrupt
268             begin();
269         }
270         synchronized (stateLock) {
271             ensureOpen();
272             if (blocking)
273                 thread = NativeThread.current();
274         }
275     }
276 
277     /**
278      * Marks the end of a write operation that may have blocked.
279      *
280      * @throws AsynchronousCloseException if the channel was closed due to this
281      * thread being interrupted on a blocking write operation.
282      */
283     private void endWrite(boolean blocking, boolean completed)
284         throws AsynchronousCloseException
285     {
286         if (blocking) {
287             synchronized (stateLock) {
288                 thread = 0;
289                 if (state == ST_CLOSING) {
290                     tryFinishClose();
291                 }
292             }
293             // remove hook for Thread.interrupt
294             end(completed);
295         }
296     }
297 
298     @Override
299     public int write(ByteBuffer src) throws IOException {
300         Objects.requireNonNull(src);
301 
302         writeLock.lock();
303         try {
304             ensureOpen();
305             boolean blocking = isBlocking();
306             int n = 0;
307             try {
308                 beginWrite(blocking);
309                 configureSocketNonBlockingIfVirtualThread();
310                 n = IOUtil.write(fd, src, -1, nd);
311                 if (blocking) {
312                     while (IOStatus.okayToRetry(n) && isOpen()) {
313                         park(Net.POLLOUT);
314                         n = IOUtil.write(fd, src, -1, nd);
315                     }
316                 }
317             } finally {
318                 endWrite(blocking, n > 0);
319                 assert IOStatus.check(n);
320             }
321             return IOStatus.normalize(n);
322         } finally {
323             writeLock.unlock();
324         }
325     }
326 
327     @Override
328     public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
329         Objects.checkFromIndexSize(offset, length, srcs.length);
330 
331         writeLock.lock();
332         try {
333             ensureOpen();
334             boolean blocking = isBlocking();
335             long n = 0;
336             try {
337                 beginWrite(blocking);
338                 configureSocketNonBlockingIfVirtualThread();
339                 n = IOUtil.write(fd, srcs, offset, length, nd);
340                 if (blocking) {
341                     while (IOStatus.okayToRetry(n) && isOpen()) {
342                         park(Net.POLLOUT);
343                         n = IOUtil.write(fd, srcs, offset, length, nd);
344                     }
345                 }
346             } finally {
347                 endWrite(blocking, n > 0);
348                 assert IOStatus.check(n);
349             }
350             return IOStatus.normalize(n);
351         } finally {
352             writeLock.unlock();
353         }
354     }
355 
356     @Override
357     public long write(ByteBuffer[] srcs) throws IOException {
358         return write(srcs, 0, srcs.length);
359     }
360 }