1 /*
  2  * Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package sun.nio.ch;
 26 
 27 import java.io.IOError;
 28 import java.io.IOException;
 29 import java.util.Map;
 30 import java.util.concurrent.ConcurrentHashMap;
 31 import java.util.stream.Stream;
 32 import jdk.internal.misc.InnocuousThread;
 33 import jdk.internal.access.JavaLangAccess;
 34 import jdk.internal.access.SharedSecrets;
 35 import jdk.internal.misc.VirtualThreads;
 36 
 37 /**
 38  * A Poller of file descriptors. A virtual thread registers the file descriptor
 39  * for a socket with a Poller before parking. The poller unparks the thread when
 40  * the socket is ready for I/O.
 41  */
 42 public abstract class Poller {
 43     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 44     private static final Poller READ_POLLER;
 45     private static final Poller WRITE_POLLER;
 46     static {
 47         PollerProvider provider = PollerProvider.provider();
 48         try {
 49             READ_POLLER = provider.readPoller();
 50             WRITE_POLLER = provider.writePoller();
 51             READ_POLLER.startPollerThread("Read-Poller");
 52             WRITE_POLLER.startPollerThread("Write-Poller");
 53         } catch (IOException ioe) {
 54             throw new IOError(ioe);
 55         }
 56     }
 57 
 58     /**
 59      * Start a platform thread with the given name to poll file descriptors
 60      * registered with this poller.
 61      */
 62     private void startPollerThread(String name) {
 63         Runnable task = () -> {
 64             try {
 65                 for (;;) {
 66                     poll();
 67                 }
 68             } catch (Exception e) {
 69                 e.printStackTrace();
 70             }
 71         };
 72         try {
 73             Thread thread = JLA.executeOnCarrierThread(() ->
 74                 InnocuousThread.newSystemThread(name, task)
 75             );
 76             thread.setDaemon(true);
 77             thread.start();
 78         } catch (Exception e) {
 79             throw new InternalError(e);
 80         }
 81     }
 82 
 83     /**
 84      * Registers the current thread to be unparked when a file descriptor is
 85      * ready for I/O.
 86      *
 87      * @throws IOException if the register fails
 88      * @throws IllegalArgumentException if the event is not POLLIN or POLLOUT
 89      * @throws IllegalStateException if another thread is already registered
 90      *         to be unparked when the file descriptor is ready for this event
 91      */
 92     static void register(int fdVal, int event) throws IOException {
 93         if (event == Net.POLLIN) {
 94             READ_POLLER.register(fdVal);
 95         } else if (event == Net.POLLOUT) {
 96             WRITE_POLLER.register(fdVal);
 97         } else {
 98             throw new IllegalArgumentException("Unknown event " + event);
 99         }
100     }
101 
102     /**
103      * Deregister the current thread so it will not be unparked when a file descriptor
104      * is ready for I/O.
105      *
106      * @throws IllegalArgumentException if the event is not POLLIN or POLLOUT
107      */
108     static void deregister(int fdVal, int event) {
109         if (event == Net.POLLIN) {
110             READ_POLLER.deregister(fdVal);
111         } else if (event == Net.POLLOUT) {
112             WRITE_POLLER.deregister(fdVal);
113         } else {
114             throw new IllegalArgumentException("Unknown event " + event);
115         }
116     }
117 
118     /**
119      * Stops polling the file descriptor for the given event and unpark any
120      * strand registered to be unparked when the file descriptor is ready for I/O.
121      */
122     static void stopPoll(int fdVal, int event) {
123         if (event == Net.POLLIN) {
124             READ_POLLER.wakeup(fdVal);
125         } else if (event == Net.POLLOUT) {
126             WRITE_POLLER.wakeup(fdVal);
127         } else {
128             throw new IllegalArgumentException();
129         }
130     }
131 
132     /**
133      * Stops polling the file descriptor and unpark any threads that are registered
134      * to be unparked when the file descriptor is ready for I/O.
135      */
136     static void stopPoll(int fdVal) {
137         stopPoll(fdVal, Net.POLLIN);
138         stopPoll(fdVal, Net.POLLOUT);
139     }
140 
141     private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
142 
143     protected Poller() { }
144 
145     private void register(int fdVal) throws IOException {
146         assert Thread.currentThread().isVirtual();
147         Thread previous = map.putIfAbsent(fdVal, Thread.currentThread());
148         assert previous == null;
149         implRegister(fdVal);
150     }
151 
152     private void deregister(int fdVal) {
153         Thread previous = map.remove(fdVal);
154         assert previous == null || previous == Thread.currentThread();
155     }
156 
157     private Stream<Thread> registeredThreads() {
158         return map.values().stream();
159     }
160 
161     private void wakeup(int fdVal) {
162         Thread t = map.remove(fdVal);
163         if (t != null) {
164             implDeregister(fdVal);
165             VirtualThreads.unpark(t);
166         }
167     }
168 
169     /**
170      * Called by the polling facility when the file descriptor is polled
171      */
172     final void polled(int fdVal) {
173         Thread t = map.remove(fdVal);
174         if (t != null) {
175             VirtualThreads.unpark(t);
176         }
177     }
178 
179     /**
180      * Poll for events. The {@link #polled(int)} method is invoked for each
181      * polled file descriptor.
182      *
183      * @param timeout if positive then block for up to {@code timeout} milliseconds,
184      *     if zero then don't block, if -1 then block indefinitely
185      */
186     abstract int poll(int timeout) throws IOException;
187 
188     /**
189      * Poll for events, blocks indefinitely.
190      */
191     final int poll() throws IOException {
192         return poll(-1);
193     }
194 
195     /**
196      * Poll for events, non-blocking.
197      */
198     final int pollNow() throws IOException {
199         return poll(0);
200     }
201 
202     /**
203      * Returns the poller's file descriptor, or -1 if none.
204      */
205     int fdVal() {
206         return -1;
207     }
208 
209     /**
210      * Register the file descriptor
211      */
212     abstract void implRegister(int fdVal) throws IOException;
213 
214     /**
215      * Deregister (or disarm) the file descriptor
216      */
217     abstract void implDeregister(int fdVal);
218 
219     /**
220      * Return a stream of all threads blocked waiting for I/O operations.
221      */
222     public static Stream<Thread> blockedThreads() {
223         return Stream.concat(READ_POLLER.registeredThreads(),
224                 WRITE_POLLER.registeredThreads());
225     }
226 }