1 /*
  2  * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOException;
 28 import java.nio.channels.ClosedSelectorException;
 29 import java.nio.channels.SelectionKey;
 30 import java.nio.channels.Selector;
 31 import java.nio.channels.spi.SelectorProvider;
 32 import java.util.ArrayDeque;
 33 import java.util.ArrayList;
 34 import java.util.Deque;
 35 import java.util.List;
 36 import java.util.concurrent.TimeUnit;
 37 import java.util.function.Consumer;
 38 import jdk.internal.misc.Unsafe;
 39 
 40 /**
 41  * Selector implementation based on poll
 42  */
 43 
 44 class PollSelectorImpl extends SelectorImpl {
 45 
 46     // initial capacity of poll array
 47     private static final int INITIAL_CAPACITY = 16;
 48 
 49     // poll array, grows as needed
 50     private int pollArrayCapacity = INITIAL_CAPACITY;
 51     private int pollArraySize;
 52     private AllocatedNativeObject pollArray;
 53 
 54     // file descriptors used for interrupt
 55     private final int fd0;
 56     private final int fd1;
 57 
 58     // keys for file descriptors in poll array, synchronize on selector
 59     private final List<SelectionKeyImpl> pollKeys = new ArrayList<>();
 60 
 61     // pending updates, queued by putEventOps
 62     private final Object updateLock = new Object();
 63     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
 64 
 65     // interrupt triggering and clearing
 66     private final Object interruptLock = new Object();
 67     private boolean interruptTriggered;
 68 
 69     PollSelectorImpl(SelectorProvider sp) throws IOException {
 70         super(sp);
 71 
 72         int size = pollArrayCapacity * SIZE_POLLFD;
 73         this.pollArray = new AllocatedNativeObject(size, false);
 74 
 75         try {
 76             long fds = IOUtil.makePipe(false);
 77             this.fd0 = (int) (fds >>> 32);
 78             this.fd1 = (int) fds;
 79         } catch (IOException ioe) {
 80             pollArray.free();
 81             throw ioe;
 82         }
 83 
 84         // wakeup support
 85         synchronized (this) {
 86             setFirst(fd0, Net.POLLIN);
 87         }
 88     }
 89 
 90     private void ensureOpen() {
 91         if (!isOpen())
 92             throw new ClosedSelectorException();
 93     }
 94 
 95     @Override
 96     protected int doSelect(Consumer<SelectionKey> action, long timeout)
 97         throws IOException
 98     {
 99         assert Thread.holdsLock(this);
100 
101         int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout
102         boolean blocking = (to != 0);
103         boolean timedPoll = (to > 0);
104 
105         processUpdateQueue();
106         processDeregisterQueue();
107         try {
108             begin(blocking);
109 
110             int numPolled;
111             do {
112                 long startTime = timedPoll ? System.nanoTime() : 0;
113                 numPolled = poll(to);
114                 if (numPolled == IOStatus.INTERRUPTED && timedPoll) {
115                     // timed poll interrupted so need to adjust timeout
116                     long adjust = System.nanoTime() - startTime;
117                     to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
118                     if (to <= 0) {
119                         // timeout expired so no retry
120                         numPolled = 0;
121                     }
122                 }
123             } while (numPolled == IOStatus.INTERRUPTED);
124             assert numPolled <= pollArraySize;
125 
126         } finally {
127             end(blocking);
128         }
129 
130         processDeregisterQueue();
131         return processEvents(action);
132     }
133 
134     @Override
135     protected int implPoll(long timeout) throws IOException {
136         return poll(pollArray.address(), pollArraySize, (int) timeout);
137     }
138 
139     /**
140      * Process changes to the interest ops.
141      */
142     private void processUpdateQueue() {
143         assert Thread.holdsLock(this);
144 
145         synchronized (updateLock) {
146             SelectionKeyImpl ski;
147             while ((ski = updateKeys.pollFirst()) != null) {
148                 int newEvents = ski.translateInterestOps();
149                 if (ski.isValid()) {
150                     int index = ski.getIndex();
151                     assert index >= 0 && index < pollArraySize;
152                     if (index > 0) {
153                         assert pollKeys.get(index) == ski;
154                         if (newEvents == 0) {
155                             remove(ski);
156                         } else {
157                             update(ski, newEvents);
158                         }
159                     } else if (newEvents != 0) {
160                         add(ski, newEvents);
161                     }
162                 }
163             }
164         }
165     }
166 
167     /**
168      * Process the polled events.
169      * If the interrupt fd has been selected, drain it and clear the interrupt.
170      */
171     private int processEvents(Consumer<SelectionKey> action)
172         throws IOException
173     {
174         assert Thread.holdsLock(this);
175         assert pollArraySize > 0 && pollArraySize == pollKeys.size();
176 
177         int numKeysUpdated = 0;
178         for (int i = 1; i < pollArraySize; i++) {
179             int rOps = getReventOps(i);
180             if (rOps != 0) {
181                 SelectionKeyImpl ski = pollKeys.get(i);
182                 assert ski.getFDVal() == getDescriptor(i);
183                 if (ski.isValid()) {
184                     numKeysUpdated += processReadyEvents(rOps, ski, action);
185                 }
186             }
187         }
188 
189         // check for interrupt
190         if (getReventOps(0) != 0) {
191             assert getDescriptor(0) == fd0;
192             clearInterrupt();
193         }
194 
195         return numKeysUpdated;
196     }
197 
198     @Override
199     protected void implClose() throws IOException {
200         assert !isOpen();
201         assert Thread.holdsLock(this);
202 
203         // prevent further wakeup
204         synchronized (interruptLock) {
205             interruptTriggered = true;
206         }
207 
208         pollArray.free();
209         FileDispatcherImpl.closeIntFD(fd0);
210         FileDispatcherImpl.closeIntFD(fd1);
211     }
212 
213     @Override
214     protected void implRegister(SelectionKeyImpl ski) {
215         assert ski.getIndex() == 0;
216         ensureOpen();
217     }
218 
219     @Override
220     protected void implDereg(SelectionKeyImpl ski) throws IOException {
221         assert !ski.isValid();
222         assert Thread.holdsLock(this);
223 
224         // remove from poll array
225         int index = ski.getIndex();
226         if (index > 0) {
227             remove(ski);
228         }
229     }
230 
231     @Override
232     public void setEventOps(SelectionKeyImpl ski) {
233         ensureOpen();
234         synchronized (updateLock) {
235             updateKeys.addLast(ski);
236         }
237     }
238 
239     @Override
240     public Selector wakeup() {
241         synchronized (interruptLock) {
242             if (!interruptTriggered) {
243                 try {
244                     IOUtil.write1(fd1, (byte)0);
245                 } catch (IOException ioe) {
246                     throw new InternalError(ioe);
247                 }
248                 interruptTriggered = true;
249             }
250         }
251         return this;
252     }
253 
254     private void clearInterrupt() throws IOException {
255         synchronized (interruptLock) {
256             IOUtil.drain(fd0);
257             interruptTriggered = false;
258         }
259     }
260 
261     /**
262      * Sets the first pollfd enty in the poll array to the given fd
263      */
264     private void setFirst(int fd, int ops) {
265         assert pollArraySize == 0;
266         assert pollKeys.isEmpty();
267 
268         putDescriptor(0, fd);
269         putEventOps(0, ops);
270         pollArraySize = 1;
271 
272         pollKeys.add(null);  // dummy element
273     }
274 
275     /**
276      * Adds a pollfd entry to the poll array, expanding the poll array if needed.
277      */
278     private void add(SelectionKeyImpl ski, int ops) {
279         expandIfNeeded();
280 
281         int index = pollArraySize;
282         assert index > 0;
283         putDescriptor(index, ski.getFDVal());
284         putEventOps(index, ops);
285         putReventOps(index, 0);
286         ski.setIndex(index);
287         pollArraySize++;
288 
289         pollKeys.add(ski);
290         assert pollKeys.size() == pollArraySize;
291     }
292 
293     /**
294      * Update the events of pollfd entry.
295      */
296     private void update(SelectionKeyImpl ski, int ops) {
297         int index = ski.getIndex();
298         assert index > 0 && index < pollArraySize;
299         assert getDescriptor(index) == ski.getFDVal();
300         putEventOps(index, ops);
301     }
302 
303     /**
304      * Removes a pollfd entry from the poll array
305      */
306     private void remove(SelectionKeyImpl ski) {
307         int index = ski.getIndex();
308         assert index > 0 && index < pollArraySize;
309         assert getDescriptor(index) == ski.getFDVal();
310 
311         // replace pollfd at index with the last pollfd in array
312         int lastIndex = pollArraySize - 1;
313         if (lastIndex != index) {
314             SelectionKeyImpl lastKey = pollKeys.get(lastIndex);
315             assert lastKey.getIndex() == lastIndex;
316             int lastFd = getDescriptor(lastIndex);
317             int lastOps = getEventOps(lastIndex);
318             int lastRevents = getReventOps(lastIndex);
319             assert lastKey.getFDVal() == lastFd;
320             putDescriptor(index, lastFd);
321             putEventOps(index, lastOps);
322             putReventOps(index, lastRevents);
323             pollKeys.set(index, lastKey);
324             lastKey.setIndex(index);
325         }
326         pollKeys.remove(lastIndex);
327         pollArraySize--;
328         assert pollKeys.size() == pollArraySize;
329 
330         ski.setIndex(0);
331     }
332 
333     /**
334      * Expand poll array if at capacity
335      */
336     private void expandIfNeeded() {
337         if (pollArraySize == pollArrayCapacity) {
338             int oldSize = pollArrayCapacity * SIZE_POLLFD;
339             int newCapacity = pollArrayCapacity + INITIAL_CAPACITY;
340             int newSize = newCapacity * SIZE_POLLFD;
341             AllocatedNativeObject newPollArray = new AllocatedNativeObject(newSize, false);
342             Unsafe.getUnsafe().copyMemory(pollArray.address(), newPollArray.address(), oldSize);
343             pollArray.free();
344             pollArray = newPollArray;
345             pollArrayCapacity = newCapacity;
346         }
347     }
348 
349     private static final short SIZE_POLLFD   = 8;
350     private static final short FD_OFFSET     = 0;
351     private static final short EVENT_OFFSET  = 4;
352     private static final short REVENT_OFFSET = 6;
353 
354     private void putDescriptor(int i, int fd) {
355         int offset = SIZE_POLLFD * i + FD_OFFSET;
356         pollArray.putInt(offset, fd);
357     }
358 
359     private int getDescriptor(int i) {
360         int offset = SIZE_POLLFD * i + FD_OFFSET;
361         return pollArray.getInt(offset);
362     }
363 
364     private void putEventOps(int i, int event) {
365         int offset = SIZE_POLLFD * i + EVENT_OFFSET;
366         pollArray.putShort(offset, (short)event);
367     }
368 
369     private int getEventOps(int i) {
370         int offset = SIZE_POLLFD * i + EVENT_OFFSET;
371         return pollArray.getShort(offset);
372     }
373 
374     private void putReventOps(int i, int revent) {
375         int offset = SIZE_POLLFD * i + REVENT_OFFSET;
376         pollArray.putShort(offset, (short)revent);
377     }
378 
379     private int getReventOps(int i) {
380         int offset = SIZE_POLLFD * i + REVENT_OFFSET;
381         return pollArray.getShort(offset);
382     }
383 
384     private static native int poll(long pollAddress, int numfds, int timeout);
385 
386     static {
387         IOUtil.load();
388     }
389 }