1 /*
  2  * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test id=default
 26  * @summary Test using a custom scheduler as the default virtual thread scheduler
 27  * @requires vm.continuations
 28  * @modules java.base/java.lang:+open
 29  * @library /test/lib
 30  * @run junit/othervm -Djdk.virtualThreadScheduler.implClass=CustomDefaultScheduler$CustomScheduler1
 31  *     --enable-native-access=ALL-UNNAMED CustomDefaultScheduler
 32  * @run junit/othervm -Djdk.virtualThreadScheduler.implClass=CustomDefaultScheduler$CustomScheduler2
 33  *     --enable-native-access=ALL-UNNAMED CustomDefaultScheduler
 34  */
 35 
 36 /*
 37  * @test id=poller-modes
 38  * @requires vm.continuations
 39  * @requires (os.family == "linux") | (os.family == "mac")
 40  * @modules java.base/java.lang:+open
 41  * @library /test/lib
 42  * @run junit/othervm -Djdk.pollerMode=3
 43  *     -Djdk.virtualThreadScheduler.implClass=CustomDefaultScheduler$CustomScheduler1
 44  *     --enable-native-access=ALL-UNNAMED CustomDefaultScheduler
 45  * @run junit/othervm -Djdk.pollerMode=3
 46  *     -Djdk.virtualThreadScheduler.implClass=CustomDefaultScheduler$CustomScheduler2
 47  *     --enable-native-access=ALL-UNNAMED CustomDefaultScheduler
 48  */
 49 
 50 import java.io.Closeable;
 51 import java.io.IOException;
 52 import java.lang.Thread.VirtualThreadScheduler;
 53 import java.lang.Thread.VirtualThreadTask;
 54 import java.net.InetAddress;
 55 import java.net.InetSocketAddress;
 56 import java.net.ServerSocket;
 57 import java.net.Socket;
 58 import java.util.Set;
 59 import java.util.concurrent.ConcurrentHashMap;
 60 import java.util.concurrent.Executors;
 61 import java.util.concurrent.ExecutorService;
 62 import java.util.concurrent.ThreadFactory;
 63 import java.util.concurrent.CountDownLatch;
 64 import java.util.concurrent.atomic.AtomicBoolean;
 65 import java.util.concurrent.atomic.AtomicReference;
 66 import java.util.concurrent.locks.LockSupport;
 67 import jdk.test.lib.thread.VThreadRunner;
 68 import jdk.test.lib.thread.VThreadScheduler;
 69 
 70 import org.junit.jupiter.api.Test;
 71 import org.junit.jupiter.api.BeforeAll;
 72 import static org.junit.jupiter.api.Assertions.*;
 73 import static org.junit.jupiter.api.Assumptions.*;
 74 
 75 class CustomDefaultScheduler {
 76     private static String schedulerClassName;
 77 
 78     @BeforeAll
 79     static void setup() {
 80         schedulerClassName = System.getProperty("jdk.virtualThreadScheduler.implClass");
 81     }
 82 
 83     /**
 84      * Custom scheduler that uses a thread pool.
 85      */
 86     public static class CustomScheduler1 implements VirtualThreadScheduler {
 87         private final ExecutorService pool;
 88 
 89         public CustomScheduler1() {
 90             ThreadFactory factory = Thread.ofPlatform().daemon().factory();
 91             pool = Executors.newFixedThreadPool(1, factory);
 92         }
 93 
 94         @Override
 95         public void onStart(VirtualThreadTask task) {
 96             pool.execute(task);
 97         }
 98 
 99         @Override
100         public void onContinue(VirtualThreadTask task) {
101             pool.execute(task);
102         }
103     }
104 
105     /**
106      * Custom scheduler that delegates to the built-in default scheduler.
107      */
108     public static class CustomScheduler2 implements VirtualThreadScheduler {
109         private final VirtualThreadScheduler builtinScheduler;
110 
111         // the set of threads that executed with this scheduler
112         private final Set<Thread> executed = ConcurrentHashMap.newKeySet();
113 
114         public CustomScheduler2(VirtualThreadScheduler builtinScheduler) {
115             this.builtinScheduler = builtinScheduler;
116         }
117 
118         VirtualThreadScheduler builtinScheduler() {
119             return builtinScheduler;
120         }
121 
122         @Override
123         public void onStart(VirtualThreadTask task) {
124             executed.add(task.thread());
125             builtinScheduler.onStart(task);
126         }
127 
128         @Override
129         public void onContinue(VirtualThreadTask task) {
130             executed.add(task.thread());
131             builtinScheduler.onContinue(task);
132         }
133 
134         Set<Thread> threadsExecuted() {
135             return executed;
136         }
137     }
138 
139     /**
140      * Test that a virtual thread uses the custom default scheduler.
141      */
142     @Test
143     void testUseCustomScheduler() throws Exception {
144         var ref = new AtomicReference<VirtualThreadScheduler>();
145         Thread.startVirtualThread(() -> {
146             ref.set(currentScheduler());
147         }).join();
148         VirtualThreadScheduler scheduler = ref.get();
149         assertEquals(schedulerClassName, scheduler.getClass().getName());
150     }
151 
152     /**
153      * Test virtual thread park/unpark when using custom default scheduler.
154      */
155     @Test
156     void testPark() throws Exception {
157         var done = new AtomicBoolean();
158         var thread = Thread.startVirtualThread(() -> {
159             while (!done.get()) {
160                 LockSupport.park();
161             }
162         });
163         try {
164             await(thread, Thread.State.WAITING);
165         } finally {
166             done.set(true);
167             LockSupport.unpark(thread);
168             thread.join();
169         }
170     }
171 
172     /**
173      * Test virtual thread blocking on a monitor when using custom default scheduler.
174      */
175     @Test
176     void testBlockMonitor() throws Exception {
177         var ready = new CountDownLatch(1);
178         var lock = new Object();
179         var thread = Thread.ofVirtual().unstarted(() -> {
180             ready.countDown();
181             synchronized (lock) {
182             }
183         });
184         synchronized (lock) {
185             thread.start();
186             ready.await();
187             await(thread, Thread.State.BLOCKED);
188         }
189         thread.join();
190     }
191 
192     /**
193      * Test virtual thread blocking on a socket I/O when using custom default scheduler.
194      */
195     @Test
196     void testBlockSocket() throws Exception {
197         VThreadRunner.run(() -> {
198             try (var connection = new Connection()) {
199                 Socket s1 = connection.socket1();
200                 Socket s2 = connection.socket2();
201 
202                 // write bytes after current virtual thread has parked
203                 byte[] ba1 = "XXX".getBytes("UTF-8");
204                 runAfterParkedAsync(() -> s1.getOutputStream().write(ba1));
205 
206                 byte[] ba2 = new byte[10];
207                 int n = s2.getInputStream().read(ba2);
208                 assertTrue(n > 0);
209                 assertTrue(ba2[0] == 'X');
210             }
211         });
212     }
213 
214     /**
215      * Test one virtual thread starting a second virtual thread when both are scheduled
216      * by a custom default scheduler delegating to builtin default scheduler.
217      */
218     @Test
219     void testDelegatingToBuiltin() throws Exception {
220         assumeTrue(schedulerClassName.equals("CustomDefaultScheduler$CustomScheduler2"));
221 
222         var schedulerRef = new AtomicReference<VirtualThreadScheduler>();
223         var vthreadRef = new AtomicReference<Thread>();
224 
225         var vthread1 = Thread.ofVirtual().start(() -> {
226             schedulerRef.set(currentScheduler());
227             Thread vthread2 = Thread.ofVirtual().start(() -> {
228                 assertTrue(currentScheduler() == schedulerRef.get());
229                 vthreadRef.set(Thread.currentThread());
230             });
231             try {
232                 vthread2.join();
233             } catch (InterruptedException e) {
234                 // fail();
235             }
236         });
237 
238         vthread1.join();
239         Thread vthread2 = vthreadRef.get();
240 
241         var customScheduler = (CustomScheduler2) schedulerRef.get();
242         assertTrue(customScheduler.threadsExecuted().contains(vthread1));
243         assertTrue(customScheduler.threadsExecuted().contains(vthread2));
244     }
245 
246     /**
247      * Returns the scheduler for the current virtual thread.
248      */
249     private static VirtualThreadScheduler currentScheduler() {
250         return VThreadScheduler.scheduler(Thread.currentThread());
251     }
252 
253     /**
254      * Waits for the given thread to reach a given state.
255      */
256     private void await(Thread thread, Thread.State expectedState) throws InterruptedException {
257         Thread.State state = thread.getState();
258         while (state != expectedState) {
259             assertTrue(state != Thread.State.TERMINATED, "Thread has terminated");
260             Thread.sleep(10);
261             state = thread.getState();
262         }
263     }
264 
265     @FunctionalInterface
266     private interface ThrowingRunnable {
267         void run() throws Exception;
268     }
269 
270     /**
271      * Runs the given task asynchronously after the current virtual thread has parked.
272      * @return the thread started to run the task
273      */
274     private static Thread runAfterParkedAsync(ThrowingRunnable task) {
275         Thread target = Thread.currentThread();
276         if (!target.isVirtual())
277             throw new WrongThreadException();
278         return Thread.ofPlatform().daemon().start(() -> {
279             try {
280                 Thread.State state = target.getState();
281                 while (state != Thread.State.WAITING
282                         && state != Thread.State.TIMED_WAITING) {
283                     Thread.sleep(20);
284                     state = target.getState();
285                 }
286                 Thread.sleep(20);  // give a bit more time to release carrier
287                 task.run();
288             } catch (Exception e) {
289                 e.printStackTrace();
290             }
291         });
292     }
293 
294     /**
295      * Creates a loopback connection
296      */
297     private static class Connection implements Closeable {
298         private final Socket s1;
299         private final Socket s2;
300         Connection() throws IOException {
301             var lh = InetAddress.getLoopbackAddress();
302             try (var listener = new ServerSocket()) {
303                 listener.bind(new InetSocketAddress(lh, 0));
304                 Socket s1 = new Socket();
305                 Socket s2;
306                 try {
307                     s1.connect(listener.getLocalSocketAddress());
308                     s2 = listener.accept();
309                 } catch (IOException ioe) {
310                     s1.close();
311                     throw ioe;
312                 }
313                 this.s1 = s1;
314                 this.s2 = s2;
315             }
316 
317         }
318         Socket socket1() {
319             return s1;
320         }
321         Socket socket2() {
322             return s2;
323         }
324         @Override
325         public void close() throws IOException {
326             s1.close();
327             s2.close();
328         }
329     }
330 }