< prev index next > src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java
Print this page
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
- import jdk.internal.misc.Blocker;
import static sun.nio.ch.EPoll.EPOLLIN;
import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;
boolean timedPoll = (to > 0);
int numEntries;
processUpdateQueue();
processDeregisterQueue();
- try {
- begin(blocking);
! do {
! long startTime = timedPoll ? System.nanoTime() : 0;
! boolean attempted = Blocker.begin(blocking);
! try {
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
! } finally {
! Blocker.end(attempted);
! }
! if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
! // timed poll interrupted so need to adjust timeout
! long adjust = System.nanoTime() - startTime;
! to -= (int) TimeUnit.NANOSECONDS.toMillis(adjust);
! if (to <= 0) {
- // timeout expired so no retry
- numEntries = 0;
}
! }
! } while (numEntries == IOStatus.INTERRUPTED);
! assert IOStatus.check(numEntries);
!
- } finally {
- end(blocking);
}
processDeregisterQueue();
return processEvents(numEntries, action);
}
/**
* Process changes to the interest ops.
*/
private void processUpdateQueue() {
assert Thread.holdsLock(this);
boolean timedPoll = (to > 0);
int numEntries;
processUpdateQueue();
processDeregisterQueue();
! if (Thread.currentThread().isVirtual()) {
! numEntries = (timedPoll)
! ? timedPoll(TimeUnit.MILLISECONDS.toNanos(to))
! : untimedPoll(blocking);
+ } else {
+ try {
+ begin(blocking);
+ do {
+ long startTime = timedPoll ? System.nanoTime() : 0;
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
! if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
! // timed poll interrupted so need to adjust timeout
! long adjust = System.nanoTime() - startTime;
! to -= (int) TimeUnit.NANOSECONDS.toMillis(adjust);
! if (to <= 0) {
! // timeout expired so no retry
! numEntries = 0;
! }
}
! } while (numEntries == IOStatus.INTERRUPTED);
! } finally {
! end(blocking);
! }
}
+ assert IOStatus.check(numEntries);
+
processDeregisterQueue();
return processEvents(numEntries, action);
}
+ /**
+ * If blocking, parks the current virtual thread until a file descriptor is polled
+ * or the thread is interrupted.
+ *
+ * <p> An initial {@code EPoll.wait} is always made with a timeout argument of 0.
+ * When {@code block} is true and that call reports no entries, the method
+ * alternates between {@code Poller.pollSelector(epfd, 0)} and a further
+ * {@code EPoll.wait} with a timeout argument of 0, until a non-zero value is
+ * returned or the current thread's interrupt status is set.
+ *
+ * @param block true to keep waiting while no entries are reported; false to
+ *        return straight after the initial {@code EPoll.wait}
+ * @return the value returned by the most recent {@code EPoll.wait} call
+ * @throws IOException declared by this method; presumably propagated from
+ *         {@code EPoll.wait} or {@code Poller.pollSelector} (neither is visible here)
+ */
+ private int untimedPoll(boolean block) throws IOException {
+ int numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0); // first check, timeout argument 0
+ if (block) {
+ while (numEntries == 0 && !Thread.currentThread().isInterrupted()) { // isInterrupted() tests the status without clearing it
+ Poller.pollSelector(epfd, 0); // NOTE(review): second argument 0 presumably means "no timeout" - confirm against Poller
+ numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0); // re-check after pollSelector returns
+ }
+ }
+ return numEntries;
+ }
+
+ /**
+ * Parks the current virtual thread until a file descriptor is polled, or the thread
+ * is interrupted, for up to the specified waiting time.
+ *
+ * <p> An initial {@code EPoll.wait} is always made with a timeout argument of 0.
+ * While that reports no entries and the current thread's interrupt status is
+ * not set, the method computes the time left from {@code System.nanoTime()},
+ * stops if none is left, and otherwise calls {@code Poller.pollSelector} with
+ * the remaining time before checking {@code EPoll.wait} again.
+ *
+ * @param nanos the maximum waiting time, in nanoseconds, measured from entry
+ *        to this method
+ * @return the value returned by the most recent {@code EPoll.wait} call; this
+ *         is 0 when the loop ends because the time ran out or the thread's
+ *         interrupt status was set
+ * @throws IOException declared by this method; presumably propagated from
+ *         {@code EPoll.wait} or {@code Poller.pollSelector} (neither is visible here)
+ */
+ private int timedPoll(long nanos) throws IOException {
+ long startNanos = System.nanoTime(); // reference point for the elapsed-time calculation below
+ int numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0); // first check, timeout argument 0
+ while (numEntries == 0 && !Thread.currentThread().isInterrupted()) { // isInterrupted() tests the status without clearing it
+ long remainingNanos = nanos - (System.nanoTime() - startNanos); // budget minus time elapsed since entry
+ if (remainingNanos <= 0) {
+ // timeout
+ break;
+ }
+ Poller.pollSelector(epfd, remainingNanos); // NOTE(review): second argument is presumably a timeout in nanoseconds - confirm against Poller
+ numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0); // re-check after pollSelector returns
+ }
+ return numEntries;
+ }
+
/**
* Process changes to the interest ops.
*/
private void processUpdateQueue() {
assert Thread.holdsLock(this);
< prev index next >