< prev index next >

src/java.base/share/classes/sun/nio/ch/SelectorImpl.java

Print this page

 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.IOException;
 29 import java.nio.channels.ClosedSelectorException;
 30 import java.nio.channels.IllegalSelectorException;
 31 import java.nio.channels.SelectableChannel;
 32 import java.nio.channels.SelectionKey;
 33 import java.nio.channels.spi.AbstractSelectableChannel;
 34 import java.nio.channels.spi.AbstractSelector;
 35 import java.nio.channels.spi.SelectorProvider;
 36 import java.util.ArrayDeque;
 37 import java.util.Collections;
 38 import java.util.Deque;
 39 import java.util.HashSet;
 40 import java.util.Iterator;
 41 import java.util.Objects;
 42 import java.util.Set;

 43 import java.util.concurrent.ConcurrentHashMap;


 44 import java.util.function.Consumer;
 45 



 46 
 47 /**
 48  * Base Selector implementation class.
 49  */
 50 
 51 public abstract class SelectorImpl
 52     extends AbstractSelector
 53 {


 54     // The set of keys registered with this Selector
 55     private final Set<SelectionKey> keys;
 56 
 57     // The set of keys with data ready for an operation
 58     private final Set<SelectionKey> selectedKeys;
 59 
 60     // Public views of the key sets
 61     private final Set<SelectionKey> publicKeys;             // Immutable
 62     private final Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
 63 
 64     // pending cancelled keys for deregistration
 65     private final Deque<SelectionKeyImpl> cancelledKeys = new ArrayDeque<>();
 66 
 67     // used to check for reentrancy
 68     private boolean inSelect;
 69 



 70     protected SelectorImpl(SelectorProvider sp) {
            // Passes the provider to AbstractSelector, then builds the two key sets
            // and the views of them that are stored in the public* fields.
 71         super(sp);
            // Registered keys live in a concurrent set (ConcurrentHashMap-backed).
 72         keys = ConcurrentHashMap.newKeySet();
            // Selected keys live in a plain HashSet.
            // NOTE(review): presumably protected by external synchronization — confirm.
 73         selectedKeys = new HashSet<>();
            // Read-only view of the registered keys.
 74         publicKeys = Collections.unmodifiableSet(keys);
            // View of the selected keys; per the field comment it allows removal
            // but not addition.
 75         publicSelectedKeys = Util.ungrowableSet(selectedKeys);
 76     }
 77 
 78     private void ensureOpen() {
            // Guard: throws ClosedSelectorException when isOpen() reports false,
            // otherwise returns normally.
 79         if (!isOpen())
 80             throw new ClosedSelectorException();
 81     }
 82 
 83     @Override
 84     public final Set<SelectionKey> keys() {
            // Fails with ClosedSelectorException if the selector is not open.
 85         ensureOpen();
            // publicKeys is the unmodifiable view over the registered-key set
            // created in the constructor.
 86         return publicKeys;
 87     }
 88 
 89     @Override

156         throws IOException
157     {
158         Objects.requireNonNull(action);
159         if (timeout < 0)
160             throw new IllegalArgumentException("Negative timeout");
161         return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
162     }
163 
164     @Override
165     public final int select(Consumer<SelectionKey> action) throws IOException {
            // Reject a null action (NullPointerException) before doing any selection work.
166         Objects.requireNonNull(action);
            // Delegates with a timeout argument of -1. The timed overload maps a
            // timeout of 0 to -1 as well, so -1 presumably means "block without a
            // time limit" — confirm against lockAndDoSelect.
167         return lockAndDoSelect(action, -1);
168     }
169 
170     @Override
171     public final int selectNow(Consumer<SelectionKey> action) throws IOException {
            // Reject a null action (NullPointerException) before doing any selection work.
172         Objects.requireNonNull(action);
            // Delegates with a timeout argument of 0.
            // NOTE(review): 0 presumably requests a non-blocking selection — confirm
            // against lockAndDoSelect.
173         return lockAndDoSelect(action, 0);
174     }
175 
































































































176     /**
177      * Invoked by implCloseSelector to close the selector.
         *
         * <p> implCloseSelector calls this method while synchronized on this
         * selector, after it has invoked wakeup() and before it deregisters the
         * registered keys.
         *
         * @throws IOException declared so that implementations can report an
         *         I/O failure while closing
178      */
179     protected abstract void implClose() throws IOException;
180 
181     @Override
182     public final void implCloseSelector() throws IOException {
183         wakeup();
184         synchronized (this) {
185             implClose();
186             synchronized (publicSelectedKeys) {
187                 // Deregister channels
188                 Iterator<SelectionKey> i = keys.iterator();
189                 while (i.hasNext()) {
190                     SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
191                     deregister(ski);
192                     SelectableChannel selch = ski.channel();
193                     if (!selch.isOpen() && !selch.isRegistered())
194                         ((SelChImpl)selch).kill();
195                     selectedKeys.remove(ski);

 23  * questions.
 24  */
 25 
 26 package sun.nio.ch;
 27 
 28 import java.io.IOException;
 29 import java.nio.channels.ClosedSelectorException;
 30 import java.nio.channels.IllegalSelectorException;
 31 import java.nio.channels.SelectableChannel;
 32 import java.nio.channels.SelectionKey;
 33 import java.nio.channels.spi.AbstractSelectableChannel;
 34 import java.nio.channels.spi.AbstractSelector;
 35 import java.nio.channels.spi.SelectorProvider;
 36 import java.util.ArrayDeque;
 37 import java.util.Collections;
 38 import java.util.Deque;
 39 import java.util.HashSet;
 40 import java.util.Iterator;
 41 import java.util.Objects;
 42 import java.util.Set;
 43 import java.util.concurrent.Callable;
 44 import java.util.concurrent.ConcurrentHashMap;
 45 import java.util.concurrent.ForkJoinPool;
 46 import java.util.concurrent.ForkJoinWorkerThread;
 47 import java.util.function.Consumer;
 48 
 49 import jdk.internal.access.JavaLangAccess;
 50 import jdk.internal.access.SharedSecrets;
 51 import jdk.internal.misc.Unsafe;
 52 
 53 /**
 54  * Base Selector implementation class.
 55  */
 56 
 57 public abstract class SelectorImpl
 58     extends AbstractSelector
 59 {
 60     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 61 
 62     // The set of keys registered with this Selector
 63     private final Set<SelectionKey> keys;
 64 
 65     // The set of keys with data ready for an operation
 66     private final Set<SelectionKey> selectedKeys;
 67 
 68     // Public views of the key sets
 69     private final Set<SelectionKey> publicKeys;             // Immutable
 70     private final Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
 71 
 72     // pending cancelled keys for deregistration
 73     private final Deque<SelectionKeyImpl> cancelledKeys = new ArrayDeque<>();
 74 
 75     // used to check for reentrancy
 76     private boolean inSelect;
 77 
 78     // ManagedBlocker for use by virtual threads to do selection operations
 79     private ManagedSelect managedSelect;
 80 
 81     protected SelectorImpl(SelectorProvider sp) {
            // Passes the provider to AbstractSelector, then builds the two key sets
            // and the views of them that are stored in the public* fields.
            // The managedSelect field is not assigned here; managedSelect() creates
            // it on first use.
 82         super(sp);
            // Registered keys live in a concurrent set (ConcurrentHashMap-backed).
 83         keys = ConcurrentHashMap.newKeySet();
            // Selected keys live in a plain HashSet.
            // NOTE(review): presumably protected by external synchronization — confirm.
 84         selectedKeys = new HashSet<>();
            // Read-only view of the registered keys.
 85         publicKeys = Collections.unmodifiableSet(keys);
            // View of the selected keys; per the field comment it allows removal
            // but not addition.
 86         publicSelectedKeys = Util.ungrowableSet(selectedKeys);
 87     }
 88 
 89     private void ensureOpen() {
            // Guard: throws ClosedSelectorException when isOpen() reports false,
            // otherwise returns normally.
 90         if (!isOpen())
 91             throw new ClosedSelectorException();
 92     }
 93 
 94     @Override
 95     public final Set<SelectionKey> keys() {
            // Fails with ClosedSelectorException if the selector is not open.
 96         ensureOpen();
            // publicKeys is the unmodifiable view over the registered-key set
            // created in the constructor.
 97         return publicKeys;
 98     }
 99 
100     @Override

167         throws IOException
168     {
169         Objects.requireNonNull(action);
170         if (timeout < 0)
171             throw new IllegalArgumentException("Negative timeout");
172         return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
173     }
174 
175     @Override
176     public final int select(Consumer<SelectionKey> action) throws IOException {
            // Reject a null action (NullPointerException) before doing any selection work.
177         Objects.requireNonNull(action);
            // Delegates with a timeout argument of -1. The timed overload maps a
            // timeout of 0 to -1 as well, so -1 presumably means "block without a
            // time limit" — confirm against lockAndDoSelect.
178         return lockAndDoSelect(action, -1);
179     }
180 
181     @Override
182     public final int selectNow(Consumer<SelectionKey> action) throws IOException {
            // Reject a null action (NullPointerException) before doing any selection work.
183         Objects.requireNonNull(action);
            // Delegates with a timeout argument of 0. poll() only routes non-zero
            // timeouts through the ManagedBlocker path, so a 0 timeout is polled
            // directly via implPoll.
184         return lockAndDoSelect(action, 0);
185     }
186 
187     /**
188      * ManagedBlocker for use by virtual threads to do selection operations
189      * on the carrier thread.
         *
         * <p> As used by poll: prepare(timeout) records the timeout, the object is
         * handed as a Callable to JLA.executeOnCarrierThread, call() passes it to
         * ForkJoinPool.managedBlock, block() performs the implPoll, and result()
         * returns the value implPoll produced.
         *
         * <p> The fields are plain (non-volatile, unsynchronized).
         * NOTE(review): the visible caller, poll, asserts that the selector's
         * monitor is held — confirm that this is what serializes use of an instance.
190      */
191     private static class ManagedSelect
192             implements ForkJoinPool.ManagedBlocker, Callable<Void> {
            // Selector whose implPoll is invoked by block().
193         private final SelectorImpl selector;
            // Timeout handed to implPoll; set by prepare().
194         private long timeout;
            // Value returned by the most recent implPoll call; read via result().
195         private int numEntries;
            // True once block() has run (normally or exceptionally); cleared by call().
196         private boolean done;
197 
198         ManagedSelect(SelectorImpl selector) {
199             this.selector = selector;
200         }
201 
            // Stores the timeout to use for the next block().
202         void prepare(long timeout) {
203             this.timeout = timeout;
204         }
205 
            // Returns the value recorded by the last successful implPoll.
206         int result() {
207             return numEntries;
208         }
209 
            // Performs the poll. Always returns true, i.e. it never asks
            // managedBlock to invoke block() again.
210         @Override
211         public boolean block() {
212             try {
213                 numEntries = selector.implPoll(timeout);
214             } catch (IOException ioe) {
                    // block() does not declare IOException, so the checked
                    // exception is rethrown unchanged through Unsafe.
215                 Unsafe.getUnsafe().throwException(ioe);
216             } finally {
                    // Mark as finished on both the normal and the exceptional path
                    // so that isReleasable() reports true afterwards.
217                 done = true;
218             }
219             return true;
220         }
221 
            // Reports whether block() has already run since the last call().
222         @Override
223         public boolean isReleasable() {
224             return done;
225         }
226 
            // Callable entry point: runs this blocker through ForkJoinPool.managedBlock.
227         @Override
228         public Void call() {
                // Reset so that an instance reused for another poll blocks again.
229             done = false;
230             try {
231                 ForkJoinPool.managedBlock(this);
232             } catch (InterruptedException e) {
                    // block() above does not throw InterruptedException itself.
                    // NOTE(review): presumably treated as "cannot happen", hence
                    // InternalError — confirm.
233                 throw new InternalError(e);
234             }
235             return null;
236         }
237     }
238 
239     /**
240      * Returns the ManagedSelect object for this Selector if running on a virtual
241      * thread and its carrier is a ForkJoinWorkerThread, otherwise returns null.
         *
         * <p> The ManagedSelect is created on first use and cached in the
         * managedSelect field, so later calls return the same instance.
242      */
243     private ManagedSelect managedSelect() {
244         if (Thread.currentThread().isVirtual()
245                 && JLA.currentCarrierThread() instanceof ForkJoinWorkerThread) {
                // Lazy, unsynchronized initialization of the cached instance.
                // NOTE(review): the only visible caller, poll, asserts that the
                // selector's monitor is held — confirm there are no other callers.
246             if (managedSelect == null)
247                 managedSelect = new ManagedSelect(this);
248             return managedSelect;
249         } else {
250             return null;
251         }
252     }
253 
254     /**
255      * Invoked by doSelect to poll for file descriptors that are ready for I/O
256      * operations. If invoked on a virtual thread mounted on a ForkJoinWorkerThread
257      * then blocking polls are done in ForkJoinPool.ManagedBlocker.
         *
         * <p> A timeout of 0 is always passed straight to implPoll; only non-zero
         * timeouts are considered for the ManagedBlocker path.
         *
         * @param timeout the timeout value forwarded to implPoll
         * @return the value returned by implPoll
         * @throws IOException if implPoll throws it; exceptions raised on the
         *         ManagedBlocker path are rethrown unchanged
258      */
259     protected final int poll(long timeout) throws IOException {
            // The caller is expected to hold this selector's monitor.
260         assert Thread.holdsLock(this);
            // Local variable; it shadows the field of the same name.
261         ManagedSelect managedSelect;
262         if (timeout != 0 && (managedSelect = managedSelect()) != null) {
263             managedSelect.prepare(timeout);
264             try {
265                 JLA.executeOnCarrierThread(managedSelect);
266             } catch (Exception e) {
                    // Rethrow whatever was thrown, checked or not, without wrapping.
267                 Unsafe.getUnsafe().throwException(e);
268             }
269             return managedSelect.result();
270         } else {
                // Zero timeout, or no ManagedSelect available for the current
                // thread: poll directly.
271             return implPoll(timeout);
272         }
273     }
274 
275     /**
276      * Polls for file descriptors that are ready for I/O operations.
277      * This is the low-level/inner most poll, should be invoked by doSelect.
         *
         * <p> This base implementation does no polling: it unconditionally throws
         * RuntimeException("Not implemented").
         * NOTE(review): concrete selectors are presumably expected to override
         * this method — confirm against the subclasses.
         *
         * @param timeout the timeout value (unused by this base implementation)
         * @throws IOException declared for overriding implementations; never
         *         thrown by this base implementation
278      */
279     protected int implPoll(long timeout) throws IOException {
280         throw new RuntimeException("Not implemented");
281     }
282 
283     /**
284      * Invoked by implCloseSelector to close the selector.
         *
         * <p> implCloseSelector calls this method while synchronized on this
         * selector, after it has invoked wakeup() and before it deregisters the
         * registered keys.
         *
         * @throws IOException declared so that implementations can report an
         *         I/O failure while closing
285      */
286     protected abstract void implClose() throws IOException;
287 
288     @Override
289     public final void implCloseSelector() throws IOException {
290         wakeup();
291         synchronized (this) {
292             implClose();
293             synchronized (publicSelectedKeys) {
294                 // Deregister channels
295                 Iterator<SelectionKey> i = keys.iterator();
296                 while (i.hasNext()) {
297                     SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
298                     deregister(ski);
299                     SelectableChannel selch = ski.channel();
300                     if (!selch.isOpen() && !selch.isRegistered())
301                         ((SelChImpl)selch).kill();
302                     selectedKeys.remove(ski);
< prev index next >