1 /*
  2  * Copyright (c) 2005, 2022, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.IOException;
 29 import java.nio.channels.ClosedSelectorException;
 30 import java.nio.channels.SelectionKey;
 31 import java.nio.channels.Selector;
 32 import java.nio.channels.spi.SelectorProvider;
 33 import java.util.ArrayDeque;
 34 import java.util.Deque;
 35 import java.util.HashMap;
 36 import java.util.Map;
 37 import java.util.concurrent.TimeUnit;
 38 import java.util.function.Consumer;
 39 
 40 import static sun.nio.ch.EPoll.EPOLLIN;
 41 import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
 42 import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
 43 import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;
 44 
 45 
 46 /**
 47  * Linux epoll based Selector implementation
 48  */
 49 
 50 class EPollSelectorImpl extends SelectorImpl {
 51 
 52     // maximum number of events to poll in one call to epoll_wait
 53     private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024);
 54 
 55     // epoll file descriptor
 56     private final int epfd;
 57 
 58     // address of poll array when polling with epoll_wait
 59     private final long pollArrayAddress;
 60 
 61     // eventfd object used for interrupt
 62     private final EventFD eventfd;
 63 
 64     // maps file descriptor to selection key, synchronize on selector
 65     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
 66 
 67     // pending new registrations/updates, queued by setEventOps
 68     private final Object updateLock = new Object();
 69     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
 70 
 71     // interrupt triggering and clearing
 72     private final Object interruptLock = new Object();
 73     private boolean interruptTriggered;
 74 
 75     EPollSelectorImpl(SelectorProvider sp) throws IOException {
 76         super(sp);
 77 
 78         this.epfd = EPoll.create();
 79         this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);
 80 
 81         try {
 82             this.eventfd = new EventFD();
 83             IOUtil.configureBlocking(IOUtil.newFD(eventfd.efd()), false);
 84         } catch (IOException ioe) {
 85             EPoll.freePollArray(pollArrayAddress);
 86             FileDispatcherImpl.closeIntFD(epfd);
 87             throw ioe;
 88         }
 89 
 90         // register the eventfd object for wakeups
 91         EPoll.ctl(epfd, EPOLL_CTL_ADD, eventfd.efd(), EPOLLIN);
 92     }
 93 
 94     private void ensureOpen() {
 95         if (!isOpen())
 96             throw new ClosedSelectorException();
 97     }
 98 
 99     @Override
