1 /*
  2  * Copyright (c) 2020, 2022, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.FileDescriptor;
 29 import java.io.IOException;
 30 import java.nio.ByteBuffer;
 31 import java.nio.channels.ClosedSelectorException;
 32 import java.nio.channels.Pipe;
 33 import java.nio.channels.SelectionKey;
 34 import java.nio.channels.Selector;
 35 import java.nio.channels.spi.SelectorProvider;
 36 import java.util.ArrayDeque;
 37 import java.util.Deque;
 38 import java.util.HashMap;
 39 import java.util.Map;
 40 import java.util.function.Consumer;
 41 import jdk.internal.misc.Blocker;
 42 
 43 import static sun.nio.ch.WEPoll.*;
 44 
 45 /**
 46  * Windows wepoll based Selector implementation
 47  */
 48 class WEPollSelectorImpl extends SelectorImpl {
 49     // maximum number of events to poll in one call to epoll_wait
 50     private static final int NUM_EPOLLEVENTS = 256;
 51 
 52     // wepoll handle
 53     private final long eph;
 54 
 55     // address of epoll_event array when polling with epoll_wait
 56     private final long pollArrayAddress;
 57 
 58     // maps SOCKET to selection key, synchronize on selector
 59     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
 60 
 61     // pending new registrations/updates, queued by setEventOps
 62     private final Object updateLock = new Object();
 63     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
 64 
 65     // interrupt/wakeup
 66     private final Object interruptLock = new Object();
 67     private boolean interruptTriggered;
 68     private final PipeImpl pipe;
 69     private final int fd0Val, fd1Val;
 70 
 71     WEPollSelectorImpl(SelectorProvider sp) throws IOException {
 72         super(sp);
 73 
 74         this.eph = WEPoll.create();
 75         this.pollArrayAddress = WEPoll.allocatePollArray(NUM_EPOLLEVENTS);
 76 
 77         // wakeup support
 78         try {
 79             this.pipe = new PipeImpl(sp, /* AF_UNIX */ true, /*buffering*/ false);
 80         } catch (IOException ioe) {
 81             WEPoll.freePollArray(pollArrayAddress);
 82             WEPoll.close(eph);
 83             throw ioe;
 84         }
 85         this.fd0Val = pipe.source().getFDVal();
 86         this.fd1Val = pipe.sink().getFDVal();
 87 
 88         // register one end of the pipe for wakeups
 89         WEPoll.ctl(eph, EPOLL_CTL_ADD, fd0Val, WEPoll.EPOLLIN);
 90     }
 91 
 92     private void ensureOpen() {
 93         if (!isOpen())
 94             throw new ClosedSelectorException();
 95     }
 96 
 97     @Override
 98     protected int doSelect(Consumer<SelectionKey> action, long timeout)
 99         throws IOException
100     {
101         assert Thread.holdsLock(this);
102 
103         // epoll_wait timeout is int
104         int to = (int) Math.min(timeout, Integer.MAX_VALUE);
105         boolean blocking = (to != 0);
106 
107         int numEntries;
108         processUpdateQueue();
109         processDeregisterQueue();
110         try {
111             begin(blocking);
112             long comp = Blocker.begin(blocking);
113             try {
114                 numEntries = WEPoll.wait(eph, pollArrayAddress, NUM_EPOLLEVENTS, to);
115             } finally {
116                 Blocker.end(comp);
117             }
118         } finally {
119             end(blocking);
120         }
121         processDeregisterQueue();
122         return processEvents(numEntries, action);
123     }
124 
125     /**
126      * Process changes to the interest ops.
127      */
128     private void processUpdateQueue() {
129         assert Thread.holdsLock(this);
130 
131         synchronized (updateLock) {
132             SelectionKeyImpl ski;
133             while ((ski = updateKeys.pollFirst()) != null) {
134                 if (ski.isValid()) {
135                     int fd = ski.getFDVal();
136                     // add to fdToKey if needed
137                     SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
138                     assert (previous == null) || (previous == ski);
139                     int newOps = ski.translateInterestOps();
140                     int registeredOps = ski.registeredEvents();
141                     if (newOps != registeredOps) {
142                         if (newOps == 0) {
143                             // remove from epoll
144                             WEPoll.ctl(eph, EPOLL_CTL_DEL, fd, 0);
145                         } else {
146                             int events = toEPollEvents(newOps);
147                             if (registeredOps == 0) {
148                                 // add to epoll
149                                 WEPoll.ctl(eph, EPOLL_CTL_ADD, fd, events);
150                             } else {
151                                 // modify events
152                                 WEPoll.ctl(eph, EPOLL_CTL_MOD, fd, events);
153                             }
154                         }
155                         ski.registeredEvents(newOps);
156                     }
157                 }
158             }
159         }
160     }
161 
162     /**
163      * Process the polled events.
164      * If the interrupt fd has been selected, drain it and clear the interrupt.
165      */
166     private int processEvents(int numEntries, Consumer<SelectionKey> action)
167         throws IOException
168     {
169         assert Thread.holdsLock(this);
170 
171         boolean interrupted = false;
172         int numKeysUpdated = 0;
173         for (int i = 0; i < numEntries; i++) {
174             long event = WEPoll.getEvent(pollArrayAddress, i);
175             int fd = WEPoll.getDescriptor(event);
176             if (fd == fd0Val) {
177                 interrupted = true;
178             } else {
179                 SelectionKeyImpl ski = fdToKey.get(fd);
180                 if (ski != null) {
181                     int events = WEPoll.getEvents(event);
182                     if ((events & WEPoll.EPOLLPRI) != 0) {
183                         Net.discardOOB(ski.getFD());
184                     }
185                     int rOps = toReadyOps(events);
186                     numKeysUpdated += processReadyEvents(rOps, ski, action);
187                 }
188             }
189         }
190 
191         if (interrupted) {
192             clearInterrupt();
193         }
194 
195         return numKeysUpdated;
196     }
197 
198     @Override
199     protected void implClose() throws IOException {
200         assert !isOpen() && Thread.holdsLock(this);
201 
202         // prevent further wakeup
203         synchronized (interruptLock) {
204             interruptTriggered = true;
205         }
206 
207         // close the epoll port and free resources
208         WEPoll.close(eph);
209         WEPoll.freePollArray(pollArrayAddress);
210         pipe.sink().close();
211         pipe.source().close();
212     }
213 
214     @Override
215     protected void implDereg(SelectionKeyImpl ski) throws IOException {
216         assert !ski.isValid() && Thread.holdsLock(this);
217 
218         int fd = ski.getFDVal();
219         if (fdToKey.remove(fd) != null) {
220             if (ski.registeredEvents() != 0) {
221                 WEPoll.ctl(eph, EPOLL_CTL_DEL, fd, 0);
222                 ski.registeredEvents(0);
223             }
224         } else {
225             assert ski.registeredEvents() == 0;
226         }
227     }
228 
229     @Override
230     public void setEventOps(SelectionKeyImpl ski) {
231         ensureOpen();
232         synchronized (updateLock) {
233             updateKeys.addLast(ski);
234         }
235     }
236 
237     @Override
238     public Selector wakeup() {
239         synchronized (interruptLock) {
240             if (!interruptTriggered) {
241                 try {
242                     IOUtil.write1(fd1Val, (byte) 0);
243                 } catch (IOException ioe) {
244                     throw new InternalError(ioe);
245                 }
246                 interruptTriggered = true;
247             }
248         }
249         return this;
250     }
251 
252     private void clearInterrupt() throws IOException {
253         synchronized (interruptLock) {
254             IOUtil.drain(fd0Val);
255             interruptTriggered = false;
256         }
257     }
258 
259     /**
260      * Maps interest ops to epoll events
261      */
262     private static int toEPollEvents(int ops) {
263         int events = EPOLLPRI;
264         if ((ops & Net.POLLIN) != 0)
265             events |= EPOLLIN;
266         if ((ops & (Net.POLLOUT | Net.POLLCONN)) != 0)
267             events |= EPOLLOUT;
268         return events;
269     }
270 
271     /**
272      * Map epoll events to ready ops
273      */
274     private static int toReadyOps(int events) {
275         int ops = 0;
276         if ((events & WEPoll.EPOLLIN) != 0) ops |= Net.POLLIN;
277         if ((events & WEPoll.EPOLLOUT) != 0) ops |= (Net.POLLOUT | Net.POLLCONN);
278         if ((events & WEPoll.EPOLLHUP) != 0) ops |= Net.POLLHUP;
279         if ((events & WEPoll.EPOLLERR) != 0) ops |= Net.POLLERR;
280         return ops;
281     }
282 }