125 /**
126 * Parks the current thread until a file descriptor is ready for the given op.
127 * @param fdVal the file descriptor
128 * @param event POLLIN or POLLOUT
129 * @param nanos the waiting time or 0 to wait indefinitely
130 * @param supplier supplies a boolean to indicate if the enclosing object is open
131 */
132 static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
133 throws IOException
134 {
135 assert nanos >= 0L;
136 if (event == Net.POLLIN) {
137 POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
138 } else if (event == Net.POLLOUT) {
139 POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
140 } else {
141 assert false;
142 }
143 }
144
145 /**
146 * If there is a thread polling the given file descriptor for the given event then
147 * the thread is unparked.
148 */
149 static void stopPoll(int fdVal, int event) {
150 if (event == Net.POLLIN) {
151 POLLERS.readPoller(fdVal).wakeup(fdVal);
152 } else if (event == Net.POLLOUT) {
153 POLLERS.writePoller(fdVal).wakeup(fdVal);
154 } else {
155 throw new IllegalArgumentException();
156 }
157 }
158
159 /**
160 * If there are any threads polling the given file descriptor then they are unparked.
161 */
162 static void stopPoll(int fdVal) {
163 stopPoll(fdVal, Net.POLLIN);
164 stopPoll(fdVal, Net.POLLOUT);
322 .name("SubPoller-", 0)
323 .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
324 .factory();
325 executor = Executors.newThreadPerTaskExecutor(factory);
326 Arrays.stream(readPollers).forEach(p -> {
327 executor.execute(() -> p.subPollerLoop(masterPoller));
328 });
329 Arrays.stream(writePollers).forEach(p -> {
330 executor.execute(() -> p.subPollerLoop(masterPoller));
331 });
332 } else {
333 Arrays.stream(readPollers).forEach(p -> {
334 startPlatformThread("Read-Poller", p::pollerLoop);
335 });
336 Arrays.stream(writePollers).forEach(p -> {
337 startPlatformThread("Write-Poller", p::pollerLoop);
338 });
339 }
340 }
341
342 /**
343 * Returns the read poller for the given file descriptor.
344 */
345 Poller readPoller(int fdVal) {
346 int index = provider.fdValToIndex(fdVal, readPollers.length);
347 return readPollers[index];
348 }
349
350 /**
351 * Returns the write poller for the given file descriptor.
352 */
353 Poller writePoller(int fdVal) {
354 int index = provider.fdValToIndex(fdVal, writePollers.length);
355 return writePollers[index];
356 }
357
358 /**
359 * Reads the given property name to get the poller count. If the property is
360 * set then the value must be a power of 2. Returns 1 if the property is not
361 * set.
|
125 /**
126 * Parks the current thread until a file descriptor is ready for the given op.
127 * @param fdVal the file descriptor
128 * @param event POLLIN or POLLOUT
129 * @param nanos the waiting time or 0 to wait indefinitely
130 * @param supplier supplies a boolean to indicate if the enclosing object is open
131 */
132 static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
133 throws IOException
134 {
135 assert nanos >= 0L;
136 if (event == Net.POLLIN) {
137 POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
138 } else if (event == Net.POLLOUT) {
139 POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
140 } else {
141 assert false;
142 }
143 }
144
145 /**
146 * Parks the current thread until a Selector's file descriptor is ready.
147 * @param fdVal the Selector's file descriptor
148 * @param nanos the waiting time or 0 to wait indefinitely
149 */
150 static void pollSelector(int fdVal, long nanos) throws IOException {
151 assert nanos >= 0L;
152 Poller poller = POLLERS.masterPoller();
153 if (poller == null) {
154 poller = POLLERS.readPoller(fdVal);
155 }
156 poller.poll(fdVal, nanos, () -> true);
157 }
158
159 /**
160 * If there is a thread polling the given file descriptor for the given event then
161 * the thread is unparked.
162 */
163 static void stopPoll(int fdVal, int event) {
164 if (event == Net.POLLIN) {
165 POLLERS.readPoller(fdVal).wakeup(fdVal);
166 } else if (event == Net.POLLOUT) {
167 POLLERS.writePoller(fdVal).wakeup(fdVal);
168 } else {
169 throw new IllegalArgumentException();
170 }
171 }
172
173 /**
174 * If there are any threads polling the given file descriptor then they are unparked.
175 */
176 static void stopPoll(int fdVal) {
177 stopPoll(fdVal, Net.POLLIN);
178 stopPoll(fdVal, Net.POLLOUT);
336 .name("SubPoller-", 0)
337 .uncaughtExceptionHandler((t, e) -> e.printStackTrace())
338 .factory();
339 executor = Executors.newThreadPerTaskExecutor(factory);
340 Arrays.stream(readPollers).forEach(p -> {
341 executor.execute(() -> p.subPollerLoop(masterPoller));
342 });
343 Arrays.stream(writePollers).forEach(p -> {
344 executor.execute(() -> p.subPollerLoop(masterPoller));
345 });
346 } else {
347 Arrays.stream(readPollers).forEach(p -> {
348 startPlatformThread("Read-Poller", p::pollerLoop);
349 });
350 Arrays.stream(writePollers).forEach(p -> {
351 startPlatformThread("Write-Poller", p::pollerLoop);
352 });
353 }
354 }
355
356 /**
357 * Returns the master poller, or null if there is no master poller.
358 */
359 Poller masterPoller() {
360 return masterPoller;
361 }
362
363 /**
364 * Returns the read poller for the given file descriptor.
365 */
366 Poller readPoller(int fdVal) {
367 int index = provider.fdValToIndex(fdVal, readPollers.length);
368 return readPollers[index];
369 }
370
371 /**
372 * Returns the write poller for the given file descriptor.
373 */
374 Poller writePoller(int fdVal) {
375 int index = provider.fdValToIndex(fdVal, writePollers.length);
376 return writePollers[index];
377 }
378
379 /**
380 * Reads the given property name to get the poller count. If the property is
381 * set then the value must be a power of 2. Returns 1 if the property is not
382 * set.
|