1 /*
   2  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.nio.ch;
  26 
  27 import java.io.FileDescriptor;
  28 import java.io.IOException;
  29 import java.nio.channels.Pipe;
  30 import java.util.ArrayDeque;
  31 import java.util.Deque;
  32 import java.util.HashMap;
  33 import java.util.Map;
  34 
  35 import jdk.internal.misc.Unsafe;
  36 import jdk.internal.access.JavaLangAccess;
  37 import jdk.internal.access.SharedSecrets;
  38 
  39 /**
  40  * Simple/non-scalable implementation of Poller based on WSAPoll. This
  41  * implementation will be replaced with a more scalable implementation in the
  42  * future.
  43  *
 * Due to a bug in WSAPoll, this implementation does not support polling for a
 * connection to complete. If a socket is registered to poll for a connect to
 * complete, and the connection cannot be established, then the socket will
 * never be reported as polled.
  48  */
  49 
  50 class PollPoller extends Poller {
  51     private static JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
  52     private static final Unsafe UNSAFE = Unsafe.getUnsafe();
  53     private static final NativeDispatcher ND = new SocketDispatcher();
  54     private static final long TEMP_BUF = UNSAFE.allocateMemory(1);
  55 
  56     // initial capacity of poll array
  57     private static final int INITIAL_CAPACITY = 16;
  58 
  59     // true if this is a poller for reading (POLLIN), false for writing (POLLOUT)
  60     private final boolean reader;
  61 
  62     // poll array, grows as needed
  63     private int pollArrayCapacity = INITIAL_CAPACITY;
  64     private int pollArraySize;
  65     private AllocatedNativeObject pollArray;
  66 
  67     // maps file descriptor to index in poll array
  68     private final Map<Integer, Integer> fdToIndex = new HashMap<>();
  69 
  70     // pipe and file descriptors used for wakeup
  71     private final Object wakeupLock = new Object();
  72     private boolean wakeupTriggered;
  73     private final Pipe pipe;
  74     private final FileDescriptor fd0, fd1;
  75 
  76     // registration updates
  77     private final Object updateLock = new Object();
  78     private final Deque<Integer> registerQueue = new ArrayDeque<>();
  79 
  80     // deregistration (stop) requests
  81     private static class DeregisterRequest {
  82         final int fdVal;
  83         DeregisterRequest(int fdVal) {
  84             this.fdVal = fdVal;
  85         }
  86         int fdVal() {
  87             return fdVal;
  88         }
  89     }
  90     private final Deque<DeregisterRequest> deregisterQueue = new ArrayDeque<>();
  91 
  92     /**
  93      * Creates a PollPoller to support reading (POLLIN) or writing (POLLOUT)
  94      * operations.
  95      */
  96     PollPoller(boolean reader) throws IOException {
  97         this.reader = reader;
  98 
  99         int size = pollArrayCapacity * SIZE_POLLFD;
 100         this.pollArray = new AllocatedNativeObject(size, false);
 101 
 102         // wakeup support
 103         this.pipe = makePipe();
 104         SourceChannelImpl source = (SourceChannelImpl) pipe.source();
 105         SinkChannelImpl sink = (SinkChannelImpl) pipe.sink();
 106         (sink.sc).socket().setTcpNoDelay(true);
 107         this.fd0 = source.getFD();
 108         this.fd1 = sink.getFD();
 109 
 110         // element 0 in poll array is for wakeup.
 111         putDescriptor(0, source.getFDVal());
 112         putEvents(0, Net.POLLIN);
 113         putRevents(0, (short) 0);
 114         pollArraySize = 1;
 115     }
 116 
 117     /**
 118      * Register the file descriptor.
 119      */
 120     @Override
 121     protected void implRegister(int fdVal) {
 122         Integer fd = Integer.valueOf(fdVal);
 123         synchronized (updateLock) {
 124             registerQueue.add(fd);
 125         }
 126         wakeup();
 127     }
 128 
 129     /**
 130      * Deregister the file descriptor. This method waits until the poller thread
 131      * has removed the file descriptor from the poll array.
 132      */
 133     @Override
 134     protected boolean implDeregister(int fdVal) {
 135         boolean interrupted = false;
 136         var request = new DeregisterRequest(fdVal);
 137         synchronized (request) {
 138             synchronized (updateLock) {
 139                 deregisterQueue.add(request);
 140             }
 141             wakeup();
 142             try {
 143                 request.wait();
 144             } catch (InterruptedException e) {
 145                 interrupted = true;
 146             }
 147         }
 148         if (interrupted) {
 149             Thread.currentThread().interrupt();
 150         }
 151         return true;
 152     }
 153 
 154     /**
 155      * Poller run loop.
 156      */
 157     @Override
 158     public void run() {
 159         try {
 160             for (;;) {
 161                 // process any updates
 162                 synchronized (updateLock) {
 163                     processRegisterQueue();
 164                     processDeregisterQueue();
 165                 }
 166 
 167                 // poll for wakeup and/or events
 168                 int numPolled = poll(pollArray.address(), pollArraySize, -1);
 169                 boolean polledWakeup = (getRevents(0) != 0);
 170                 if (polledWakeup) {
 171                     numPolled--;
 172                 }
 173                 processEvents(numPolled);
 174 
 175                 // clear wakeup
 176                 if (polledWakeup) {
 177                     clearWakeup();
 178                 }
 179             }
 180         } catch (Throwable e) {
 181             e.printStackTrace();
 182         }
 183     }
 184 
 185     /**
 186      * Process the queue of file descriptors to poll
 187      */
 188     private void processRegisterQueue() {
 189         assert Thread.holdsLock(updateLock);
 190         Integer fd;
 191         while ((fd = registerQueue.pollFirst()) != null) {
 192             short events = (reader) ? Net.POLLIN : Net.POLLOUT;
 193             int index = add(fd, events);
 194             fdToIndex.put(fd, index);
 195         }
 196     }
 197 
 198     /**
 199      * Process the queue of file descriptors to stop polling
 200      */
 201     private void processDeregisterQueue() {
 202         assert Thread.holdsLock(updateLock);
 203         DeregisterRequest request;
 204         while ((request = deregisterQueue.pollFirst()) != null) {
 205             Integer index = fdToIndex.remove(request.fdVal);
 206             if (index != null) {
 207                 remove(index);
 208             }
 209             synchronized (request) {
 210                 request.notifyAll();
 211             }
 212         }
 213     }
 214 
 215     /**
 216      * Process the polled events, skipping the first (0) entry in the poll array
 217      * as that is used by the wakeup mechanism.
 218      *
 219      * @param numPolled the number of polled sockets in the array (from index 1)
 220      */
 221     private void processEvents(int numPolled) {
 222         int index = 1;
 223         int remaining = numPolled;
 224         while (index < pollArraySize && remaining > 0) {
 225             short revents = getRevents(index);
 226             if (revents != 0) {
 227                 int fd = getDescriptor(index);
 228                 assert fdToIndex.get(fd) == index;
 229                 polled(fd);
 230                 remove(index);
 231                 fdToIndex.remove(fd);
 232                 remaining--;
 233             } else {
 234                 index++;
 235             }
 236         }
 237     }
 238 
 239     /**
 240      * Wake up the poller thread
 241      */
 242     private void wakeup() {
 243         synchronized (wakeupLock) {
 244             if (!wakeupTriggered) {
 245                 try {
 246                     ND.write(fd1, TEMP_BUF, 1);
 247                 } catch (IOException ioe) {
 248                     throw new InternalError(ioe);
 249                 }
 250                 wakeupTriggered = true;
 251             }
 252         }
 253     }
 254 
 255     /**
 256      * Clear the wakeup event
 257      */
 258     private void clearWakeup() throws IOException {
 259         synchronized (wakeupLock) {
 260             ND.read(fd0, TEMP_BUF, 1);
 261             putRevents(0, (short) 0);
 262             wakeupTriggered = false;
 263         }
 264 
 265     }
 266 
 267     /**
 268      * Add a pollfd entry to the poll array.
 269      * 
 270      * @return the index of the pollfd entry in the poll array
 271      */
 272     private int add(int fd, short events) {
 273         expandIfNeeded();
 274         int index = pollArraySize;
 275         assert index > 0;
 276         putDescriptor(index, fd);
 277         putEvents(index, events);
 278         putRevents(index, (short) 0);
 279         pollArraySize++;
 280         return index;
 281     }
 282 
 283     /**
 284      * Removes a pollfd entry from the poll array.
 285      */
 286     private void remove(int index) {
 287         assert index > 0 && index < pollArraySize;
 288 
 289         // replace pollfd at index with the last pollfd in array
 290         int lastIndex = pollArraySize - 1;
 291         if (lastIndex != index) {
 292             int lastFd = getDescriptor(lastIndex);
 293             short lastEvents = getEvents(lastIndex);
 294             short lastRevents = getRevents(lastIndex);
 295             putDescriptor(index, lastFd);
 296             putEvents(index, lastEvents);
 297             putRevents(index, lastRevents);
 298 
 299             assert fdToIndex.get(lastFd) == lastIndex;
 300             fdToIndex.put(lastFd, index);
 301         }
 302         pollArraySize--;
 303     }
 304 
 305     /**
 306      * Expand poll array if at capacity.
 307      */
 308     private void expandIfNeeded() {
 309         if (pollArraySize == pollArrayCapacity) {
 310             int oldSize = pollArrayCapacity * SIZE_POLLFD;
 311             int newCapacity = pollArrayCapacity + INITIAL_CAPACITY;
 312             int newSize = newCapacity * SIZE_POLLFD;
 313             AllocatedNativeObject newPollArray = new AllocatedNativeObject(newSize, false);
 314             UNSAFE.copyMemory(pollArray.address(), newPollArray.address(), oldSize);
 315             pollArray.free();
 316             pollArray = newPollArray;
 317             pollArrayCapacity = newCapacity;
 318         }
 319     }
 320 
 321     /**
 322      * Returns a PipeImpl. The creation is done on the carrier thread to avoid
 323      * recursive parking when the loopback connection is created.
 324      */
 325     private static PipeImpl makePipe() throws IOException {
 326         try {
 327             return JLA.executeOnCarrierThread(() -> new PipeImpl(null));
 328         } catch (IOException ioe) {
 329             throw ioe;
 330         } catch (Throwable e) {
 331             throw new InternalError(e);
 332         }
 333     }
 334 
 335     /**
 336      * typedef struct pollfd {
 337      *   SOCKET fd;
 338      *   SHORT events;
 339      *   SHORT revents;
 340      * } WSAPOLLFD;
 341      */
 342     private static final short SIZE_POLLFD    = 16;
 343     private static final short FD_OFFSET      = 0;
 344     private static final short EVENTS_OFFSET  = 8;
 345     private static final short REVENTS_OFFSET = 10;
 346 
 347     private void putDescriptor(int i, int fd) {
 348         int offset = SIZE_POLLFD * i + FD_OFFSET;
 349         pollArray.putLong(offset, fd);
 350     }
 351 
 352     private int getDescriptor(int i) {
 353         int offset = SIZE_POLLFD * i + FD_OFFSET;
 354         return (int) pollArray.getLong(offset);
 355     }
 356 
 357     private void putEvents(int i, short events) {
 358         int offset = SIZE_POLLFD * i + EVENTS_OFFSET;
 359         pollArray.putShort(offset, events);
 360     }
 361 
 362     private short getEvents(int i) {
 363         int offset = SIZE_POLLFD * i + EVENTS_OFFSET;
 364         return pollArray.getShort(offset);
 365     }
 366 
 367     private void putRevents(int i, short revents) {
 368         int offset = SIZE_POLLFD * i + REVENTS_OFFSET;
 369         pollArray.putShort(offset, revents);
 370     }
 371 
 372     private short getRevents(int i) {
 373         int offset = SIZE_POLLFD * i + REVENTS_OFFSET;
 374         return pollArray.getShort(offset);
 375     }
 376 
 377     private static native int poll(long pollAddress, int numfds, int timeout)
 378         throws IOException;
 379 
 380     static {
 381         IOUtil.load();
 382     }
 383 }