1 /*
  2  * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @bug 8338890
 27  * @summary Basic test for jdk.management.VirtualThreadSchedulerMXBean
 28  * @requires vm.continuations
 29  * @modules jdk.management
 30  * @library /test/lib
 31  * @run junit/othervm VirtualThreadSchedulerMXBeanTest
 32  */
 33 
 34 import java.lang.management.ManagementFactory;
 35 import java.util.concurrent.Executors;
 36 import java.util.concurrent.atomic.AtomicBoolean;
 37 import java.util.function.IntPredicate;
 38 import java.util.function.LongPredicate;
 39 import java.util.stream.Stream;
 40 import java.util.stream.IntStream;
 41 import javax.management.MBeanServer;
 42 import jdk.management.VirtualThreadSchedulerMXBean;
 43 
 44 import jdk.test.lib.thread.VThreadRunner;
 45 import org.junit.jupiter.api.Test;
 46 import org.junit.jupiter.params.ParameterizedTest;
 47 import org.junit.jupiter.params.provider.MethodSource;
 48 import static org.junit.jupiter.api.Assertions.*;
 49 import static org.junit.jupiter.api.Assumptions.*;
 50 
 51 class VirtualThreadSchedulerMXBeanTest {
 52 
 53     /**
 54      * VirtualThreadSchedulerMXBean objects to test.
 55      */
 56     private static Stream<VirtualThreadSchedulerMXBean> managedBeans() throws Exception {
 57         var bean1 = ManagementFactory.getPlatformMXBean(VirtualThreadSchedulerMXBean.class);
 58 
 59         MBeanServer server = ManagementFactory.getPlatformMBeanServer();
 60         var bean2 = ManagementFactory.newPlatformMXBeanProxy(server,
 61                 "jdk.management:type=VirtualThreadScheduler",
 62                 VirtualThreadSchedulerMXBean.class);
 63 
 64         return Stream.of(bean1, bean2);
 65     }
 66 
 67     /**
 68      * Test default parallelism.
 69      */
 70     @ParameterizedTest
 71     @MethodSource("managedBeans")
 72     void testDefaultParallelism(VirtualThreadSchedulerMXBean bean) {
 73         assertEquals(Runtime.getRuntime().availableProcessors(), bean.getParallelism());
 74     }
 75 
 76     /**
 77      * Test increasing parallelism.
 78      */
 79     @ParameterizedTest
 80     @MethodSource("managedBeans")
 81     void testIncreaseParallelism(VirtualThreadSchedulerMXBean bean) throws Exception {
 82         assumeFalse(Thread.currentThread().isVirtual(), "Main thread is a virtual thread");
 83 
 84         final int parallelism = bean.getParallelism();
 85         try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
 86             var done = new AtomicBoolean();
 87             Runnable busyTask = () -> {
 88                 while (!done.get()) {
 89                     Thread.onSpinWait();
 90                 }
 91             };
 92 
 93             try {
 94                 // saturate
 95                 IntStream.range(0, parallelism).forEach(_ -> executor.submit(busyTask));
 96                 awaitPoolSizeGte(bean, parallelism);
 97                 awaitMountedVirtualThreadCountGte(bean, parallelism);
 98 
 99                 // increase parallelism
100                 for (int k = 1; k <= 4; k++) {
101                     int newParallelism = parallelism + k;
102                     bean.setParallelism(newParallelism);
103                     executor.submit(busyTask);
104 
105                     // pool size and mounted virtual thread should increase
106                     awaitPoolSizeGte(bean, newParallelism);
107                     awaitMountedVirtualThreadCountGte(bean, newParallelism);
108                 }
109             } finally {
110                 done.set(true);
111             }
112         } finally {
113             bean.setParallelism(parallelism);   // restore
114         }
115     }
116 
117     /**
118      * Test reducing parallelism.
119      */
120     @ParameterizedTest
121     @MethodSource("managedBeans")
122     void testReduceParallelism(VirtualThreadSchedulerMXBean bean) throws Exception {
123         assumeFalse(Thread.currentThread().isVirtual(), "Main thread is a virtual thread");
124 
125         final int parallelism = bean.getParallelism();
126         try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
127             var done = new AtomicBoolean();
128             var sleep = new AtomicBoolean();
129 
130             // spin when !sleep
131             Runnable busyTask = () -> {
132                 while (!done.get()) {
133                     if (sleep.get()) {
134                         try {
135                             Thread.sleep(10);
136                         } catch (InterruptedException e) { }
137                     } else {
138                         Thread.onSpinWait();
139                     }
140                 }
141             };
142 
143             try {
144                 // increase parallelism + saturate
145                 int highParallelism = parallelism + 4;
146                 bean.setParallelism(highParallelism);
147                 IntStream.range(0, highParallelism).forEach(_ -> executor.submit(busyTask));
148 
149                 // mounted virtual thread count should increase to highParallelism.
150                 // Sample the count at highParallelism a few times.
151                 for (int i = 0; i < 5; i++) {
152                     Thread.sleep(100);
153                     awaitMountedVirtualThreadCountEq(bean, highParallelism);
154                 }
155 
156                 // reduce parallelism and workload
157                 int lowParallelism = Math.clamp(parallelism / 2, 1, parallelism);
158                 bean.setParallelism(lowParallelism);
159                 sleep.set(true);
160 
161                 // mounted virtual thread count should reduce to lowParallelism or less.
162                 // Sample the count at lowParallelism or less a few times.
163                 for (int i = 0; i < 5; i++) {
164                     Thread.sleep(100);
165                     awaitMountedVirtualThreadCountLte(bean, lowParallelism);
166                 }
167 
168                 // increase workload
169                 sleep.set(false);
170 
171                 // mounted virtual thread count should not exceed lowParallelism.
172                 // Sample the count at lowParallelism a few times.
173                 for (int i = 0; i < 5; i++) {
174                     Thread.sleep(100);
175                     awaitMountedVirtualThreadCountEq(bean, lowParallelism);
176                 }
177 
178             } finally {
179                 done.set(true);
180             }
181         } finally {
182             bean.setParallelism(parallelism);  // restore
183         }
184     }
185 
186     /**
187      * Test getPoolSize.
188      */
189     @ParameterizedTest
190     @MethodSource("managedBeans")
191     void testPoolSize(VirtualThreadSchedulerMXBean bean) {
192         assertTrue(bean.getPoolSize() >= 0);
193         VThreadRunner.run(() -> {
194             assertTrue(Thread.currentThread().isVirtual());
195             assertTrue(bean.getPoolSize() >= 1);
196         });
197     }
198 
199     /**
200      * Test getMountedVirtualThreadCount.
201      */
202     @ParameterizedTest
203     @MethodSource("managedBeans")
204     void testMountedVirtualThreadCount(VirtualThreadSchedulerMXBean bean) {
205         assertTrue(bean.getMountedVirtualThreadCount() >= 0);
206         VThreadRunner.run(() -> {
207             assertTrue(Thread.currentThread().isVirtual());
208             assertTrue(bean.getMountedVirtualThreadCount() >= 1);
209         });
210     }
211 
212     /**
213      * Test getQueuedVirtualThreadCount.
214      */
215     @ParameterizedTest
216     @MethodSource("managedBeans")
217     void testQueuedVirtualThreadCount(VirtualThreadSchedulerMXBean bean) throws Exception {
218         assumeFalse(Thread.currentThread().isVirtual(), "Main thread is a virtual thread");
219 
220         try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
221             var done = new AtomicBoolean();
222             Runnable busyTask = () -> {
223                 while (!done.get()) {
224                     Thread.onSpinWait();
225                 }
226             };
227 
228             try {
229                 // saturate
230                 int parallelism = bean.getParallelism();
231                 IntStream.range(0, parallelism).forEach(_ -> executor.submit(busyTask));
232                 awaitMountedVirtualThreadCountGte(bean, parallelism);
233 
234                 // start 5 virtual threads, their tasks will be queued to execute
235                 for (int i = 0; i < 5; i++) {
236                     executor.submit(() -> { });
237                 }
238                 assertTrue(bean.getQueuedVirtualThreadCount() >= 5);
239             } finally {
240                 done.set(true);
241             }
242         }
243     }
244 
245     /**
246      * Waits for pool size >= target to be true.
247      */
248     void awaitPoolSizeGte(VirtualThreadSchedulerMXBean bean, int target) throws InterruptedException {
249         awaitPoolSize(bean, ps -> ps >= target, ">= " + target);
250     }
251 
252     /**
253      * Waits for the mounted virtual thread count >= target to be true.
254      */
255     void awaitMountedVirtualThreadCountGte(VirtualThreadSchedulerMXBean bean,
256                                            long target) throws InterruptedException {
257         awaitMountedVirtualThreadCount(bean, c -> c >= target, ">= " + target);
258     }
259 
260     /**
261      * Waits for the mounted virtual thread count <= target to be true.
262      */
263     void awaitMountedVirtualThreadCountLte(VirtualThreadSchedulerMXBean bean,
264                                            long target) throws InterruptedException {
265         awaitMountedVirtualThreadCount(bean, c -> c <= target, "<= " + target);
266     }
267 
268     /**
269      * Waits for the mounted virtual thread count == target to be true.
270      */
271     void awaitMountedVirtualThreadCountEq(VirtualThreadSchedulerMXBean bean,
272                                           long target) throws InterruptedException {
273         awaitMountedVirtualThreadCount(bean, c -> c == target, "== " + target);
274     }
275 
276     /**
277      * Waits until evaluating the given predicte on the pool size is true.
278      */
279     void awaitPoolSize(VirtualThreadSchedulerMXBean bean,
280                        IntPredicate predicate,
281                        String reason) throws InterruptedException {
282         int poolSize = bean.getPoolSize();
283         if (!predicate.test(poolSize)) {
284             System.err.format("poolSize = %d, await %s ...%n", poolSize, reason);
285             while (!predicate.test(poolSize)) {
286                 Thread.sleep(10);
287                 poolSize = bean.getPoolSize();
288             }
289             System.err.format("poolSize = %d%n", poolSize);
290         }
291     }
292 
293     /**
294      * Waits until evaluating the given predicte on the mounted thread count is true.
295      */
296     void awaitMountedVirtualThreadCount(VirtualThreadSchedulerMXBean bean,
297                                         LongPredicate predicate,
298                                         String reason) throws InterruptedException {
299         long count = bean.getMountedVirtualThreadCount();
300         if (!predicate.test(count)) {
301             System.err.format("mountedVirtualThreadCount = %d, await %s ...%n", count, reason);
302             while (!predicate.test(count)) {
303                 Thread.sleep(10);
304                 count = bean.getMountedVirtualThreadCount();
305             }
306             System.err.format("mountedVirtualThreadCount = %d%n", count);
307         }
308     }
309 }