1 /*
2 * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test
26 * @bug 8338890
27 * @summary Basic test for jdk.management.VirtualThreadSchedulerMXBean
28 * @requires vm.continuations
29 * @modules jdk.management
30 * @library /test/lib
31 * @run junit/othervm VirtualThreadSchedulerMXBeanTest
32 */
33
34 import java.lang.management.ManagementFactory;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.function.IntPredicate;
38 import java.util.function.LongPredicate;
39 import java.util.stream.Stream;
40 import java.util.stream.IntStream;
41 import javax.management.MBeanServer;
42 import jdk.management.VirtualThreadSchedulerMXBean;
43
44 import jdk.test.lib.thread.VThreadRunner;
45 import org.junit.jupiter.api.Test;
46 import org.junit.jupiter.params.ParameterizedTest;
47 import org.junit.jupiter.params.provider.MethodSource;
48 import static org.junit.jupiter.api.Assertions.*;
49 import static org.junit.jupiter.api.Assumptions.*;
50
51 class VirtualThreadSchedulerMXBeanTest {
52
53 /**
54 * VirtualThreadSchedulerMXBean objects to test.
55 */
56 private static Stream<VirtualThreadSchedulerMXBean> managedBeans() throws Exception {
57 var bean1 = ManagementFactory.getPlatformMXBean(VirtualThreadSchedulerMXBean.class);
58
59 MBeanServer server = ManagementFactory.getPlatformMBeanServer();
60 var bean2 = ManagementFactory.newPlatformMXBeanProxy(server,
61 "jdk.management:type=VirtualThreadScheduler",
62 VirtualThreadSchedulerMXBean.class);
63
64 return Stream.of(bean1, bean2);
65 }
66
67 /**
68 * Test default parallelism.
69 */
70 @ParameterizedTest
71 @MethodSource("managedBeans")
72 void testDefaultParallelism(VirtualThreadSchedulerMXBean bean) {
73 assertEquals(Runtime.getRuntime().availableProcessors(), bean.getParallelism());
74 }
75
76 /**
77 * Test increasing parallelism.
78 */
79 @ParameterizedTest
80 @MethodSource("managedBeans")
81 void testIncreaseParallelism(VirtualThreadSchedulerMXBean bean) throws Exception {
82 assumeFalse(Thread.currentThread().isVirtual(), "Main thread is a virtual thread");
83
84 final int parallelism = bean.getParallelism();
85 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
86 var done = new AtomicBoolean();
87 Runnable busyTask = () -> {
88 while (!done.get()) {
89 Thread.onSpinWait();
90 }
91 };
92
93 try {
94 // saturate
95 IntStream.range(0, parallelism).forEach(_ -> executor.submit(busyTask));
96 awaitPoolSizeGte(bean, parallelism);
97 awaitMountedVirtualThreadCountGte(bean, parallelism);
98
99 // increase parallelism
100 for (int k = 1; k <= 4; k++) {
101 int newParallelism = parallelism + k;
102 bean.setParallelism(newParallelism);
103 executor.submit(busyTask);
104
105 // pool size and mounted virtual thread should increase
106 awaitPoolSizeGte(bean, newParallelism);
107 awaitMountedVirtualThreadCountGte(bean, newParallelism);
108 }
109 } finally {
110 done.set(true);
111 }
112 } finally {
113 bean.setParallelism(parallelism); // restore
114 }
115 }
116
117 /**
118 * Test reducing parallelism.
119 */
120 @ParameterizedTest
121 @MethodSource("managedBeans")
122 void testReduceParallelism(VirtualThreadSchedulerMXBean bean) throws Exception {
123 assumeFalse(Thread.currentThread().isVirtual(), "Main thread is a virtual thread");
124
125 final int parallelism = bean.getParallelism();
126 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
127 var done = new AtomicBoolean();
128 var sleep = new AtomicBoolean();
129
130 // spin when !sleep
131 Runnable busyTask = () -> {
132 while (!done.get()) {
133 if (sleep.get()) {
134 try {
135 Thread.sleep(10);
136 } catch (InterruptedException e) { }
137 } else {
138 Thread.onSpinWait();
139 }
140 }
141 };
142
143 try {
144 // increase parallelism + saturate
145 int highParallelism = parallelism + 4;
146 bean.setParallelism(highParallelism);
147 IntStream.range(0, highParallelism).forEach(_ -> executor.submit(busyTask));
148
149 // mounted virtual thread count should increase to highParallelism.
150 // Sample the count at highParallelism a few times.
151 for (int i = 0; i < 5; i++) {
152 Thread.sleep(100);
153 awaitMountedVirtualThreadCountEq(bean, highParallelism);
154 }
155
156 // reduce parallelism and workload
157 int lowParallelism = Math.clamp(parallelism / 2, 1, parallelism);
158 bean.setParallelism(lowParallelism);
159 sleep.set(true);
160
161 // mounted virtual thread count should reduce to lowParallelism or less.
162 // Sample the count at lowParallelism or less a few times.
163 for (int i = 0; i < 5; i++) {
164 Thread.sleep(100);
165 awaitMountedVirtualThreadCountLte(bean, lowParallelism);
166 }
167
168 // increase workload
169 sleep.set(false);
170
171 // mounted virtual thread count should not exceed lowParallelism.
172 // Sample the count at lowParallelism a few times.
173 for (int i = 0; i < 5; i++) {
174 Thread.sleep(100);
175 awaitMountedVirtualThreadCountEq(bean, lowParallelism);
176 }
177
178 } finally {
179 done.set(true);
180 }
181 } finally {
182 bean.setParallelism(parallelism); // restore
183 }
184 }
185
186 /**
187 * Test getPoolSize.
188 */
189 @ParameterizedTest
190 @MethodSource("managedBeans")
191 void testPoolSize(VirtualThreadSchedulerMXBean bean) {
192 assertTrue(bean.getPoolSize() >= 0);
193 VThreadRunner.run(() -> {
194 assertTrue(Thread.currentThread().isVirtual());
195 assertTrue(bean.getPoolSize() >= 1);
196 });
197 }
198
199 /**
200 * Test getMountedVirtualThreadCount.
201 */
202 @ParameterizedTest
203 @MethodSource("managedBeans")
204 void testMountedVirtualThreadCount(VirtualThreadSchedulerMXBean bean) {
205 assertTrue(bean.getMountedVirtualThreadCount() >= 0);
206 VThreadRunner.run(() -> {
207 assertTrue(Thread.currentThread().isVirtual());
208 assertTrue(bean.getMountedVirtualThreadCount() >= 1);
209 });
210 }
211
212 /**
213 * Test getQueuedVirtualThreadCount.
214 */
215 @ParameterizedTest
216 @MethodSource("managedBeans")
217 void testQueuedVirtualThreadCount(VirtualThreadSchedulerMXBean bean) throws Exception {
218 assumeFalse(Thread.currentThread().isVirtual(), "Main thread is a virtual thread");
219
220 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
221 var done = new AtomicBoolean();
222 Runnable busyTask = () -> {
223 while (!done.get()) {
224 Thread.onSpinWait();
225 }
226 };
227
228 try {
229 // saturate
230 int parallelism = bean.getParallelism();
231 IntStream.range(0, parallelism).forEach(_ -> executor.submit(busyTask));
232 awaitMountedVirtualThreadCountGte(bean, parallelism);
233
234 // start 5 virtual threads, their tasks will be queued to execute
235 for (int i = 0; i < 5; i++) {
236 executor.submit(() -> { });
237 }
238 assertTrue(bean.getQueuedVirtualThreadCount() >= 5);
239 } finally {
240 done.set(true);
241 }
242 }
243 }
244
245 /**
246 * Waits for pool size >= target to be true.
247 */
248 void awaitPoolSizeGte(VirtualThreadSchedulerMXBean bean, int target) throws InterruptedException {
249 awaitPoolSize(bean, ps -> ps >= target, "poolSize >= " + target);
250 }
251
252 /**
253 * Waits for the mounted virtual thread count >= target to be true.
254 */
255 void awaitMountedVirtualThreadCountGte(VirtualThreadSchedulerMXBean bean,
256 int target) throws InterruptedException {
257 awaitMountedVirtualThreadCount(bean, c -> c >= target, "mountedVirtualThreadCount >= " + target);
258 }
259
260 /**
261 * Waits for the mounted virtual thread count <= target to be true.
262 */
263 void awaitMountedVirtualThreadCountLte(VirtualThreadSchedulerMXBean bean,
264 int target) throws InterruptedException {
265 awaitMountedVirtualThreadCount(bean, c -> c <= target, "mountedVirtualThreadCount <= " + target);
266 }
267
268 /**
269 * Waits for the mounted virtual thread count == target to be true.
270 */
271 void awaitMountedVirtualThreadCountEq(VirtualThreadSchedulerMXBean bean,
272 int target) throws InterruptedException {
273 awaitMountedVirtualThreadCount(bean, c -> c == target, "mountedVirtualThreadCount == " + target);
274 }
275
276 /**
277 * Waits until evaluating the given predicte on the pool size is true.
278 */
279 void awaitPoolSize(VirtualThreadSchedulerMXBean bean,
280 IntPredicate predicate,
281 String reason) throws InterruptedException {
282 int poolSize = bean.getPoolSize();
283 if (!predicate.test(poolSize)) {
284 System.err.format("poolSize = %d, await %s ...%n", poolSize, reason);
285 while (!predicate.test(poolSize)) {
286 Thread.sleep(10);
287 poolSize = bean.getPoolSize();
288 }
289 System.err.format("poolSize = %d%n", poolSize);
290 }
291 }
292
293 /**
294 * Waits until evaluating the given predicte on the mounted virtual thread count is true.
295 */
296 void awaitMountedVirtualThreadCount(VirtualThreadSchedulerMXBean bean,
297 IntPredicate predicate,
298 String reason) throws InterruptedException {
299 int count = bean.getMountedVirtualThreadCount();
300 if (!predicate.test(count)) {
301 System.err.format("mountedVirtualThreadCount = %d, await %s ...%n", count, reason);
302 while (!predicate.test(count)) {
303 Thread.sleep(10);
304 count = bean.getMountedVirtualThreadCount();
305 }
306 System.err.format("mountedVirtualThreadCount = %d%n", count);
307 }
308 }
309
310 /**
311 * Waits until evaluating the given predicte on the queue virtual thread count is true.
312 */
313 void awaitQueuedVirtualThreadCount(VirtualThreadSchedulerMXBean bean,
314 LongPredicate predicate,
315 String reason) throws InterruptedException {
316 long count = bean.getQueuedVirtualThreadCount();
317 if (!predicate.test(count)) {
318 System.err.format("queuedVirtualThreadCount = %d, await %s ...%n", count, reason);
319 while (!predicate.test(count)) {
320 Thread.sleep(10);
321 count = bean.getQueuedVirtualThreadCount();
322 }
323 System.err.format("queuedVirtualThreadCount = %d%n", count);
324 }
325 }
326
327 /**
328 * Waits until there are no mounted virtual threads and no virtual threads queued to
329 * the scheduler.
330 */
331 void awaitQuiescence(VirtualThreadSchedulerMXBean bean) throws InterruptedException {
332 awaitQueuedVirtualThreadCount(bean, c -> c == 0, "queuedVirtualThreadCount == 0");
333 awaitMountedVirtualThreadCount(bean, c -> c == 0L, "mountedVirtualThreadCount == 0");
334 }
335 }