1 /*
2 * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @summary Test using a custom scheduler as the default virtual thread scheduler
27 * @requires vm.continuations
28 * @modules java.base/java.lang:+open
29 * @library /test/lib
30 * @run junit/othervm -Djdk.virtualThreadScheduler.implClass=CustomDefaultScheduler$CustomScheduler1
31 * --enable-native-access=ALL-UNNAMED CustomDefaultScheduler
32 * @run junit/othervm -Djdk.virtualThreadScheduler.implClass=CustomDefaultScheduler$CustomScheduler2
33 * --enable-native-access=ALL-UNNAMED CustomDefaultScheduler
34 */
35
36 /*
37 * @test id=poller-modes
38 * @requires vm.continuations
39 * @requires (os.family == "linux") | (os.family == "mac")
40 * @modules java.base/java.lang:+open
41 * @library /test/lib
42 * @run junit/othervm -Djdk.pollerMode=3
43 * -Djdk.virtualThreadScheduler.implClass=CustomDefaultScheduler$CustomScheduler1
44 * --enable-native-access=ALL-UNNAMED CustomDefaultScheduler
45 * @run junit/othervm -Djdk.pollerMode=3
46 * -Djdk.virtualThreadScheduler.implClass=CustomDefaultScheduler$CustomScheduler2
47 * --enable-native-access=ALL-UNNAMED CustomDefaultScheduler
48 */
49
50 import java.io.Closeable;
51 import java.io.IOException;
52 import java.lang.Thread.VirtualThreadScheduler;
53 import java.lang.Thread.VirtualThreadTask;
54 import java.net.InetAddress;
55 import java.net.InetSocketAddress;
56 import java.net.ServerSocket;
57 import java.net.Socket;
58 import java.util.Set;
59 import java.util.concurrent.ConcurrentHashMap;
60 import java.util.concurrent.Executors;
61 import java.util.concurrent.ExecutorService;
62 import java.util.concurrent.ThreadFactory;
63 import java.util.concurrent.CountDownLatch;
64 import java.util.concurrent.atomic.AtomicBoolean;
65 import java.util.concurrent.atomic.AtomicReference;
66 import java.util.concurrent.locks.LockSupport;
67 import jdk.test.lib.thread.VThreadRunner;
68 import jdk.test.lib.thread.VThreadScheduler;
69
70 import org.junit.jupiter.api.Test;
71 import org.junit.jupiter.api.BeforeAll;
72 import static org.junit.jupiter.api.Assertions.*;
73 import static org.junit.jupiter.api.Assumptions.*;
74
75 class CustomDefaultScheduler {
76 private static String schedulerClassName;
77
78 @BeforeAll
79 static void setup() {
80 schedulerClassName = System.getProperty("jdk.virtualThreadScheduler.implClass");
81 }
82
83 /**
84 * Custom scheduler that uses a thread pool.
85 */
86 public static class CustomScheduler1 implements VirtualThreadScheduler {
87 private final ExecutorService pool;
88
89 public CustomScheduler1() {
90 ThreadFactory factory = Thread.ofPlatform().daemon().factory();
91 pool = Executors.newFixedThreadPool(1, factory);
92 }
93
94 @Override
95 public void onStart(VirtualThreadTask task) {
96 pool.execute(task);
97 }
98
99 @Override
100 public void onContinue(VirtualThreadTask task) {
101 pool.execute(task);
102 }
103 }
104
105 /**
106 * Custom scheduler that delegates to the built-in default scheduler.
107 */
108 public static class CustomScheduler2 implements VirtualThreadScheduler {
109 private final VirtualThreadScheduler builtinScheduler;
110
111 // the set of threads that executed with this scheduler
112 private final Set<Thread> executed = ConcurrentHashMap.newKeySet();
113
114 public CustomScheduler2(VirtualThreadScheduler builtinScheduler) {
115 this.builtinScheduler = builtinScheduler;
116 }
117
118 VirtualThreadScheduler builtinScheduler() {
119 return builtinScheduler;
120 }
121
122 @Override
123 public void onStart(VirtualThreadTask task) {
124 executed.add(task.thread());
125 builtinScheduler.onStart(task);
126 }
127
128 @Override
129 public void onContinue(VirtualThreadTask task) {
130 executed.add(task.thread());
131 builtinScheduler.onContinue(task);
132 }
133
134 Set<Thread> threadsExecuted() {
135 return executed;
136 }
137 }
138
139 /**
140 * Test that a virtual thread uses the custom default scheduler.
141 */
142 @Test
143 void testUseCustomScheduler() throws Exception {
144 var ref = new AtomicReference<VirtualThreadScheduler>();
145 Thread.startVirtualThread(() -> {
146 ref.set(currentScheduler());
147 }).join();
148 VirtualThreadScheduler scheduler = ref.get();
149 assertEquals(schedulerClassName, scheduler.getClass().getName());
150 }
151
152 /**
153 * Test virtual thread park/unpark when using custom default scheduler.
154 */
155 @Test
156 void testPark() throws Exception {
157 var done = new AtomicBoolean();
158 var thread = Thread.startVirtualThread(() -> {
159 while (!done.get()) {
160 LockSupport.park();
161 }
162 });
163 try {
164 await(thread, Thread.State.WAITING);
165 } finally {
166 done.set(true);
167 LockSupport.unpark(thread);
168 thread.join();
169 }
170 }
171
172 /**
173 * Test virtual thread blocking on a monitor when using custom default scheduler.
174 */
175 @Test
176 void testBlockMonitor() throws Exception {
177 var ready = new CountDownLatch(1);
178 var lock = new Object();
179 var thread = Thread.ofVirtual().unstarted(() -> {
180 ready.countDown();
181 synchronized (lock) {
182 }
183 });
184 synchronized (lock) {
185 thread.start();
186 ready.await();
187 await(thread, Thread.State.BLOCKED);
188 }
189 thread.join();
190 }
191
192 /**
193 * Test virtual thread blocking on a socket I/O when using custom default scheduler.
194 */
195 @Test
196 void testBlockSocket() throws Exception {
197 VThreadRunner.run(() -> {
198 try (var connection = new Connection()) {
199 Socket s1 = connection.socket1();
200 Socket s2 = connection.socket2();
201
202 // write bytes after current virtual thread has parked
203 byte[] ba1 = "XXX".getBytes("UTF-8");
204 runAfterParkedAsync(() -> s1.getOutputStream().write(ba1));
205
206 byte[] ba2 = new byte[10];
207 int n = s2.getInputStream().read(ba2);
208 assertTrue(n > 0);
209 assertTrue(ba2[0] == 'X');
210 }
211 });
212 }
213
214 /**
215 * Test one virtual thread starting a second virtual thread when both are scheduled
216 * by a custom default scheduler delegating to builtin default scheduler.
217 */
218 @Test
219 void testDelegatingToBuiltin() throws Exception {
220 assumeTrue(schedulerClassName.equals("CustomDefaultScheduler$CustomScheduler2"));
221
222 var schedulerRef = new AtomicReference<VirtualThreadScheduler>();
223 var vthreadRef = new AtomicReference<Thread>();
224
225 var vthread1 = Thread.ofVirtual().start(() -> {
226 schedulerRef.set(currentScheduler());
227 Thread vthread2 = Thread.ofVirtual().start(() -> {
228 assertTrue(currentScheduler() == schedulerRef.get());
229 vthreadRef.set(Thread.currentThread());
230 });
231 try {
232 vthread2.join();
233 } catch (InterruptedException e) {
234 // fail();
235 }
236 });
237
238 vthread1.join();
239 Thread vthread2 = vthreadRef.get();
240
241 var customScheduler = (CustomScheduler2) schedulerRef.get();
242 assertTrue(customScheduler.threadsExecuted().contains(vthread1));
243 assertTrue(customScheduler.threadsExecuted().contains(vthread2));
244 }
245
246 /**
247 * Returns the scheduler for the current virtual thread.
248 */
249 private static VirtualThreadScheduler currentScheduler() {
250 return VThreadScheduler.scheduler(Thread.currentThread());
251 }
252
253 /**
254 * Waits for the given thread to reach a given state.
255 */
256 private void await(Thread thread, Thread.State expectedState) throws InterruptedException {
257 Thread.State state = thread.getState();
258 while (state != expectedState) {
259 assertTrue(state != Thread.State.TERMINATED, "Thread has terminated");
260 Thread.sleep(10);
261 state = thread.getState();
262 }
263 }
264
265 @FunctionalInterface
266 private interface ThrowingRunnable {
267 void run() throws Exception;
268 }
269
270 /**
271 * Runs the given task asynchronously after the current virtual thread has parked.
272 * @return the thread started to run the task
273 */
274 private static Thread runAfterParkedAsync(ThrowingRunnable task) {
275 Thread target = Thread.currentThread();
276 if (!target.isVirtual())
277 throw new WrongThreadException();
278 return Thread.ofPlatform().daemon().start(() -> {
279 try {
280 Thread.State state = target.getState();
281 while (state != Thread.State.WAITING
282 && state != Thread.State.TIMED_WAITING) {
283 Thread.sleep(20);
284 state = target.getState();
285 }
286 Thread.sleep(20); // give a bit more time to release carrier
287 task.run();
288 } catch (Exception e) {
289 e.printStackTrace();
290 }
291 });
292 }
293
294 /**
295 * Creates a loopback connection
296 */
297 private static class Connection implements Closeable {
298 private final Socket s1;
299 private final Socket s2;
300 Connection() throws IOException {
301 var lh = InetAddress.getLoopbackAddress();
302 try (var listener = new ServerSocket()) {
303 listener.bind(new InetSocketAddress(lh, 0));
304 Socket s1 = new Socket();
305 Socket s2;
306 try {
307 s1.connect(listener.getLocalSocketAddress());
308 s2 = listener.accept();
309 } catch (IOException ioe) {
310 s1.close();
311 throw ioe;
312 }
313 this.s1 = s1;
314 this.s2 = s2;
315 }
316
317 }
318 Socket socket1() {
319 return s1;
320 }
321 Socket socket2() {
322 return s2;
323 }
324 @Override
325 public void close() throws IOException {
326 s1.close();
327 s2.close();
328 }
329 }
330 }