1 /*
2 * Copyright (c) 2011, 2024, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.IOException;
29 import java.nio.channels.ClosedSelectorException;
30 import java.nio.channels.SelectionKey;
31 import java.nio.channels.Selector;
32 import java.nio.channels.spi.SelectorProvider;
33 import java.util.ArrayDeque;
34 import java.util.Deque;
35 import java.util.HashMap;
36 import java.util.Map;
37 import java.util.concurrent.TimeUnit;
38 import java.util.function.Consumer;
39 import jdk.internal.misc.Blocker;
40
41 import static sun.nio.ch.KQueue.EVFILT_READ;
42 import static sun.nio.ch.KQueue.EVFILT_WRITE;
43 import static sun.nio.ch.KQueue.EV_ADD;
44 import static sun.nio.ch.KQueue.EV_DELETE;
45
46 /**
47 * KQueue based Selector implementation for macOS
48 */
49
50 class KQueueSelectorImpl extends SelectorImpl {
51
52 // maximum number of events to poll in one call to kqueue
53 private static final int MAX_KEVENTS = 256;
54
55 // kqueue file descriptor
56 private final int kqfd;
57
58 // address of poll array (event list) when polling for pending events
59 private final long pollArrayAddress;
98 }
99
100 private void ensureOpen() {
101 if (!isOpen())
102 throw new ClosedSelectorException();
103 }
104
105 @Override
106 protected int doSelect(Consumer<SelectionKey> action, long timeout)
107 throws IOException
108 {
109 assert Thread.holdsLock(this);
110
111 long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout
112 boolean blocking = (to != 0);
113 boolean timedPoll = (to > 0);
114
115 int numEntries;
116 processUpdateQueue();
117 processDeregisterQueue();
118 try {
119 begin(blocking);
120
121 do {
122 long startTime = timedPoll ? System.nanoTime() : 0;
123 boolean attempted = Blocker.begin(blocking);
124 try {
125 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
126 } finally {
127 Blocker.end(attempted);
128 }
129 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
130 // timed poll interrupted so need to adjust timeout
131 long adjust = System.nanoTime() - startTime;
132 to -= TimeUnit.NANOSECONDS.toMillis(adjust);
133 if (to <= 0) {
134 // timeout expired so no retry
135 numEntries = 0;
136 }
137 }
138 } while (numEntries == IOStatus.INTERRUPTED);
139 assert IOStatus.check(numEntries);
140
141 } finally {
142 end(blocking);
143 }
144 processDeregisterQueue();
145 return processEvents(numEntries, action);
146 }
147
148 /**
149 * Process changes to the interest ops.
150 */
151 private void processUpdateQueue() {
152 assert Thread.holdsLock(this);
153
154 synchronized (updateLock) {
155 SelectionKeyImpl ski;
156 while ((ski = updateKeys.pollFirst()) != null) {
157 if (ski.isValid()) {
158 int fd = ski.getFDVal();
159 // add to fdToKey if needed
160 SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
161 assert (previous == null) || (previous == ski);
162
163 int newEvents = ski.translateInterestOps();
164 int registeredEvents = ski.registeredEvents();
165
166 // DatagramChannelImpl::disconnect has reset socket
167 if (ski.getAndClearReset() && registeredEvents != 0) {
|
1 /*
2 * Copyright (c) 2011, 2022, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.IOException;
29 import java.nio.channels.ClosedSelectorException;
30 import java.nio.channels.SelectionKey;
31 import java.nio.channels.Selector;
32 import java.nio.channels.spi.SelectorProvider;
33 import java.util.ArrayDeque;
34 import java.util.Deque;
35 import java.util.HashMap;
36 import java.util.Map;
37 import java.util.concurrent.TimeUnit;
38 import java.util.function.Consumer;
39
40 import static sun.nio.ch.KQueue.EVFILT_READ;
41 import static sun.nio.ch.KQueue.EVFILT_WRITE;
42 import static sun.nio.ch.KQueue.EV_ADD;
43 import static sun.nio.ch.KQueue.EV_DELETE;
44
45 /**
46 * KQueue based Selector implementation for macOS
47 */
48
49 class KQueueSelectorImpl extends SelectorImpl {
50
51 // maximum number of events to poll in one call to kqueue
52 private static final int MAX_KEVENTS = 256;
53
54 // kqueue file descriptor
55 private final int kqfd;
56
57 // address of poll array (event list) when polling for pending events
58 private final long pollArrayAddress;
97 }
98
99 private void ensureOpen() {
100 if (!isOpen())
101 throw new ClosedSelectorException();
102 }
103
104 @Override
105 protected int doSelect(Consumer<SelectionKey> action, long timeout)
106 throws IOException
107 {
108 assert Thread.holdsLock(this);
109
110 long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout
111 boolean blocking = (to != 0);
112 boolean timedPoll = (to > 0);
113
114 int numEntries;
115 processUpdateQueue();
116 processDeregisterQueue();
117
118 if (Thread.currentThread().isVirtual()) {
119 numEntries = (timedPoll)
120 ? timedPoll(TimeUnit.MILLISECONDS.toNanos(to))
121 : untimedPoll(blocking);
122 } else {
123 try {
124 begin(blocking);
125 do {
126 long startTime = timedPoll ? System.nanoTime() : 0;
127 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
128 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
129 // timed poll interrupted so need to adjust timeout
130 long adjust = System.nanoTime() - startTime;
131 to -= TimeUnit.NANOSECONDS.toMillis(adjust);
132 if (to <= 0) {
133 // timeout expired so no retry
134 numEntries = 0;
135 }
136 }
137 } while (numEntries == IOStatus.INTERRUPTED);
138 } finally {
139 end(blocking);
140 }
141 }
142 assert IOStatus.check(numEntries);
143
144 processDeregisterQueue();
145 return processEvents(numEntries, action);
146 }
147
148 /**
149 * If blocking, parks the current virtual thread until a file descriptor is polled
150 * or the thread is interrupted.
151 */
152 private int untimedPoll(boolean block) throws IOException {
153 int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
154 if (block) {
155 while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
156 Poller.pollSelector(kqfd, 0);
157 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
158 }
159 }
160 return numEntries;
161 }
162
163 /**
164 * Parks the current virtual thread until a file descriptor is polled, or the thread
165 * is interrupted, for up to the specified waiting time.
166 */
167 private int timedPoll(long nanos) throws IOException {
168 long startNanos = System.nanoTime();
169 int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
170 while (numEntries == 0 && !Thread.currentThread().isInterrupted()) {
171 long remainingNanos = nanos - (System.nanoTime() - startNanos);
172 if (remainingNanos <= 0) {
173 // timeout
174 break;
175 }
176 Poller.pollSelector(kqfd, remainingNanos);
177 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0);
178 }
179 return numEntries;
180 }
181
182 /**
183 * Process changes to the interest ops.
184 */
185 private void processUpdateQueue() {
186 assert Thread.holdsLock(this);
187
188 synchronized (updateLock) {
189 SelectionKeyImpl ski;
190 while ((ski = updateKeys.pollFirst()) != null) {
191 if (ski.isValid()) {
192 int fd = ski.getFDVal();
193 // add to fdToKey if needed
194 SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
195 assert (previous == null) || (previous == ski);
196
197 int newEvents = ski.translateInterestOps();
198 int registeredEvents = ski.registeredEvents();
199
200 // DatagramChannelImpl::disconnect has reset socket
201 if (ski.getAndClearReset() && registeredEvents != 0) {
|