229 // saturate
230 int parallelism = bean.getParallelism();
231 IntStream.range(0, parallelism).forEach(_ -> executor.submit(busyTask));
232 awaitMountedVirtualThreadCountGte(bean, parallelism);
233
234 // start 5 virtual threads, their tasks will be queued to execute
235 for (int i = 0; i < 5; i++) {
236 executor.submit(() -> { });
237 }
238 assertTrue(bean.getQueuedVirtualThreadCount() >= 5);
239 } finally {
240 done.set(true);
241 }
242 }
243 }
244
245 /**
246 * Waits for pool size >= target to be true.
247 */
248 void awaitPoolSizeGte(VirtualThreadSchedulerMXBean bean, int target) throws InterruptedException {
249 awaitPoolSize(bean, ps -> ps >= target, ">= " + target);
250 }
251
252 /**
253 * Waits for the mounted virtual thread count >= target to be true.
254 */
255 void awaitMountedVirtualThreadCountGte(VirtualThreadSchedulerMXBean bean,
256 long target) throws InterruptedException {
257 awaitMountedVirtualThreadCount(bean, c -> c >= target, ">= " + target);
258 }
259
260 /**
261 * Waits for the mounted virtual thread count <= target to be true.
262 */
263 void awaitMountedVirtualThreadCountLte(VirtualThreadSchedulerMXBean bean,
264 long target) throws InterruptedException {
265 awaitMountedVirtualThreadCount(bean, c -> c <= target, "<= " + target);
266 }
267
268 /**
269 * Waits for the mounted virtual thread count == target to be true.
270 */
271 void awaitMountedVirtualThreadCountEq(VirtualThreadSchedulerMXBean bean,
272 long target) throws InterruptedException {
273 awaitMountedVirtualThreadCount(bean, c -> c == target, "== " + target);
274 }
275
276 /**
277 * Waits until evaluating the given predicate on the pool size is true.
278 */
279 void awaitPoolSize(VirtualThreadSchedulerMXBean bean,
280 IntPredicate predicate,
281 String reason) throws InterruptedException {
282 int poolSize = bean.getPoolSize();
283 if (!predicate.test(poolSize)) {
284 System.err.format("poolSize = %d, await %s ...%n", poolSize, reason);
285 while (!predicate.test(poolSize)) {
286 Thread.sleep(10);
287 poolSize = bean.getPoolSize();
288 }
289 System.err.format("poolSize = %d%n", poolSize);
290 }
291 }
292
293 /**
294 * Waits until evaluating the given predicate on the mounted thread count is true.
295 */
296 void awaitMountedVirtualThreadCount(VirtualThreadSchedulerMXBean bean,
297 LongPredicate predicate,
298 String reason) throws InterruptedException {
299 long count = bean.getMountedVirtualThreadCount();
300 if (!predicate.test(count)) {
301 System.err.format("mountedVirtualThreadCount = %d, await %s ...%n", count, reason);
302 while (!predicate.test(count)) {
303 Thread.sleep(10);
304 count = bean.getMountedVirtualThreadCount();
305 }
306 System.err.format("mountedVirtualThreadCount = %d%n", count);
307 }
308 }
309 }
|
229 // saturate
230 int parallelism = bean.getParallelism();
231 IntStream.range(0, parallelism).forEach(_ -> executor.submit(busyTask));
232 awaitMountedVirtualThreadCountGte(bean, parallelism);
233
234 // start 5 virtual threads, their tasks will be queued to execute
235 for (int i = 0; i < 5; i++) {
236 executor.submit(() -> { });
237 }
238 assertTrue(bean.getQueuedVirtualThreadCount() >= 5);
239 } finally {
240 done.set(true);
241 }
242 }
243 }
244
245 /**
246 * Waits for pool size >= target to be true.
247 */
248 void awaitPoolSizeGte(VirtualThreadSchedulerMXBean bean, int target) throws InterruptedException {
249 awaitPoolSize(bean, ps -> ps >= target, "poolSize >= " + target);
250 }
251
252 /**
253 * Waits for the mounted virtual thread count >= target to be true.
254 */
255 void awaitMountedVirtualThreadCountGte(VirtualThreadSchedulerMXBean bean,
256 int target) throws InterruptedException {
257 awaitMountedVirtualThreadCount(bean, c -> c >= target, "mountedVirtualThreadCount >= " + target);
258 }
259
260 /**
261 * Waits for the mounted virtual thread count <= target to be true.
262 */
263 void awaitMountedVirtualThreadCountLte(VirtualThreadSchedulerMXBean bean,
264 int target) throws InterruptedException {
265 awaitMountedVirtualThreadCount(bean, c -> c <= target, "mountedVirtualThreadCount <= " + target);
266 }
267
268 /**
269 * Waits for the mounted virtual thread count == target to be true.
270 */
271 void awaitMountedVirtualThreadCountEq(VirtualThreadSchedulerMXBean bean,
272 int target) throws InterruptedException {
273 awaitMountedVirtualThreadCount(bean, c -> c == target, "mountedVirtualThreadCount == " + target);
274 }
275
276 /**
277 * Waits until evaluating the given predicate on the pool size is true.
278 */
279 void awaitPoolSize(VirtualThreadSchedulerMXBean bean,
280 IntPredicate predicate,
281 String reason) throws InterruptedException {
282 int poolSize = bean.getPoolSize();
283 if (!predicate.test(poolSize)) {
284 System.err.format("poolSize = %d, await %s ...%n", poolSize, reason);
285 while (!predicate.test(poolSize)) {
286 Thread.sleep(10);
287 poolSize = bean.getPoolSize();
288 }
289 System.err.format("poolSize = %d%n", poolSize);
290 }
291 }
292
293 /**
294 * Waits until evaluating the given predicate on the mounted virtual thread count is true.
295 */
296 void awaitMountedVirtualThreadCount(VirtualThreadSchedulerMXBean bean,
297 IntPredicate predicate,
298 String reason) throws InterruptedException {
299 int count = bean.getMountedVirtualThreadCount();
300 if (!predicate.test(count)) {
301 System.err.format("mountedVirtualThreadCount = %d, await %s ...%n", count, reason);
302 while (!predicate.test(count)) {
303 Thread.sleep(10);
304 count = bean.getMountedVirtualThreadCount();
305 }
306 System.err.format("mountedVirtualThreadCount = %d%n", count);
307 }
308 }
309
310 /**
311 * Waits until evaluating the given predicate on the queued virtual thread count is true.
312 */
313 void awaitQueuedVirtualThreadCount(VirtualThreadSchedulerMXBean bean,
314 LongPredicate predicate,
315 String reason) throws InterruptedException {
316 long count = bean.getQueuedVirtualThreadCount();
317 if (!predicate.test(count)) {
318 System.err.format("queuedVirtualThreadCount = %d, await %s ...%n", count, reason);
319 while (!predicate.test(count)) {
320 Thread.sleep(10);
321 count = bean.getQueuedVirtualThreadCount();
322 }
323 System.err.format("queuedVirtualThreadCount = %d%n", count);
324 }
325 }
326
327 /**
328 * Waits until there are no mounted virtual threads and no virtual threads queued to
329 * the scheduler.
330 */
331 void awaitQuiescence(VirtualThreadSchedulerMXBean bean) throws InterruptedException {
332 awaitQueuedVirtualThreadCount(bean, c -> c == 0, "queuedVirtualThreadCount == 0");
333 awaitMountedVirtualThreadCount(bean, c -> c == 0L, "mountedVirtualThreadCount == 0");
334 }
335 }
|