1 /*
  2  * Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.IOException;
 29 import java.nio.channels.ClosedSelectorException;
 30 import java.nio.channels.IllegalSelectorException;
 31 import java.nio.channels.SelectableChannel;
 32 import java.nio.channels.SelectionKey;
 33 import java.nio.channels.spi.AbstractSelectableChannel;
 34 import java.nio.channels.spi.AbstractSelector;
 35 import java.nio.channels.spi.SelectorProvider;
 36 import java.util.ArrayDeque;
 37 import java.util.Collections;
 38 import java.util.Deque;
 39 import java.util.HashSet;
 40 import java.util.Iterator;
 41 import java.util.Objects;
 42 import java.util.Set;
 43 import java.util.concurrent.Callable;
 44 import java.util.concurrent.ConcurrentHashMap;
 45 import java.util.concurrent.ForkJoinPool;
 46 import java.util.concurrent.ForkJoinWorkerThread;
 47 import java.util.function.Consumer;
 48 
 49 import jdk.internal.access.JavaLangAccess;
 50 import jdk.internal.access.SharedSecrets;
 51 import jdk.internal.misc.Unsafe;
 52 
 53 /**
 54  * Base Selector implementation class.
 55  */
 56 
 57 public abstract class SelectorImpl
 58     extends AbstractSelector
 59 {
 60     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 61 
 62     // The set of keys registered with this Selector
 63     private final Set<SelectionKey> keys;
 64 
 65     // The set of keys with data ready for an operation
 66     private final Set<SelectionKey> selectedKeys;
 67 
 68     // Public views of the key sets
 69     private final Set<SelectionKey> publicKeys;             // Immutable
 70     private final Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
 71 
 72     // pending cancelled keys for deregistration
 73     private final Deque<SelectionKeyImpl> cancelledKeys = new ArrayDeque<>();
 74 
 75     // used to check for reentrancy
 76     private boolean inSelect;
 77 
 78     // ManagedBlocker for use by virtual threads to do selection operations
 79     private ManagedSelect managedSelect;
 80 
 81     protected SelectorImpl(SelectorProvider sp) {
 82         super(sp);
 83         keys = ConcurrentHashMap.newKeySet();
 84         selectedKeys = new HashSet<>();
 85         publicKeys = Collections.unmodifiableSet(keys);
 86         publicSelectedKeys = Util.ungrowableSet(selectedKeys);
 87     }
 88 
 89     private void ensureOpen() {
 90         if (!isOpen())
 91             throw new ClosedSelectorException();
 92     }
 93 
 94     @Override
 95     public final Set<SelectionKey> keys() {
 96         ensureOpen();
 97         return publicKeys;
 98     }
 99 
100     @Override
101     public final Set<SelectionKey> selectedKeys() {
102         ensureOpen();
103         return publicSelectedKeys;
104     }
105 
106     /**
107      * Marks the beginning of a select operation that might block
108      */
109     protected final void begin(boolean blocking) {
110         if (blocking) begin();
111     }
112 
113     /**
114      * Marks the end of a select operation that may have blocked
115      */
116     protected final void end(boolean blocking) {
117         if (blocking) end();
118     }
119 
120     /**
121      * Selects the keys for channels that are ready for I/O operations.
122      *
123      * @param action  the action to perform, can be null
124      * @param timeout timeout in milliseconds to wait, 0 to not wait, -1 to
125      *                wait indefinitely
126      */
127     protected abstract int doSelect(Consumer<SelectionKey> action, long timeout)
128         throws IOException;
129 
130     private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
131         throws IOException
132     {
133         synchronized (this) {
134             ensureOpen();
135             if (inSelect)
136                 throw new IllegalStateException("select in progress");
137             inSelect = true;
138             try {
139                 synchronized (publicSelectedKeys) {
140                     return doSelect(action, timeout);
141                 }
142             } finally {
143                 inSelect = false;
144             }
145         }
146     }
147 
148     @Override
149     public final int select(long timeout) throws IOException {
150         if (timeout < 0)
151             throw new IllegalArgumentException("Negative timeout");
152         return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
153     }
154 
155     @Override
156     public final int select() throws IOException {
157         return lockAndDoSelect(null, -1);
158     }
159 
160     @Override
161     public final int selectNow() throws IOException {
162         return lockAndDoSelect(null, 0);
163     }
164 
165     @Override
166     public final int select(Consumer<SelectionKey> action, long timeout)
167         throws IOException
168     {
169         Objects.requireNonNull(action);
170         if (timeout < 0)
171             throw new IllegalArgumentException("Negative timeout");
172         return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
173     }
174 
175     @Override
176     public final int select(Consumer<SelectionKey> action) throws IOException {
177         Objects.requireNonNull(action);
178         return lockAndDoSelect(action, -1);
179     }
180 
181     @Override
182     public final int selectNow(Consumer<SelectionKey> action) throws IOException {
183         Objects.requireNonNull(action);
184         return lockAndDoSelect(action, 0);
185     }
186 
187     /**
188      * ManagedBlocker for use by virtual threads to do selection operations
189      * on the carrier thread.
190      */
191     private static class ManagedSelect
192             implements ForkJoinPool.ManagedBlocker, Callable<Void> {
193         private final SelectorImpl selector;
194         private long timeout;
195         private int numEntries;
196         private boolean done;
197 
198         ManagedSelect(SelectorImpl selector) {
199             this.selector = selector;
200         }
201 
202         void prepare(long timeout) {
203             this.timeout = timeout;
204         }
205 
206         int result() {
207             return numEntries;
208         }
209 
210         @Override
211         public boolean block() {
212             try {
213                 numEntries = selector.implPoll(timeout);
214             } catch (IOException ioe) {
215                 Unsafe.getUnsafe().throwException(ioe);
216             } finally {
217                 done = true;
218             }
219             return true;
220         }
221 
222         @Override
223         public boolean isReleasable() {
224             return done;
225         }
226 
227         @Override
228         public Void call() {
229             done = false;
230             try {
231                 ForkJoinPool.managedBlock(this);
232             } catch (InterruptedException e) {
233                 throw new InternalError(e);
234             }
235             return null;
236         }
237     }
238 
239     /**
240      * Returns the ManagedSelect object for this Selector if running on a virtual
241      * thread and its carrier is a ForkJoinWorkerThread, otherwise returns null.
242      */
243     private ManagedSelect managedSelect() {
244         if (Thread.currentThread().isVirtual()
245                 && JLA.currentCarrierThread() instanceof ForkJoinWorkerThread) {
246             if (managedSelect == null)
247                 managedSelect = new ManagedSelect(this);
248             return managedSelect;
249         } else {
250             return null;
251         }
252     }
253 
254     /**
255      * Invoked by doSelect to poll for file descriptors that are ready for I/O
256      * operations. If invoked on a virtual thread mounted on a ForkJoinWorkerThread
257      * then blocking polls are done in ForkJoinPool.ManagedBlocker.
258      */
259     protected final int poll(long timeout) throws IOException {
260         assert Thread.holdsLock(this);
261         ManagedSelect managedSelect;
262         if (timeout != 0 && (managedSelect = managedSelect()) != null) {
263             managedSelect.prepare(timeout);
264             try {
265                 JLA.executeOnCarrierThread(managedSelect);
266             } catch (Exception e) {
267                 Unsafe.getUnsafe().throwException(e);
268             }
269             return managedSelect.result();
270         } else {
271             return implPoll(timeout);
272         }
273     }
274 
275     /**
276      * Polls for file descriptors that are ready for I/O operations.
277      * This is the low-level/inner most poll, should be invoked by doSelect.
278      */
279     protected int implPoll(long timeout) throws IOException {
280         throw new RuntimeException("Not implemented");
281     }
282 
283     /**
284      * Invoked by implCloseSelector to close the selector.
285      */
286     protected abstract void implClose() throws IOException;
287 
288     @Override
289     public final void implCloseSelector() throws IOException {
290         wakeup();
291         synchronized (this) {
292             implClose();
293             synchronized (publicSelectedKeys) {
294                 // Deregister channels
295                 Iterator<SelectionKey> i = keys.iterator();
296                 while (i.hasNext()) {
297                     SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
298                     deregister(ski);
299                     SelectableChannel selch = ski.channel();
300                     if (!selch.isOpen() && !selch.isRegistered())
301                         ((SelChImpl)selch).kill();
302                     selectedKeys.remove(ski);
303                     i.remove();
304                 }
305                 assert selectedKeys.isEmpty() && keys.isEmpty();
306             }
307         }
308     }
309 
310     @Override
311     protected final SelectionKey register(AbstractSelectableChannel ch,
312                                           int ops,
313                                           Object attachment)
314     {
315         if (!(ch instanceof SelChImpl))
316             throw new IllegalSelectorException();
317         SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
318         if (attachment != null)
319             k.attach(attachment);
320 
321         // register (if needed) before adding to key set
322         implRegister(k);
323 
324         // add to the selector's key set, removing it immediately if the selector
325         // is closed. The key is not in the channel's key set at this point but
326         // it may be observed by a thread iterating over the selector's key set.
327         keys.add(k);
328         try {
329             k.interestOps(ops);
330         } catch (ClosedSelectorException e) {
331             assert ch.keyFor(this) == null;
332             keys.remove(k);
333             k.cancel();
334             throw e;
335         }
336         return k;
337     }
338 
339     /**
340      * Register the key in the selector.
341      *
342      * The default implementation checks if the selector is open. It should
343      * be overridden by selector implementations as needed.
344      */
345     protected void implRegister(SelectionKeyImpl ski) {
346         ensureOpen();
347     }
348 
349     /**
350      * Removes the key from the selector
351      */
352     protected abstract void implDereg(SelectionKeyImpl ski) throws IOException;
353 
354     /**
355      * Queue a cancelled key for the next selection operation
356      */
357     public void cancel(SelectionKeyImpl ski) {
358         synchronized (cancelledKeys) {
359             cancelledKeys.addLast(ski);
360         }
361     }
362 
363     /**
364      * Invoked by selection operations to process the cancelled keys
365      */
366     protected final void processDeregisterQueue() throws IOException {
367         assert Thread.holdsLock(this);
368         assert Thread.holdsLock(publicSelectedKeys);
369 
370         synchronized (cancelledKeys) {
371             SelectionKeyImpl ski;
372             while ((ski = cancelledKeys.pollFirst()) != null) {
373                 // remove the key from the selector
374                 implDereg(ski);
375 
376                 selectedKeys.remove(ski);
377                 keys.remove(ski);
378 
379                 // remove from channel's key set
380                 deregister(ski);
381 
382                 SelectableChannel ch = ski.channel();
383                 if (!ch.isOpen() && !ch.isRegistered())
384                     ((SelChImpl)ch).kill();
385             }
386         }
387     }
388 
389     /**
390      * Invoked by selection operations to handle ready events. If an action
391      * is specified then it is invoked to handle the key, otherwise the key
392      * is added to the selected-key set (or updated when it is already in the
393      * set).
394      */
395     protected final int processReadyEvents(int rOps,
396                                            SelectionKeyImpl ski,
397                                            Consumer<SelectionKey> action) {
398         if (action != null) {
399             ski.translateAndSetReadyOps(rOps);
400             if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
401                 action.accept(ski);
402                 ensureOpen();
403                 return 1;
404             }
405         } else {
406             assert Thread.holdsLock(publicSelectedKeys);
407             if (selectedKeys.contains(ski)) {
408                 if (ski.translateAndUpdateReadyOps(rOps)) {
409                     return 1;
410                 }
411             } else {
412                 ski.translateAndSetReadyOps(rOps);
413                 if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
414                     selectedKeys.add(ski);
415                     return 1;
416                 }
417             }
418         }
419         return 0;
420     }
421 
422     /**
423      * Invoked by interestOps to ensure the interest ops are updated at the
424      * next selection operation.
425      */
426     protected abstract void setEventOps(SelectionKeyImpl ski);
427 }
--- EOF ---