1 /*
  2  * Copyright (c) 2020, 2021, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.FileDescriptor;
 29 import java.io.IOException;
 30 import java.nio.ByteBuffer;
 31 import java.nio.channels.ClosedSelectorException;
 32 import java.nio.channels.Pipe;
 33 import java.nio.channels.SelectionKey;
 34 import java.nio.channels.Selector;
 35 import java.nio.channels.spi.SelectorProvider;
 36 import java.util.ArrayDeque;
 37 import java.util.Deque;
 38 import java.util.HashMap;
 39 import java.util.Map;
 40 import java.util.function.Consumer;
 41 
 42 import static sun.nio.ch.WEPoll.*;
 43 
 44 /**
 45  * Windows wepoll based Selector implementation
 46  */
 47 class WEPollSelectorImpl extends SelectorImpl {
 48     // maximum number of events to poll in one call to epoll_wait
 49     private static final int NUM_EPOLLEVENTS = 256;
 50 
 51     // wepoll handle
 52     private final long eph;
 53 
 54     // address of epoll_event array when polling with epoll_wait
 55     private final long pollArrayAddress;
 56 
 57     // maps SOCKET to selection key, synchronize on selector
 58     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
 59 
 60     // pending new registrations/updates, queued by setEventOps
 61     private final Object updateLock = new Object();
 62     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
 63 
 64     // interrupt/wakeup
 65     private final Object interruptLock = new Object();
 66     private boolean interruptTriggered;
 67     private final PipeImpl pipe;
 68     private final int fd0Val, fd1Val;
 69 
 70     WEPollSelectorImpl(SelectorProvider sp) throws IOException {
 71         super(sp);
 72 
 73         this.eph = WEPoll.create();
 74         this.pollArrayAddress = WEPoll.allocatePollArray(NUM_EPOLLEVENTS);
 75 
 76         // wakeup support
 77         try {
 78             this.pipe = new PipeImpl(sp, /*buffering*/ false);
 79         } catch (IOException ioe) {
 80             WEPoll.freePollArray(pollArrayAddress);
 81             WEPoll.close(eph);
 82             throw ioe;
 83         }
 84         this.fd0Val = pipe.source().getFDVal();
 85         this.fd1Val = pipe.sink().getFDVal();
 86 
 87         // register one end of the pipe for wakeups
 88         WEPoll.ctl(eph, EPOLL_CTL_ADD, fd0Val, WEPoll.EPOLLIN);
 89     }
 90 
 91     private void ensureOpen() {
 92         if (!isOpen())
 93             throw new ClosedSelectorException();
 94     }
 95 
 96     @Override
 97     protected int doSelect(Consumer<SelectionKey> action, long timeout)
 98         throws IOException
 99     {
100         assert Thread.holdsLock(this);
101 
102         // epoll_wait timeout is int
103         int to = (int) Math.min(timeout, Integer.MAX_VALUE);
104         boolean blocking = (to != 0);
105 
106         int numEntries;
107         processUpdateQueue();
108         processDeregisterQueue();
109         try {
110             begin(blocking);
111             numEntries = WEPoll.wait(eph, pollArrayAddress, NUM_EPOLLEVENTS, to);
112         } finally {
113             end(blocking);
114         }
115         processDeregisterQueue();
116         return processEvents(numEntries, action);
117     }
118 
119     /**
120      * Process changes to the interest ops.
121      */
122     private void processUpdateQueue() {
123         assert Thread.holdsLock(this);
124 
125         synchronized (updateLock) {
126             SelectionKeyImpl ski;
127             while ((ski = updateKeys.pollFirst()) != null) {
128                 if (ski.isValid()) {
129                     int fd = ski.getFDVal();
130                     // add to fdToKey if needed
131                     SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
132                     assert (previous == null) || (previous == ski);
133                     int newOps = ski.translateInterestOps();
134                     int registeredOps = ski.registeredEvents();
135                     if (newOps != registeredOps) {
136                         if (newOps == 0) {
137                             // remove from epoll
138                             WEPoll.ctl(eph, EPOLL_CTL_DEL, fd, 0);
139                         } else {
140                             int events = toEPollEvents(newOps);
141                             if (registeredOps == 0) {
142                                 // add to epoll
143                                 WEPoll.ctl(eph, EPOLL_CTL_ADD, fd, events);
144                             } else {
145                                 // modify events
146                                 WEPoll.ctl(eph, EPOLL_CTL_MOD, fd, events);
147                             }
148                         }
149                         ski.registeredEvents(newOps);
150                     }
151                 }
152             }
153         }
154     }
155 
156     /**
157      * Process the polled events.
158      * If the interrupt fd has been selected, drain it and clear the interrupt.
159      */
160     private int processEvents(int numEntries, Consumer<SelectionKey> action)
161         throws IOException
162     {
163         assert Thread.holdsLock(this);
164 
165         boolean interrupted = false;
166         int numKeysUpdated = 0;
167         for (int i = 0; i < numEntries; i++) {
168             long event = WEPoll.getEvent(pollArrayAddress, i);
169             int fd = WEPoll.getDescriptor(event);
170             if (fd == fd0Val) {
171                 interrupted = true;
172             } else {
173                 SelectionKeyImpl ski = fdToKey.get(fd);
174                 if (ski != null) {
175                     int events = WEPoll.getEvents(event);
176                     if ((events & WEPoll.EPOLLPRI) != 0) {
177                         Net.discardOOB(ski.getFD());
178                     }
179                     int rOps = toReadyOps(events);
180                     numKeysUpdated += processReadyEvents(rOps, ski, action);
181                 }
182             }
183         }
184 
185         if (interrupted) {
186             clearInterrupt();
187         }
188 
189         return numKeysUpdated;
190     }
191 
192     @Override
193     protected void implClose() throws IOException {
194         assert !isOpen() && Thread.holdsLock(this);
195 
196         // prevent further wakeup
197         synchronized (interruptLock) {
198             interruptTriggered = true;
199         }
200 
201         // close the epoll port and free resources
202         WEPoll.close(eph);
203         WEPoll.freePollArray(pollArrayAddress);
204         pipe.sink().close();
205         pipe.source().close();
206     }
207 
208     @Override
209     protected void implDereg(SelectionKeyImpl ski) throws IOException {
210         assert !ski.isValid() && Thread.holdsLock(this);
211 
212         int fd = ski.getFDVal();
213         if (fdToKey.remove(fd) != null) {
214             if (ski.registeredEvents() != 0) {
215                 WEPoll.ctl(eph, EPOLL_CTL_DEL, fd, 0);
216                 ski.registeredEvents(0);
217             }
218         } else {
219             assert ski.registeredEvents() == 0;
220         }
221     }
222 
223     @Override
224     public void setEventOps(SelectionKeyImpl ski) {
225         ensureOpen();
226         synchronized (updateLock) {
227             updateKeys.addLast(ski);
228         }
229     }
230 
231     @Override
232     public Selector wakeup() {
233         synchronized (interruptLock) {
234             if (!interruptTriggered) {
235                 try {
236                     IOUtil.write1(fd1Val, (byte) 0);
237                 } catch (IOException ioe) {
238                     throw new InternalError(ioe);
239                 }
240                 interruptTriggered = true;
241             }
242         }
243         return this;
244     }
245 
246     private void clearInterrupt() throws IOException {
247         synchronized (interruptLock) {
248             IOUtil.drain(fd0Val);
249             interruptTriggered = false;
250         }
251     }
252 
253     /**
254      * Maps interest ops to epoll events
255      */
256     private static int toEPollEvents(int ops) {
257         int events = EPOLLPRI;
258         if ((ops & Net.POLLIN) != 0)
259             events |= EPOLLIN;
260         if ((ops & (Net.POLLOUT | Net.POLLCONN)) != 0)
261             events |= EPOLLOUT;
262         return events;
263     }
264 
265     /**
266      * Map epoll events to ready ops
267      */
268     private static int toReadyOps(int events) {
269         int ops = 0;
270         if ((events & WEPoll.EPOLLIN) != 0) ops |= Net.POLLIN;
271         if ((events & WEPoll.EPOLLOUT) != 0) ops |= (Net.POLLOUT | Net.POLLCONN);
272         if ((events & WEPoll.EPOLLHUP) != 0) ops |= Net.POLLHUP;
273         if ((events & WEPoll.EPOLLERR) != 0) ops |= Net.POLLERR;
274         return ops;
275     }
276 }