1 /*
2 * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /**
25 * @test
26 * @summary Test using a custom scheduler as the default virtual thread scheduler
27 * @requires vm.continuations
28 * @run junit/othervm -Djdk.virtualThreadScheduler.implClass=CustomDefaultScheduler$CustomScheduler1
29 * --enable-native-access=ALL-UNNAMED CustomDefaultScheduler
30 * @run junit/othervm -Djdk.virtualThreadScheduler.implClass=CustomDefaultScheduler$CustomScheduler2
31 * --enable-native-access=ALL-UNNAMED CustomDefaultScheduler
32 */
33
34 import java.lang.Thread.VirtualThreadScheduler;
35 import java.util.Set;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.ThreadFactory;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicReference;
43 import java.util.concurrent.locks.LockSupport;
44
45 import org.junit.jupiter.api.Test;
46 import org.junit.jupiter.api.BeforeAll;
47 import static org.junit.jupiter.api.Assertions.*;
48 import static org.junit.jupiter.api.Assumptions.*;
49
50 class CustomDefaultScheduler {
51 private static String schedulerClassName;
52
53 @BeforeAll
54 static void setup() {
55 schedulerClassName = System.getProperty("jdk.virtualThreadScheduler.implClass");
56 }
57
58 /**
59 * Custom scheduler that uses a thread pool.
60 */
61 public static class CustomScheduler1 implements VirtualThreadScheduler {
62 private final ExecutorService pool;
63
64 public CustomScheduler1() {
65 ThreadFactory factory = Thread.ofPlatform().daemon().factory();
66 pool = Executors.newFixedThreadPool(1, factory);
67 }
68
69 void execute(Thread thread, Runnable task) {
70 if (thread.isVirtual()) {
71 pool.execute(task);
72 } else {
73 throw new UnsupportedOperationException();
74 }
75 }
76
77 @Override
78 public void onStart(Thread thread, Runnable task) {
79 execute(thread, task);
80 }
81
82 @Override
83 public void onContinue(Thread thread, Runnable task) {
84 execute(thread, task);
85 }
86 }
87
88 /**
89 * Custom scheduler that delegates to the built-in default scheduler.
90 */
91 public static class CustomScheduler2 implements VirtualThreadScheduler {
92 private final VirtualThreadScheduler builtinScheduler;
93
94 // the set of threads that executed with this scheduler
95 private final Set<Thread> executed = ConcurrentHashMap.newKeySet();
96
97 public CustomScheduler2(VirtualThreadScheduler builtinScheduler) {
98 this.builtinScheduler = builtinScheduler;
99 }
100
101 VirtualThreadScheduler builtinScheduler() {
102 return builtinScheduler;
103 }
104
105 @Override
106 public void onStart(Thread vthread, Runnable task) {
107 executed.add(vthread);
108 builtinScheduler.onStart(vthread, task);
109 }
110
111 @Override
112 public void onContinue(Thread vthread, Runnable task) {
113 executed.add(vthread);
114 builtinScheduler.onContinue(vthread, task);
115 }
116
117 Set<Thread> threadsExecuted() {
118 return executed;
119 }
120 }
121
122 /**
123 * Test that a virtual thread uses the custom default scheduler.
124 */
125 @Test
126 void testUseCustomScheduler() throws Exception {
127 var ref = new AtomicReference<VirtualThreadScheduler>();
128 Thread.startVirtualThread(() -> {
129 ref.set(VirtualThreadScheduler.current());
130 }).join();
131 VirtualThreadScheduler scheduler = ref.get();
132 assertEquals(schedulerClassName, scheduler.getClass().getName());
133 }
134
135 /**
136 * Test virtual thread park/unpark using custom default scheduler.
137 */
138 @Test
139 void testPark() throws Exception {
140 var done = new AtomicBoolean();
141 var thread = Thread.startVirtualThread(() -> {
142 while (!done.get()) {
143 LockSupport.park();
144 }
145 });
146 try {
147 await(thread, Thread.State.WAITING);
148 } finally {
149 done.set(true);
150 LockSupport.unpark(thread);
151 thread.join();
152 }
153 }
154
155 /**
156 * Test virtual thread blocking on monitor when using custom default scheduler.
157 */
158 @Test
159 void testBlock() throws Exception {
160 var ready = new CountDownLatch(1);
161 var lock = new Object();
162 var thread = Thread.ofVirtual().unstarted(() -> {
163 ready.countDown();
164 synchronized (lock) {
165 }
166 });
167 synchronized (lock) {
168 thread.start();
169 ready.await();
170 await(thread, Thread.State.BLOCKED);
171 }
172 thread.join();
173 }
174
175 /**
176 * Test custom default scheduler execute method with bad parameters.
177 */
178 @Test
179 void testExecuteThrows() throws Exception {
180 var ref = new AtomicReference<VirtualThreadScheduler>();
181 Thread vthread = Thread.startVirtualThread(() -> {
182 ref.set(VirtualThreadScheduler.current());
183 });
184 vthread.join();
185 VirtualThreadScheduler scheduler = ref.get();
186
187 Runnable task = () -> { };
188
189 // platform thread
190 Thread thread = Thread.ofPlatform().unstarted(() -> { });
191 assertThrows(UnsupportedOperationException.class, () -> scheduler.onContinue(thread, task));
192
193 // nulls
194 assertThrows(NullPointerException.class, () -> scheduler.onContinue(null, task));
195 assertThrows(NullPointerException.class, () -> scheduler.onContinue(vthread, null));
196 }
197
198 /**
199 * Test custom default scheduler delegating to builtin default scheduler.
200 */
201 @Test
202 void testDelegatingToBuiltin1() throws Exception {
203 assumeTrue(schedulerClassName.equals("CustomDefaultScheduler$CustomScheduler2"));
204
205 var ref = new AtomicReference<VirtualThreadScheduler>();
206 Thread vthread = Thread.startVirtualThread(() -> {
207 ref.set(VirtualThreadScheduler.current());
208 });
209 vthread.join();
210
211 var customScheduler1 = new CustomScheduler1();
212 var customScheduler2 = (CustomScheduler2) ref.get();
213 var builtinScheduler = customScheduler2.builtinScheduler();
214
215 // ensure builtin default scheduler can't be shutdown
216 assertThrows(ClassCastException.class, () -> { var e = (AutoCloseable) builtinScheduler; });
217
218 var vthread0 = Thread.ofVirtual().scheduler(builtinScheduler).unstarted(() -> { });
219 var vthread1 = Thread.ofVirtual().scheduler(customScheduler1).unstarted(() -> { });
220 var vthread2 = Thread.ofVirtual().scheduler(customScheduler2).unstarted(() -> { });
221
222 Runnable task = () -> { };
223
224 // builtin scheduler can execute tasks for itself or customScheduler2
225 builtinScheduler.onContinue(vthread0, task);
226 assertThrows(IllegalArgumentException.class, () -> builtinScheduler.onContinue(vthread1, task));
227 builtinScheduler.onContinue(vthread2, task);
228
229 assertThrows(IllegalArgumentException.class, () -> customScheduler2.onContinue(vthread1, task));
230 customScheduler2.onContinue(vthread2, task);
231 }
232
233 /**
234 * Test one virtual thread starting a second virtual thread when both are scheduled
235 * by a custom default scheduler delegating to builtin default scheduler.
236 */
237 @Test
238 void testDelegatingToBuiltin2() throws Exception {
239 assumeTrue(schedulerClassName.equals("CustomDefaultScheduler$CustomScheduler2"));
240
241 var schedulerRef = new AtomicReference<VirtualThreadScheduler>();
242 var vthreadRef = new AtomicReference<Thread>();
243
244 var vthread1 = Thread.ofVirtual().start(() -> {
245 schedulerRef.set(VirtualThreadScheduler.current());
246 Thread vthread2 = Thread.ofVirtual().start(() -> {
247 assertTrue(VirtualThreadScheduler.current() == schedulerRef.get());
248 vthreadRef.set(Thread.currentThread());
249 });
250 try {
251 vthread2.join();
252 } catch (InterruptedException e) {
253 // fail();
254 }
255 });
256
257 vthread1.join();
258 Thread vthread2 = vthreadRef.get();
259
260 var customScheduler = (CustomScheduler2) schedulerRef.get();
261 assertTrue(customScheduler.threadsExecuted().contains(vthread1));
262 assertTrue(customScheduler.threadsExecuted().contains(vthread2));
263 }
264
265 /**
266 * Waits for the given thread to reach a given state.
267 */
268 private void await(Thread thread, Thread.State expectedState) throws InterruptedException {
269 Thread.State state = thread.getState();
270 while (state != expectedState) {
271 assertTrue(state != Thread.State.TERMINATED, "Thread has terminated");
272 Thread.sleep(10);
273 state = thread.getState();
274 }
275 }
276 }