1 /*
  2  * Copyright (c) 2001, 2022, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;
 28 import java.nio.channels.ClosedSelectorException;
 29 import java.nio.channels.SelectionKey;
 30 import java.nio.channels.Selector;
 31 import java.nio.channels.spi.SelectorProvider;
 32 import java.util.ArrayDeque;
 33 import java.util.ArrayList;
 34 import java.util.Deque;
 35 import java.util.List;
 36 import java.util.concurrent.TimeUnit;
 37 import java.util.function.Consumer;
 38 import jdk.internal.misc.Blocker;
 39 import jdk.internal.misc.Unsafe;
 40 
 41 /**
 42  * Selector implementation based on poll
 43  */
 44 
 45 class PollSelectorImpl extends SelectorImpl {
 46 
 47     // initial capacity of poll array
 48     private static final int INITIAL_CAPACITY = 16;
 49 
 50     // poll array, grows as needed
 51     private int pollArrayCapacity = INITIAL_CAPACITY;
 52     private int pollArraySize;
 53     private AllocatedNativeObject pollArray;
 54 
 55     // file descriptors used for interrupt
 56     private final int fd0;
 57     private final int fd1;
 58 
 59     // keys for file descriptors in poll array, synchronize on selector
 60     private final List<SelectionKeyImpl> pollKeys = new ArrayList<>();
 61 
 62     // pending updates, queued by putEventOps
 63     private final Object updateLock = new Object();
 64     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
 65 
 66     // interrupt triggering and clearing
 67     private final Object interruptLock = new Object();
 68     private boolean interruptTriggered;
 69 
 70     PollSelectorImpl(SelectorProvider sp) throws IOException {
 71         super(sp);
 72 
 73         int size = pollArrayCapacity * SIZE_POLLFD;
 74         this.pollArray = new AllocatedNativeObject(size, false);
 75 
 76         try {
 77             long fds = IOUtil.makePipe(false);
 78             this.fd0 = (int) (fds >>> 32);
 79             this.fd1 = (int) fds;
 80         } catch (IOException ioe) {
 81             pollArray.free();
 82             throw ioe;
 83         }
 84 
 85         // wakeup support
 86         synchronized (this) {
 87             setFirst(fd0, Net.POLLIN);
 88         }
 89     }
 90 
 91     private void ensureOpen() {
 92         if (!isOpen())
 93             throw new ClosedSelectorException();
 94     }
 95 
 96     @Override
 97     protected int doSelect(Consumer<SelectionKey> action, long timeout)
 98         throws IOException
 99     {
100         assert Thread.holdsLock(this);
101 
102         int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout
103         boolean blocking = (to != 0);
104         boolean timedPoll = (to > 0);
105 
106         processUpdateQueue();
107         processDeregisterQueue();
108         try {
109             begin(blocking);
110 
111             int numPolled;
112             do {
113                 long startTime = timedPoll ? System.nanoTime() : 0;
114                 long comp = Blocker.begin();
115                 try {
116                     numPolled = poll(pollArray.address(), pollArraySize, to);
117                 } finally {
118                     Blocker.end(comp);
119                 }
120                 if (numPolled == IOStatus.INTERRUPTED && timedPoll) {
121                     // timed poll interrupted so need to adjust timeout
122                     long adjust = System.nanoTime() - startTime;
123                     to -= (int) TimeUnit.NANOSECONDS.toMillis(adjust);
124                     if (to <= 0) {
125                         // timeout expired so no retry
126                         numPolled = 0;
127                     }
128                 }
129             } while (numPolled == IOStatus.INTERRUPTED);
130             assert numPolled <= pollArraySize;
131 
132         } finally {
133             end(blocking);
134         }
135 
136         processDeregisterQueue();
137         return processEvents(action);
138     }
139 
140     /**
141      * Process changes to the interest ops.
142      */
143     private void processUpdateQueue() {
144         assert Thread.holdsLock(this);
145 
146         synchronized (updateLock) {
147             SelectionKeyImpl ski;
148             while ((ski = updateKeys.pollFirst()) != null) {
149                 int newEvents = ski.translateInterestOps();
150                 if (ski.isValid()) {
151                     int index = ski.getIndex();
152                     assert index >= 0 && index < pollArraySize;
153                     if (index > 0) {
154                         assert pollKeys.get(index) == ski;
155                         if (newEvents == 0) {
156                             remove(ski);
157                         } else {
158                             update(ski, newEvents);
159                         }
160                     } else if (newEvents != 0) {
161                         add(ski, newEvents);
162                     }
163                 }
164             }
165         }
166     }
167 
168     /**
169      * Process the polled events.
170      * If the interrupt fd has been selected, drain it and clear the interrupt.
171      */
172     private int processEvents(Consumer<SelectionKey> action)
173         throws IOException
174     {
175         assert Thread.holdsLock(this);
176         assert pollArraySize > 0 && pollArraySize == pollKeys.size();
177 
178         int numKeysUpdated = 0;
179         for (int i = 1; i < pollArraySize; i++) {
180             int rOps = getReventOps(i);
181             if (rOps != 0) {
182                 SelectionKeyImpl ski = pollKeys.get(i);
183                 assert ski.getFDVal() == getDescriptor(i);
184                 if (ski.isValid()) {
185                     numKeysUpdated += processReadyEvents(rOps, ski, action);
186                 }
187             }
188         }
189 
190         // check for interrupt
191         if (getReventOps(0) != 0) {
192             assert getDescriptor(0) == fd0;
193             clearInterrupt();
194         }
195 
196         return numKeysUpdated;
197     }
198 
199     @Override
200     protected void implClose() throws IOException {
201         assert !isOpen();
202         assert Thread.holdsLock(this);
203 
204         // prevent further wakeup
205         synchronized (interruptLock) {
206             interruptTriggered = true;
207         }
208 
209         pollArray.free();
210         FileDispatcherImpl.closeIntFD(fd0);
211         FileDispatcherImpl.closeIntFD(fd1);
212     }
213 
214     @Override
215     protected void implRegister(SelectionKeyImpl ski) {
216         assert ski.getIndex() == 0;
217         ensureOpen();
218     }
219 
220     @Override
221     protected void implDereg(SelectionKeyImpl ski) throws IOException {
222         assert !ski.isValid();
223         assert Thread.holdsLock(this);
224 
225         // remove from poll array
226         int index = ski.getIndex();
227         if (index > 0) {
228             remove(ski);
229         }
230     }
231 
232     @Override
233     public void setEventOps(SelectionKeyImpl ski) {
234         ensureOpen();
235         synchronized (updateLock) {
236             updateKeys.addLast(ski);
237         }
238     }
239 
240     @Override
241     public Selector wakeup() {
242         synchronized (interruptLock) {
243             if (!interruptTriggered) {
244                 try {
245                     IOUtil.write1(fd1, (byte)0);
246                 } catch (IOException ioe) {
247                     throw new InternalError(ioe);
248                 }
249                 interruptTriggered = true;
250             }
251         }
252         return this;
253     }
254 
255     private void clearInterrupt() throws IOException {
256         synchronized (interruptLock) {
257             IOUtil.drain(fd0);
258             interruptTriggered = false;
259         }
260     }
261 
262     /**
263      * Sets the first pollfd entry in the poll array to the given fd
264      */
265     private void setFirst(int fd, int ops) {
266         assert pollArraySize == 0;
267         assert pollKeys.isEmpty();
268 
269         putDescriptor(0, fd);
270         putEventOps(0, ops);
271         pollArraySize = 1;
272 
273         pollKeys.add(null);  // dummy element
274     }
275 
276     /**
277      * Adds a pollfd entry to the poll array, expanding the poll array if needed.
278      */
279     private void add(SelectionKeyImpl ski, int ops) {
280         expandIfNeeded();
281 
282         int index = pollArraySize;
283         assert index > 0;
284         putDescriptor(index, ski.getFDVal());
285         putEventOps(index, ops);
286         putReventOps(index, 0);
287         ski.setIndex(index);
288         pollArraySize++;
289 
290         pollKeys.add(ski);
291         assert pollKeys.size() == pollArraySize;
292     }
293 
294     /**
295      * Update the events of pollfd entry.
296      */
297     private void update(SelectionKeyImpl ski, int ops) {
298         int index = ski.getIndex();
299         assert index > 0 && index < pollArraySize;
300         assert getDescriptor(index) == ski.getFDVal();
301         putEventOps(index, ops);
302     }
303 
304     /**
305      * Removes a pollfd entry from the poll array
306      */
307     private void remove(SelectionKeyImpl ski) {
308         int index = ski.getIndex();
309         assert index > 0 && index < pollArraySize;
310         assert getDescriptor(index) == ski.getFDVal();
311 
312         // replace pollfd at index with the last pollfd in array
313         int lastIndex = pollArraySize - 1;
314         if (lastIndex != index) {
315             SelectionKeyImpl lastKey = pollKeys.get(lastIndex);
316             assert lastKey.getIndex() == lastIndex;
317             int lastFd = getDescriptor(lastIndex);
318             int lastOps = getEventOps(lastIndex);
319             int lastRevents = getReventOps(lastIndex);
320             assert lastKey.getFDVal() == lastFd;
321             putDescriptor(index, lastFd);
322             putEventOps(index, lastOps);
323             putReventOps(index, lastRevents);
324             pollKeys.set(index, lastKey);
325             lastKey.setIndex(index);
326         }
327         pollKeys.remove(lastIndex);
328         pollArraySize--;
329         assert pollKeys.size() == pollArraySize;
330 
331         ski.setIndex(0);
332     }
333 
334     /**
335      * Expand poll array if at capacity
336      */
337     private void expandIfNeeded() {
338         if (pollArraySize == pollArrayCapacity) {
339             int oldSize = pollArrayCapacity * SIZE_POLLFD;
340             int newCapacity = pollArrayCapacity + INITIAL_CAPACITY;
341             int newSize = newCapacity * SIZE_POLLFD;
342             AllocatedNativeObject newPollArray = new AllocatedNativeObject(newSize, false);
343             Unsafe.getUnsafe().copyMemory(pollArray.address(), newPollArray.address(), oldSize);
344             pollArray.free();
345             pollArray = newPollArray;
346             pollArrayCapacity = newCapacity;
347         }
348     }
349 
350     private static final short SIZE_POLLFD   = 8;
351     private static final short FD_OFFSET     = 0;
352     private static final short EVENT_OFFSET  = 4;
353     private static final short REVENT_OFFSET = 6;
354 
355     private void putDescriptor(int i, int fd) {
356         int offset = SIZE_POLLFD * i + FD_OFFSET;
357         pollArray.putInt(offset, fd);
358     }
359 
360     private int getDescriptor(int i) {
361         int offset = SIZE_POLLFD * i + FD_OFFSET;
362         return pollArray.getInt(offset);
363     }
364 
365     private void putEventOps(int i, int event) {
366         int offset = SIZE_POLLFD * i + EVENT_OFFSET;
367         pollArray.putShort(offset, (short)event);
368     }
369 
370     private int getEventOps(int i) {
371         int offset = SIZE_POLLFD * i + EVENT_OFFSET;
372         return pollArray.getShort(offset);
373     }
374 
375     private void putReventOps(int i, int revent) {
376         int offset = SIZE_POLLFD * i + REVENT_OFFSET;
377         pollArray.putShort(offset, (short)revent);
378     }
379 
380     private int getReventOps(int i) {
381         int offset = SIZE_POLLFD * i + REVENT_OFFSET;
382         return pollArray.getShort(offset);
383     }
384 
385     private static native int poll(long pollAddress, int numfds, int timeout);
386 
387     static {
388         IOUtil.load();
389     }
390 }