100     protected int doSelect(Consumer<SelectionKey> action, long timeout)
101         throws IOException
102     {
103         assert Thread.holdsLock(this);
104 
105         // epoll_wait timeout is int
106         int to = (int) Math.min(timeout, Integer.MAX_VALUE);
107         boolean blocking = (to != 0);
108         boolean timedPoll = (to > 0);
109 
110         int numEntries;
111         processUpdateQueue();
112         processDeregisterQueue();
113 
114         if (Thread.currentThread().isVirtual()) {
115             numEntries = (timedPoll)
116                     ? timedPoll(TimeUnit.MILLISECONDS.toNanos(to))
117                     : untimedPoll(blocking);
118         } else {
119             try {
120                 begin(blocking);
121                 do {
122                     long startTime = timedPoll ? System.nanoTime() : 0;
123                     numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
124                     if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
125                         // timed poll interrupted so need to adjust timeout
126                         long adjust = System.nanoTime() - startTime;
127                         to -= (int) TimeUnit.NANOSECONDS.toMillis(adjust);
128                         if (to <= 0) {
129                             // timeout expired so no retry
130                             numEntries = 0;
131                         }
132                     }
133                 } while (numEntries == IOStatus.INTERRUPTED);
134             } finally {
135                 end(blocking);
136             }
137         }
138         assert IOStatus.check(numEntries);
139 
140         processDeregisterQueue();
141         return processEvents(numEntries, action);
142     }
143 
144     /**
145      * If blocking, parks the current virtual thread until a file descriptor is polled
146      * or the thread is interrupted.
147      */
148     private int untimedPoll(boolean block) throws IOException {
149         int numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0);
150         if (block) {
151             while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
152                 Poller.pollSelector(epfd, 0);
153                 numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0);
154             }
155         }
156         return numEntries;
157     }
158 
159     /**
160      * Parks the current virtual thread until a file descriptor is polled, or the thread
161      * is interrupted, for up to the specified waiting time.
162      */
163     private int timedPoll(long nanos) throws IOException {
164         long startNanos = System.nanoTime();
165         int numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0);
166         while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
167             long remainingNanos = nanos - (System.nanoTime() - startNanos);
168             if (remainingNanos <= 0) {
169                 // timeout
170                 break;
171             }
172             Poller.pollSelector(epfd, remainingNanos);
173             numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0);
174         }
175         return numEntries;
176     }
177 
178     /**
179      * Process changes to the interest ops.
180      */
181     private void processUpdateQueue() {
182         assert Thread.holdsLock(this);
183 
184         synchronized (updateLock) {
185             SelectionKeyImpl ski;
186             while ((ski = updateKeys.pollFirst()) != null) {
187                 if (ski.isValid()) {
188                     int fd = ski.getFDVal();
189                     // add to fdToKey if needed
190                     SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
191                     assert (previous == null) || (previous == ski);
192 
193                     int newEvents = ski.translateInterestOps();
194                     int registeredEvents = ski.registeredEvents();
195                     if (newEvents != registeredEvents) {
196                         if (newEvents == 0) {
197                             // remove from epoll
198                             EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
199                         } else {
200                             if (registeredEvents == 0) {
201                                 // add to epoll
202                                 EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents);
203                             } else {
204                                 // modify events
205                                 EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents);
206                             }
207                         }
208                         ski.registeredEvents(newEvents);
209                     }
210                 }
211             }
212         }
213     }
214 
215     /**
216      * Process the polled events.
217      * If the interrupt fd has been selected, drain it and clear the interrupt.
218      */
219     private int processEvents(int numEntries, Consumer<SelectionKey> action)
220         throws IOException
221     {
222         assert Thread.holdsLock(this);
223 
224         boolean interrupted = false;
225         int numKeysUpdated = 0;
226         for (int i=0; i<numEntries; i++) {
227             long event = EPoll.getEvent(pollArrayAddress, i);
228             int fd = EPoll.getDescriptor(event);
229             if (fd == eventfd.efd()) {
230                 interrupted = true;
231             } else {
232                 SelectionKeyImpl ski = fdToKey.get(fd);
233                 if (ski != null) {
234                     int rOps = EPoll.getEvents(event);
235                     numKeysUpdated += processReadyEvents(rOps, ski, action);
236                 }
237             }
238         }
239 
240         if (interrupted) {
241             clearInterrupt();
242         }
243 
244         return numKeysUpdated;
245     }
246 
247     @Override
248     protected void implClose() throws IOException {
249         assert Thread.holdsLock(this);
250 
251         // prevent further wakeup
252         synchronized (interruptLock) {
253             interruptTriggered = true;
254         }
255 
256         FileDispatcherImpl.closeIntFD(epfd);
257         EPoll.freePollArray(pollArrayAddress);
258 
259         eventfd.close();
260     }
261 
262     @Override
263     protected void implDereg(SelectionKeyImpl ski) throws IOException {
264         assert !ski.isValid();
265         assert Thread.holdsLock(this);
266 
267         int fd = ski.getFDVal();
268         if (fdToKey.remove(fd) != null) {
269             if (ski.registeredEvents() != 0) {
270                 EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
271                 ski.registeredEvents(0);
272             }
273         } else {
274             assert ski.registeredEvents() == 0;
275         }
276     }
277 
278     @Override
279     public void setEventOps(SelectionKeyImpl ski) {
280         ensureOpen();
281         synchronized (updateLock) {
282             updateKeys.addLast(ski);
283         }
284     }
285 
286     @Override
287     public Selector wakeup() {
288         synchronized (interruptLock) {
289             if (!interruptTriggered) {
290                 try {
291                     eventfd.set();
292                 } catch (IOException ioe) {
293                     throw new InternalError(ioe);
294                 }
295                 interruptTriggered = true;
296             }
297         }
298         return this;
299     }
300 
301     private void clearInterrupt() throws IOException {
302         synchronized (interruptLock) {
303             eventfd.reset();
304             interruptTriggered = false;
305         }
306     }
307 }