1 /*
  2  * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.FileDescriptor;
 29 import java.io.IOException;
 30 import java.nio.ByteBuffer;
 31 import java.nio.channels.AsynchronousCloseException;
 32 import java.nio.channels.ClosedChannelException;
 33 import java.nio.channels.NotYetConnectedException;
 34 import java.nio.channels.Pipe;
 35 import java.nio.channels.SelectionKey;
 36 import java.nio.channels.spi.SelectorProvider;
 37 import java.util.Objects;
 38 import java.util.concurrent.locks.ReentrantLock;
 39 
 40 class SourceChannelImpl
 41     extends Pipe.SourceChannel
 42     implements SelChImpl
 43 {
 44     // Used to make native read and write calls
 45     private static final NativeDispatcher nd = new FileDispatcherImpl();
 46 
 47     // The file descriptor associated with this channel
 48     private final FileDescriptor fd;
 49     private final int fdVal;
 50 
 51     // Lock held by current reading thread
 52     private final ReentrantLock readLock = new ReentrantLock();
 53 
 54     // Lock held by any thread that modifies the state fields declared below
 55     // DO NOT invoke a blocking I/O operation while holding this lock!
 56     private final Object stateLock = new Object();
 57 
 58     // -- The following fields are protected by stateLock
 59 
 60     // Channel state
 61     private static final int ST_INUSE = 0;
 62     private static final int ST_CLOSING = 1;
 63     private static final int ST_CLOSED = 2;
 64     private int state;
 65 
 66     // ID of native thread doing read, for signalling
 67     private long thread;
 68 
 69     // -- End of fields protected by stateLock
 70 
 71 
 72     public FileDescriptor getFD() {
 73         return fd;
 74     }
 75 
 76     public int getFDVal() {
 77         return fdVal;
 78     }
 79 
 80     SourceChannelImpl(SelectorProvider sp, FileDescriptor fd) throws IOException {
 81         super(sp);
 82         IOUtil.configureBlocking(fd, false);
 83         this.fd = fd;
 84         this.fdVal = IOUtil.fdVal(fd);
 85     }
 86 
 87     /**
 88      * Closes the read end of the pipe if there are no read operation in
 89      * progress and the channel is not registered with a Selector.
 90      */
 91     private boolean tryClose() throws IOException {
 92         assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
 93         if (thread == 0 && !isRegistered()) {
 94             state = ST_CLOSED;
 95             nd.close(fd);
 96             return true;
 97         } else {
 98             return false;
 99         }
100     }
101 
102     /**
103      * Invokes tryClose to attempt to close the read end of the pipe.
104      *
105      * This method is used for deferred closing by I/O and Selector operations.
106      */
107     private void tryFinishClose() {
108         try {
109             tryClose();
110         } catch (IOException ignore) { }
111     }
112 
113     /**
114      * Closes this channel when configured in blocking mode.
115      *
116      * If there is a read operation in progress then the read-end of the pipe
117      * is pre-closed and the reader is signalled, in which case the final close
118      * is deferred until the reader aborts.
119      */
120     private void implCloseBlockingMode() throws IOException {
121         synchronized (stateLock) {
122             assert state < ST_CLOSING;
123             state = ST_CLOSING;
124             if (!tryClose()) {
125                 long th = thread;
126                 if (th != 0) {
127                     if (NativeThread.isVirtualThread(th)) {
128                         Poller.stopPoll(fdVal);
129                     } else {
130                         nd.preClose(fd);
131                         NativeThread.signal(th);
132                     }
133                 }
134             }
135         }
136     }
137 
138     /**
139      * Closes this channel when configured in non-blocking mode.
140      *
141      * If the channel is registered with a Selector then the close is deferred
142      * until the channel is flushed from all Selectors.
143      */
144     private void implCloseNonBlockingMode() throws IOException {
145         synchronized (stateLock) {
146             assert state < ST_CLOSING;
147             state = ST_CLOSING;
148         }
149         // wait for any read operation to complete before trying to close
150         readLock.lock();
151         readLock.unlock();
152         synchronized (stateLock) {
153             if (state == ST_CLOSING) {
154                 tryClose();
155             }
156         }
157     }
158 
159     /**
160      * Invoked by implCloseChannel to close the channel.
161      */
162     @Override
163     protected void implCloseSelectableChannel() throws IOException {
164         assert !isOpen();
165         if (isBlocking()) {
166             implCloseBlockingMode();
167         } else {
168             implCloseNonBlockingMode();
169         }
170     }
171     @Override
172     public void kill() {
173         synchronized (stateLock) {
174             assert !isOpen();
175             if (state == ST_CLOSING) {
176                 tryFinishClose();
177             }
178         }
179     }
180 
181     @Override
182     protected void implConfigureBlocking(boolean block) throws IOException {
183         readLock.lock();
184         try {
185             synchronized (stateLock) {
186                 if (!isOpen())
187                     throw new ClosedChannelException();
188                 IOUtil.configureBlocking(fd, block);
189             }
190         } finally {
191             readLock.unlock();
192         }
193     }
194 
195     public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
196         int intOps = ski.nioInterestOps();
197         int oldOps = ski.nioReadyOps();
198         int newOps = initialOps;
199 
200         if ((ops & Net.POLLNVAL) != 0)
201             throw new Error("POLLNVAL detected");
202 
203         if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
204             newOps = intOps;
205             ski.nioReadyOps(newOps);
206             return (newOps & ~oldOps) != 0;
207         }
208 
209         if (((ops & Net.POLLIN) != 0) &&
210             ((intOps & SelectionKey.OP_READ) != 0))
211             newOps |= SelectionKey.OP_READ;
212 
213         ski.nioReadyOps(newOps);
214         return (newOps & ~oldOps) != 0;
215     }
216 
217     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
218         return translateReadyOps(ops, ski.nioReadyOps(), ski);
219     }
220 
221     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
222         return translateReadyOps(ops, 0, ski);
223     }
224 
225     public int translateInterestOps(int ops) {
226         int newOps = 0;
227         if (ops == SelectionKey.OP_READ)
228             newOps |= Net.POLLIN;
229         return newOps;
230     }
231 
232     /**
233      * Marks the beginning of a read operation that might block.
234      *
235      * @throws ClosedChannelException if the channel is closed
236      * @throws NotYetConnectedException if the channel is not yet connected
237      */
238     private void beginRead(boolean blocking) throws ClosedChannelException {
239         if (blocking) {
240             // set hook for Thread.interrupt
241             begin();
242         }
243         synchronized (stateLock) {
244             if (!isOpen())
245                 throw new ClosedChannelException();
246             if (blocking)
247                 thread = NativeThread.current();
248         }
249     }
250 
251     /**
252      * Marks the end of a read operation that may have blocked.
253      *
254      * @throws AsynchronousCloseException if the channel was closed due to this
255      * thread being interrupted on a blocking read operation.
256      */
257     private void endRead(boolean blocking, boolean completed)
258         throws AsynchronousCloseException
259     {
260         if (blocking) {
261             synchronized (stateLock) {
262                 thread = 0;
263                 if (state == ST_CLOSING) {
264                     tryFinishClose();
265                 }
266             }
267             // remove hook for Thread.interrupt
268             end(completed);
269         }
270     }
271 
272     @Override
273     public int read(ByteBuffer dst) throws IOException {
274         Objects.requireNonNull(dst);
275 
276         readLock.lock();
277         try {
278             boolean blocking = isBlocking();
279             int n = 0;
280             try {
281                 beginRead(blocking);
282                 n = IOUtil.read(fd, dst, -1, nd);
283                 if (blocking) {
284                     while (IOStatus.okayToRetry(n) && isOpen()) {
285                         park(Net.POLLIN);
286                         n = IOUtil.read(fd, dst, -1, nd);
287                     }
288                 }
289             } finally {
290                 endRead(blocking, n > 0);
291                 assert IOStatus.check(n);
292             }
293             return IOStatus.normalize(n);
294         } finally {
295             readLock.unlock();
296         }
297     }
298 
299     @Override
300     public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
301         Objects.checkFromIndexSize(offset, length, dsts.length);
302 
303         readLock.lock();
304         try {
305             boolean blocking = isBlocking();
306             long n = 0;
307             try {
308                 beginRead(blocking);
309                 n = IOUtil.read(fd, dsts, offset, length, nd);
310                 if (blocking) {
311                     while (IOStatus.okayToRetry(n) && isOpen()) {
312                         park(Net.POLLIN);
313                         n = IOUtil.read(fd, dsts, offset, length, nd);
314                     }
315                 }
316             } finally {
317                 endRead(blocking, n > 0);
318                 assert IOStatus.check(n);
319             }
320             return IOStatus.normalize(n);
321         } finally {
322             readLock.unlock();
323         }
324     }
325 
326     @Override
327     public long read(ByteBuffer[] dsts) throws IOException {
328         return read(dsts, 0, dsts.length);
329     }
330 }