< prev index next > src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java
Print this page
/*
! * Copyright (c) 2011, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
/*
! * Copyright (c) 2011, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
- import jdk.internal.misc.Blocker;
import static sun.nio.ch.KQueue.EVFILT_READ;
import static sun.nio.ch.KQueue.EVFILT_WRITE;
import static sun.nio.ch.KQueue.EV_ADD;
import static sun.nio.ch.KQueue.EV_DELETE;
boolean timedPoll = (to > 0);
int numEntries;
processUpdateQueue();
processDeregisterQueue();
- try {
- begin(blocking);
! do {
! long startTime = timedPoll ? System.nanoTime() : 0;
! boolean attempted = Blocker.begin(blocking);
! try {
numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
! } finally {
! Blocker.end(attempted);
! }
! if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
! // timed poll interrupted so need to adjust timeout
! long adjust = System.nanoTime() - startTime;
! to -= TimeUnit.NANOSECONDS.toMillis(adjust);
! if (to <= 0) {
- // timeout expired so no retry
- numEntries = 0;
}
! }
! } while (numEntries == IOStatus.INTERRUPTED);
! assert IOStatus.check(numEntries);
!
- } finally {
- end(blocking);
}
processDeregisterQueue();
return processEvents(numEntries, action);
}
/**
* Process changes to the interest ops.
*/
private void processUpdateQueue() {
assert Thread.holdsLock(this);
boolean timedPoll = (to > 0);
int numEntries;
processUpdateQueue();
processDeregisterQueue();
! if (Thread.currentThread().isVirtual()) {
! numEntries = (timedPoll)
! ? timedPoll(TimeUnit.MILLISECONDS.toNanos(to))
! : untimedPoll(blocking);
+ } else {
+ try {
+ begin(blocking);
+ do {
+ long startTime = timedPoll ? System.nanoTime() : 0;
numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
! if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
! // timed poll interrupted so need to adjust timeout
! long adjust = System.nanoTime() - startTime;
! to -= TimeUnit.NANOSECONDS.toMillis(adjust);
! if (to <= 0) {
! // timeout expired so no retry
! numEntries = 0;
! }
}
! } while (numEntries == IOStatus.INTERRUPTED);
! } finally {
! end(blocking);
! }
}
+ assert IOStatus.check(numEntries);
+
processDeregisterQueue();
return processEvents(numEntries, action);
}
+ /**
+ * Polls the kqueue without a deadline on behalf of a virtual thread (the caller
+ * only takes this path when {@code Thread.currentThread().isVirtual()}).
+ *
+ * The kqueue is first polled with a timeout argument of 0. If that yields no
+ * entries and {@code block} is true, the method repeatedly calls
+ * {@code Poller.pollSelector(kqfd, 0)} and re-polls with a timeout argument of 0
+ * until the poll reports a non-zero value or the current thread's interrupt
+ * status is set.
+ *
+ * If blocking, parks the current virtual thread until a file descriptor is polled
+ * or the thread is interrupted.
+ *
+ * @param block {@code true} to wait for events; {@code false} to return the
+ *        result of the initial poll as-is
+ * @return the value returned by the most recent {@code KQueue.poll} call; this is
+ *         {@code 0} when not blocking and nothing was polled, or when the loop
+ *         stopped because the thread was interrupted
+ * @throws IOException propagated from the underlying poll calls
+ */
+ private int untimedPoll(boolean block) throws IOException {
+ int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0); // timeout argument 0; presumably returns without waiting (confirm against KQueue.poll)
+ if (block) {
+ while (numEntries == 0 && !Thread.currentThread().isInterrupted()) { // isInterrupted() does not clear the status, so it stays set for the caller
+ Poller.pollSelector(kqfd, 0); // NOTE(review): second argument 0 presumably means "no timeout" - confirm against Poller
+ numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0); // re-poll to collect whatever became ready while waiting
+ }
+ }
+ return numEntries;
+ }
+
+ /**
+ * Polls the kqueue on behalf of a virtual thread for at most the given waiting
+ * time (the caller only takes this path when
+ * {@code Thread.currentThread().isVirtual()} and the timeout is positive).
+ *
+ * The kqueue is first polled with a timeout argument of 0. While that yields no
+ * entries and the current thread's interrupt status is not set, the remaining
+ * time is recomputed from {@code System.nanoTime()}; if none is left the loop
+ * ends, otherwise {@code Poller.pollSelector(kqfd, remainingNanos)} is called and
+ * the kqueue is polled again with a timeout argument of 0.
+ *
+ * Parks the current virtual thread until a file descriptor is polled, or the thread
+ * is interrupted, for up to the specified waiting time.
+ *
+ * @param nanos the maximum time to wait, in nanoseconds, measured from entry to
+ *        this method
+ * @return the value returned by the most recent {@code KQueue.poll} call; this is
+ *         {@code 0} when the waiting time elapsed or the thread was interrupted
+ *         before anything was polled
+ * @throws IOException propagated from the underlying poll calls
+ */
+ private int timedPoll(long nanos) throws IOException {
+ long startNanos = System.nanoTime(); // reference point for the elapsed-time calculation below
+ int numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0); // timeout argument 0; presumably returns without waiting (confirm against KQueue.poll)
+ while (numEntries == 0 && !Thread.currentThread().isInterrupted()) { // isInterrupted() does not clear the status, so it stays set for the caller
+ long remainingNanos = nanos - (System.nanoTime() - startNanos); // difference of nanoTime values, so elapsed time is computed overflow-safely
+ if (remainingNanos <= 0) {
+ // timeout
+ break;
+ }
+ Poller.pollSelector(kqfd, remainingNanos); // NOTE(review): presumably waits up to remainingNanos for kqfd to become ready - confirm against Poller
+ numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, 0); // re-poll to collect whatever became ready while waiting
+ }
+ return numEntries;
+ }
+
/**
* Process changes to the interest ops.
*/
private void processUpdateQueue() {
assert Thread.holdsLock(this);
< prev index next >