1 /*
   2  * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.nio.ByteBuffer;
  31 import java.nio.channels.AsynchronousCloseException;
  32 import java.nio.channels.ClosedChannelException;
  33 import java.nio.channels.NotYetConnectedException;
  34 import java.nio.channels.Pipe;
  35 import java.nio.channels.SelectionKey;
  36 import java.nio.channels.spi.SelectorProvider;
  37 import java.util.Objects;
  38 import java.util.concurrent.locks.ReentrantLock;
  39 
  40 class SourceChannelImpl
  41     extends Pipe.SourceChannel
  42     implements SelChImpl
  43 {
  44     // Used to make native read and write calls
  45     private static final NativeDispatcher nd = new FileDispatcherImpl();
  46 
  47     // The file descriptor associated with this channel
  48     private final FileDescriptor fd;
  49     private final int fdVal;
  50 
  51     // Lock held by current reading thread
  52     private final ReentrantLock readLock = new ReentrantLock();
  53 
  54     // Lock held by any thread that modifies the state fields declared below
  55     // DO NOT invoke a blocking I/O operation while holding this lock!
  56     private final Object stateLock = new Object();
  57 
  58     // -- The following fields are protected by stateLock
  59 
  60     // Channel state
  61     private static final int ST_INUSE = 0;
  62     private static final int ST_CLOSING = 1;
  63     private static final int ST_CLOSED = 2;
  64     private int state;
  65 
  66     // ID of native thread doing read, for signalling
  67     private long thread;
  68 
  69     // -- End of fields protected by stateLock
  70 
  71 
  72     public FileDescriptor getFD() {
  73         return fd;
  74     }
  75 
  76     public int getFDVal() {
  77         return fdVal;
  78     }
  79 
  80     SourceChannelImpl(SelectorProvider sp, FileDescriptor fd) throws IOException {
  81         super(sp);
  82         IOUtil.configureBlocking(fd, false);
  83         this.fd = fd;
  84         this.fdVal = IOUtil.fdVal(fd);
  85     }
  86 
  87     /**
  88      * Closes the read end of the pipe if there are no read operation in
  89      * progress and the channel is not registered with a Selector.
  90      */
  91     private boolean tryClose() throws IOException {
  92         assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
  93         if (thread == 0 && !isRegistered()) {
  94             state = ST_CLOSED;
  95             nd.close(fd);
  96             return true;
  97         } else {
  98             return false;
  99         }
 100     }
 101 
 102     /**
 103      * Invokes tryClose to attempt to close the read end of the pipe.
 104      *
 105      * This method is used for deferred closing by I/O and Selector operations.
 106      */
 107     private void tryFinishClose() {
 108         try {
 109             tryClose();
 110         } catch (IOException ignore) { }
 111     }
 112 
 113     /**
 114      * Closes this channel when configured in blocking mode.
 115      *
 116      * If there is a read operation in progress then the read-end of the pipe
 117      * is pre-closed and the reader is signalled, in which case the final close
 118      * is deferred until the reader aborts.
 119      */
 120     private void implCloseBlockingMode() throws IOException {
 121         synchronized (stateLock) {
 122             assert state < ST_CLOSING;
 123             state = ST_CLOSING;
 124             if (!tryClose()) {
 125                 long th = thread;
 126                 if (th != 0) {
 127                     if (NativeThread.isFiber(th))
 128                         Poller.stopPoll(fdVal);
 129                     nd.preClose(fd);
 130                     if (NativeThread.isKernelThread(th))
 131                         NativeThread.signal(th);
 132                 }
 133             }
 134         }
 135     }
 136 
 137     /**
 138      * Closes this channel when configured in non-blocking mode.
 139      *
 140      * If the channel is registered with a Selector then the close is deferred
 141      * until the channel is flushed from all Selectors.
 142      */
 143     private void implCloseNonBlockingMode() throws IOException {
 144         synchronized (stateLock) {
 145             assert state < ST_CLOSING;
 146             state = ST_CLOSING;
 147         }
 148         // wait for any read operation to complete before trying to close
 149         readLock.lock();
 150         readLock.unlock();
 151         synchronized (stateLock) {
 152             if (state == ST_CLOSING) {
 153                 tryClose();
 154             }
 155         }
 156     }
 157 
 158     /**
 159      * Invoked by implCloseChannel to close the channel.
 160      */
 161     @Override
 162     protected void implCloseSelectableChannel() throws IOException {
 163         assert !isOpen();
 164         if (isBlocking()) {
 165             implCloseBlockingMode();
 166         } else {
 167             implCloseNonBlockingMode();
 168         }
 169     }
 170     @Override
 171     public void kill() {
 172         synchronized (stateLock) {
 173             assert !isOpen();
 174             if (state == ST_CLOSING) {
 175                 tryFinishClose();
 176             }
 177         }
 178     }
 179 
 180     @Override
 181     protected void implConfigureBlocking(boolean block) throws IOException {
 182         readLock.lock();
 183         try {
 184             synchronized (stateLock) {
 185                 if (!isOpen())
 186                     throw new ClosedChannelException();
 187                 IOUtil.configureBlocking(fd, block);
 188             }
 189         } finally {
 190             readLock.unlock();
 191         }
 192     }
 193 
 194     public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
 195         int intOps = ski.nioInterestOps();
 196         int oldOps = ski.nioReadyOps();
 197         int newOps = initialOps;
 198 
 199         if ((ops & Net.POLLNVAL) != 0)
 200             throw new Error("POLLNVAL detected");
 201 
 202         if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
 203             newOps = intOps;
 204             ski.nioReadyOps(newOps);
 205             return (newOps & ~oldOps) != 0;
 206         }
 207 
 208         if (((ops & Net.POLLIN) != 0) &&
 209             ((intOps & SelectionKey.OP_READ) != 0))
 210             newOps |= SelectionKey.OP_READ;
 211 
 212         ski.nioReadyOps(newOps);
 213         return (newOps & ~oldOps) != 0;
 214     }
 215 
 216     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
 217         return translateReadyOps(ops, ski.nioReadyOps(), ski);
 218     }
 219 
 220     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
 221         return translateReadyOps(ops, 0, ski);
 222     }
 223 
 224     public int translateInterestOps(int ops) {
 225         int newOps = 0;
 226         if (ops == SelectionKey.OP_READ)
 227             newOps |= Net.POLLIN;
 228         return newOps;
 229     }
 230 
 231     /**
 232      * Marks the beginning of a read operation that might block.
 233      *
 234      * @throws ClosedChannelException if the channel is closed
 235      * @throws NotYetConnectedException if the channel is not yet connected
 236      */
 237     private void beginRead(boolean blocking) throws ClosedChannelException {
 238         if (blocking) {
 239             // set hook for Thread.interrupt
 240             begin();
 241         }
 242         synchronized (stateLock) {
 243             if (!isOpen())
 244                 throw new ClosedChannelException();
 245             if (blocking)
 246                 thread = NativeThread.current();
 247         }
 248     }
 249 
 250     /**
 251      * Marks the end of a read operation that may have blocked.
 252      *
 253      * @throws AsynchronousCloseException if the channel was closed due to this
 254      * thread being interrupted on a blocking read operation.
 255      */
 256     private void endRead(boolean blocking, boolean completed)
 257         throws AsynchronousCloseException
 258     {
 259         if (blocking) {
 260             synchronized (stateLock) {
 261                 thread = 0;
 262                 if (state == ST_CLOSING) {
 263                     tryFinishClose();
 264                 }
 265             }
 266             // remove hook for Thread.interrupt
 267             end(completed);
 268         }
 269     }
 270 
 271     @Override
 272     public int read(ByteBuffer dst) throws IOException {
 273         Objects.requireNonNull(dst);
 274 
 275         readLock.lock();
 276         try {
 277             boolean blocking = isBlocking();
 278             int n = 0;
 279             try {
 280                 beginRead(blocking);
 281                 n = IOUtil.read(fd, dst, -1, nd);
 282                 if (blocking) {
 283                     while (IOStatus.okayToRetry(n) && isOpen()) {
 284                         park(Net.POLLIN);
 285                         n = IOUtil.read(fd, dst, -1, nd);
 286                     }
 287                 }
 288             } finally {
 289                 endRead(blocking, n > 0);
 290                 assert IOStatus.check(n);
 291             }
 292             return IOStatus.normalize(n);
 293         } finally {
 294             readLock.unlock();
 295         }
 296     }
 297 
 298     @Override
 299     public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
 300         Objects.checkFromIndexSize(offset, length, dsts.length);
 301 
 302         readLock.lock();
 303         try {
 304             boolean blocking = isBlocking();
 305             long n = 0;
 306             try {
 307                 beginRead(blocking);
 308                 n = IOUtil.read(fd, dsts, offset, length, nd);
 309                 if (blocking) {
 310                     while (IOStatus.okayToRetry(n) && isOpen()) {
 311                         park(Net.POLLIN);
 312                         n = IOUtil.read(fd, dsts, offset, length, nd);
 313                     }
 314                 }
 315             } finally {
 316                 endRead(blocking, n > 0);
 317                 assert IOStatus.check(n);
 318             }
 319             return IOStatus.normalize(n);
 320         } finally {
 321             readLock.unlock();
 322         }
 323     }
 324 
 325     @Override
 326     public long read(ByteBuffer[] dsts) throws IOException {
 327         return read(dsts, 0, dsts.length);
 328     }
 329 }