18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.IOException;
29 import java.nio.channels.SelectionKey;
30 import java.nio.channels.Selector;
31 import java.nio.channels.spi.SelectorProvider;
32 import java.util.ArrayDeque;
33 import java.util.Deque;
34 import java.util.HashMap;
35 import java.util.Map;
36 import java.util.concurrent.TimeUnit;
37 import java.util.function.Consumer;
38 import jdk.internal.misc.Blocker;
39
40 import static sun.nio.ch.KQueue.EVFILT_READ;
41 import static sun.nio.ch.KQueue.EVFILT_WRITE;
42 import static sun.nio.ch.KQueue.EV_ADD;
43 import static sun.nio.ch.KQueue.EV_DELETE;
44
45 /**
46 * KQueue based Selector implementation for macOS
47 */
48
49 class KQueueSelectorImpl extends SelectorImpl {
50
51 // maximum number of events to poll in one call to kqueue
52 private static final int MAX_KEVENTS = 256;
53
54 // kqueue file descriptor
55 private final int kqfd;
56
57 // address of poll array (event list) when polling for pending events
58 private final long pollArrayAddress;
92 throw ioe;
93 }
94
95 // register one end of the socket pair for wakeups
96 KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
97 }
98
99 @Override
100 protected int doSelect(Consumer<SelectionKey> action, long timeout)
101 throws IOException
102 {
103 assert Thread.holdsLock(this);
104
105 long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout
106 boolean blocking = (to != 0);
107 boolean timedPoll = (to > 0);
108
109 int numEntries;
110 processUpdateQueue();
111 processDeregisterQueue();
112 try {
113 begin(blocking);
114
115 do {
116 long startTime = timedPoll ? System.nanoTime() : 0;
117 boolean attempted = Blocker.begin(blocking);
118 try {
119 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
120 } finally {
121 Blocker.end(attempted);
122 }
123 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
124 // timed poll interrupted so need to adjust timeout
125 long adjust = System.nanoTime() - startTime;
126 to -= TimeUnit.NANOSECONDS.toMillis(adjust);
127 if (to <= 0) {
128 // timeout expired so no retry
129 numEntries = 0;
130 }
131 }
132 } while (numEntries == IOStatus.INTERRUPTED);
133 assert IOStatus.check(numEntries);
134
135 } finally {
136 end(blocking);
137 }
138 processDeregisterQueue();
139 return processEvents(numEntries, action);
140 }
141
142 /**
143 * Process changes to the interest ops.
144 */
145 private void processUpdateQueue() {
146 assert Thread.holdsLock(this);
147
148 synchronized (updateLock) {
149 SelectionKeyImpl ski;
150 while ((ski = updateKeys.pollFirst()) != null) {
151 if (ski.isValid()) {
152 int fd = ski.getFDVal();
153 // add to fdToKey if needed
154 SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
155 assert (previous == null) || (previous == ski);
156
157 int newEvents = ski.translateInterestOps();
158 int registeredEvents = ski.registeredEvents();
159
160 // DatagramChannelImpl::disconnect has reset socket
161 if (ski.getAndClearReset() && registeredEvents != 0) {
|
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.IOException;
29 import java.nio.channels.SelectionKey;
30 import java.nio.channels.Selector;
31 import java.nio.channels.spi.SelectorProvider;
32 import java.util.ArrayDeque;
33 import java.util.Deque;
34 import java.util.HashMap;
35 import java.util.Map;
36 import java.util.concurrent.TimeUnit;
37 import java.util.function.Consumer;
38
39 import static sun.nio.ch.KQueue.EVFILT_READ;
40 import static sun.nio.ch.KQueue.EVFILT_WRITE;
41 import static sun.nio.ch.KQueue.EV_ADD;
42 import static sun.nio.ch.KQueue.EV_DELETE;
43
44 /**
45 * KQueue based Selector implementation for macOS
46 */
47
48 class KQueueSelectorImpl extends SelectorImpl {
49
50 // maximum number of events to poll in one call to kqueue
51 private static final int MAX_KEVENTS = 256;
52
53 // kqueue file descriptor
54 private final int kqfd;
55
56 // address of poll array (event list) when polling for pending events
57 private final long pollArrayAddress;
91 throw ioe;
92 }
93
94 // register one end of the socket pair for wakeups
95 KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
96 }
97
98 @Override
99 protected int doSelect(Consumer<SelectionKey> action, long timeout)
100 throws IOException
101 {
102 assert Thread.holdsLock(this);
103
104 long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout
105 boolean blocking = (to != 0);
106 boolean timedPoll = (to > 0);
107
108 int numEntries;
109 processUpdateQueue();
110 processDeregisterQueue();
111
112 if (Thread.currentThread().isVirtual()) {
113 numEntries = (timedPoll)
114 ? timedPoll(TimeUnit.MILLISECONDS.toNanos(to))
115 : untimedPoll(blocking);
116 } else {
117 try {
118 begin(blocking);
119 do {
120 long startTime = timedPoll ? System.nanoTime() : 0;
121 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
122 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
123 // timed poll interrupted so need to adjust timeout
124 long adjust = System.nanoTime() - startTime;
125 to -= TimeUnit.NANOSECONDS.toMillis(adjust);
126 if (to <= 0) {
127 // timeout expired so no retry
128 numEntries = 0;
129 }
130 }
131 } while (numEntries == IOStatus.INTERRUPTED);
132 } finally {
133 end(blocking);
134 }
135 }
136 assert IOStatus.check(numEntries);
137
138 processDeregisterQueue();
139 return processEvents(numEntries, action);
140 }
141
142 /**
143 * If blocking, parks the current virtual thread until a file descriptor is polled
144 * or the thread is interrupted.
145 */
146 private int untimedPoll(boolean block) throws IOException {
147 int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
148 if (block) {
149 while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
150 Poller.pollSelector(kqfd, 0);
151 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
152 }
153 }
154 return numEntries;
155 }
156
157 /**
158 * Parks the current virtual thread until a file descriptor is polled, or the thread
159 * is interrupted, for up to the specified waiting time.
160 */
161 private int timedPoll(long nanos) throws IOException {
162 long startNanos = System.nanoTime();
163 int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
164 while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
165 long remainingNanos = nanos - (System.nanoTime() - startNanos);
166 if (remainingNanos <= 0) {
167 // timeout
168 break;
169 }
170 Poller.pollSelector(kqfd, remainingNanos);
171 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
172 }
173 return numEntries;
174 }
175
176 /**
177 * Process changes to the interest ops.
178 */
179 private void processUpdateQueue() {
180 assert Thread.holdsLock(this);
181
182 synchronized (updateLock) {
183 SelectionKeyImpl ski;
184 while ((ski = updateKeys.pollFirst()) != null) {
185 if (ski.isValid()) {
186 int fd = ski.getFDVal();
187 // add to fdToKey if needed
188 SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
189 assert (previous == null) || (previous == ski);
190
191 int newEvents = ski.translateInterestOps();
192 int registeredEvents = ski.registeredEvents();
193
194 // DatagramChannelImpl::disconnect has reset socket
195 if (ski.getAndClearReset() && registeredEvents != 0) {
|