1 /*
   2  * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.nio.ByteBuffer;
  31 import java.nio.channels.AsynchronousCloseException;
  32 import java.nio.channels.ClosedChannelException;
  33 import java.nio.channels.NotYetConnectedException;
  34 import java.nio.channels.Pipe;
  35 import java.nio.channels.SelectionKey;
  36 import java.nio.channels.spi.SelectorProvider;
  37 import java.util.Objects;
  38 import java.util.concurrent.locks.ReentrantLock;
  39 
  40 class SinkChannelImpl
  41     extends Pipe.SinkChannel
  42     implements SelChImpl
  43 {
  44     // Used to make native read and write calls
  45     private static final NativeDispatcher nd = new FileDispatcherImpl();
  46 
  47     // The file descriptor associated with this channel
  48     private final FileDescriptor fd;
  49     private final int fdVal;
  50 
  51     // Lock held by current writing thread
  52     private final ReentrantLock writeLock = new ReentrantLock();
  53 
  54     // Lock held by any thread that modifies the state fields declared below
  55     // DO NOT invoke a blocking I/O operation while holding this lock!
  56     private final Object stateLock = new Object();
  57 
  58     // -- The following fields are protected by stateLock
  59 
  60     // Channel state
  61     private static final int ST_INUSE = 0;
  62     private static final int ST_CLOSING = 1;
  63     private static final int ST_CLOSED = 2;
  64     private int state;
  65 
  66     // ID of native thread doing write, for signalling
  67     private long thread;
  68 
  69     // -- End of fields protected by stateLock
  70 
  71 
  72     public FileDescriptor getFD() {
  73         return fd;
  74     }
  75 
  76     public int getFDVal() {
  77         return fdVal;
  78     }
  79 
  80     SinkChannelImpl(SelectorProvider sp, FileDescriptor fd) {
  81         super(sp);
  82         this.fd = fd;
  83         this.fdVal = IOUtil.fdVal(fd);
  84     }
  85 
  86     /**
  87      * Closes the write end of the pipe if there are no write operation in
  88      * progress and the channel is not registered with a Selector.
  89      */
  90     private boolean tryClose() throws IOException {
  91         assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
  92         if (thread == 0 && !isRegistered()) {
  93             state = ST_CLOSED;
  94             nd.close(fd);
  95             return true;
  96         } else {
  97             return false;
  98         }
  99     }
 100 
 101     /**
 102      * Invokes tryClose to attempt to close the write end of the pipe.
 103      *
 104      * This method is used for deferred closing by I/O and Selector operations.
 105      */
 106     private void tryFinishClose() {
 107         try {
 108             tryClose();
 109         } catch (IOException ignore) { }
 110     }
 111 
 112     /**
 113      * Closes this channel when configured in blocking mode.
 114      *
 115      * If there is a write operation in progress then the write-end of the pipe
 116      * is pre-closed and the writer is signalled, in which case the final close
 117      * is deferred until the writer aborts.
 118      */
 119     private void implCloseBlockingMode() throws IOException {
 120         synchronized (stateLock) {
 121             assert state < ST_CLOSING;
 122             state = ST_CLOSING;
 123             if (!tryClose()) {
 124                 long th = thread;
 125                 if (th != 0) {
 126                     nd.preClose(fd);
 127                     NativeThread.signal(th);
 128                 }
 129             }
 130         }
 131     }
 132 
 133     /**
 134      * Closes this channel when configured in non-blocking mode.
 135      *
 136      * If the channel is registered with a Selector then the close is deferred
 137      * until the channel is flushed from all Selectors.
 138      */
 139     private void implCloseNonBlockingMode() throws IOException {
 140         synchronized (stateLock) {
 141             assert state < ST_CLOSING;
 142             state = ST_CLOSING;
 143         }
 144         // wait for any write operation to complete before trying to close
 145         writeLock.lock();
 146         writeLock.unlock();
 147         synchronized (stateLock) {
 148             if (state == ST_CLOSING) {
 149                 tryClose();
 150             }
 151         }
 152     }
 153 
 154     /**
 155      * Invoked by implCloseChannel to close the channel.
 156      */
 157     @Override
 158     protected void implCloseSelectableChannel() throws IOException {
 159         assert !isOpen();
 160         if (isBlocking()) {
 161             implCloseBlockingMode();
 162         } else {
 163             implCloseNonBlockingMode();
 164         }
 165     }
 166 
 167     @Override
 168     public void kill() {
 169         synchronized (stateLock) {
 170             if (state == ST_CLOSING) {
 171                 tryFinishClose();
 172             }
 173         }
 174     }
 175 
 176     @Override
 177     protected void implConfigureBlocking(boolean block) throws IOException {
 178         writeLock.lock();
 179         try {
 180             synchronized (stateLock) {
 181                 if (!isOpen())
 182                     throw new ClosedChannelException();
 183                 IOUtil.configureBlocking(fd, block);
 184             }
 185         } finally {
 186             writeLock.unlock();
 187         }
 188     }
 189 
 190     public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
 191         int intOps = ski.nioInterestOps();
 192         int oldOps = ski.nioReadyOps();
 193         int newOps = initialOps;
 194 
 195         if ((ops & Net.POLLNVAL) != 0)
 196             throw new Error("POLLNVAL detected");
 197 
 198         if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
 199             newOps = intOps;
 200             ski.nioReadyOps(newOps);
 201             return (newOps & ~oldOps) != 0;
 202         }
 203 
 204         if (((ops & Net.POLLOUT) != 0) &&
 205             ((intOps & SelectionKey.OP_WRITE) != 0))
 206             newOps |= SelectionKey.OP_WRITE;
 207 
 208         ski.nioReadyOps(newOps);
 209         return (newOps & ~oldOps) != 0;
 210     }
 211 
 212     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
 213         return translateReadyOps(ops, ski.nioReadyOps(), ski);
 214     }
 215 
 216     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
 217         return translateReadyOps(ops, 0, ski);
 218     }
 219 
 220     public int translateInterestOps(int ops) {
 221         int newOps = 0;
 222         if (ops == SelectionKey.OP_WRITE)
 223             newOps |= Net.POLLOUT;
 224         return newOps;
 225     }
 226 
 227     /**
 228      * Marks the beginning of a write operation that might block.
 229      *
 230      * @throws ClosedChannelException if the channel is closed
 231      * @throws NotYetConnectedException if the channel is not yet connected
 232      */
 233     private void beginWrite(boolean blocking) throws ClosedChannelException {
 234         if (blocking) {
 235             // set hook for Thread.interrupt
 236             begin();
 237         }
 238         synchronized (stateLock) {
 239             if (!isOpen())
 240                 throw new ClosedChannelException();
 241             if (blocking)
 242                 thread = NativeThread.current();
 243         }
 244     }
 245 
 246     /**
 247      * Marks the end of a write operation that may have blocked.
 248      *
 249      * @throws AsynchronousCloseException if the channel was closed due to this
 250      * thread being interrupted on a blocking write operation.
 251      */
 252     private void endWrite(boolean blocking, boolean completed)
 253         throws AsynchronousCloseException
 254     {
 255         if (blocking) {
 256             synchronized (stateLock) {
 257                 thread = 0;
 258                 if (state == ST_CLOSING) {
 259                     tryFinishClose();
 260                 }
 261             }
 262             // remove hook for Thread.interrupt
 263             end(completed);
 264         }
 265     }
 266 
 267     @Override
 268     public int write(ByteBuffer src) throws IOException {
 269         Objects.requireNonNull(src);
 270 
 271         writeLock.lock();
 272         try {
 273             boolean blocking = isBlocking();
 274             int n = 0;
 275             try {
 276                 beginWrite(blocking);
 277                 n = IOUtil.write(fd, src, -1, nd);
 278                 if (blocking) {
 279                     while (IOStatus.okayToRetry(n) && isOpen()) {
 280                         park(Net.POLLOUT);
 281                         n = IOUtil.write(fd, src, -1, nd);
 282                     }
 283                 }
 284             } finally {
 285                 endWrite(blocking, n > 0);
 286                 assert IOStatus.check(n);
 287             }
 288             return IOStatus.normalize(n);
 289         } finally {
 290             writeLock.unlock();
 291         }
 292     }
 293 
 294     @Override
 295     public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
 296         Objects.checkFromIndexSize(offset, length, srcs.length);
 297 
 298         writeLock.lock();
 299         try {
 300             boolean blocking = isBlocking();
 301             long n = 0;
 302             try {
 303                 beginWrite(blocking);
 304                 n = IOUtil.write(fd, srcs, offset, length, nd);
 305                 if (blocking) {
 306                     while (IOStatus.okayToRetry(n) && isOpen()) {
 307                         park(Net.POLLOUT);
 308                         n = IOUtil.write(fd, srcs, offset, length, nd);
 309                     }
 310                 }
 311             } finally {
 312                 endWrite(blocking, n > 0);
 313                 assert IOStatus.check(n);
 314             }
 315             return IOStatus.normalize(n);
 316         } finally {
 317             writeLock.unlock();
 318         }
 319     }
 320 
 321     @Override
 322     public long write(ByteBuffer[] srcs) throws IOException {
 323         return write(srcs, 0, srcs.length);
 324     }
 325 }