1 /* 2 * Copyright (c) 2020, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /** 25 * @test 26 * @summary Test virtual threads using a custom scheduler 27 * @requires vm.continuations 28 * @library /test/lib 29 * @run junit CustomScheduler 30 */ 31 32 import java.lang.reflect.Field; 33 import java.time.Duration; 34 import java.util.ArrayList; 35 import java.util.List; 36 import java.util.concurrent.Executors; 37 import java.util.concurrent.ExecutorService; 38 import java.util.concurrent.atomic.AtomicBoolean; 39 import java.util.concurrent.atomic.AtomicInteger; 40 import java.util.concurrent.atomic.AtomicReference; 41 import java.util.concurrent.locks.LockSupport; 42 43 import jdk.test.lib.thread.VThreadRunner; 44 import org.junit.jupiter.api.Test; 45 import org.junit.jupiter.api.BeforeAll; 46 import org.junit.jupiter.api.AfterAll; 47 import static org.junit.jupiter.api.Assertions.*; 48 import static org.junit.jupiter.api.Assumptions.*; 49 50 class CustomScheduler { 51 private static Thread.VirtualThreadScheduler defaultScheduler; 52 private static ExecutorService threadPool1, threadPool2; 53 private static Thread.VirtualThreadScheduler scheduler1, scheduler2; 54 55 @BeforeAll 56 static void setup() throws Exception { 57 var ref = new AtomicReference<Thread.VirtualThreadScheduler>(); 58 Thread thread = Thread.startVirtualThread(() -> { 59 ref.set(Thread.VirtualThreadScheduler.current()); 60 }); 61 thread.join(); 62 defaultScheduler = ref.get(); 63 64 threadPool1 = Executors.newFixedThreadPool(1); 65 threadPool2 = Executors.newFixedThreadPool(1); 66 scheduler1 = Thread.VirtualThreadScheduler.adapt(threadPool1); 67 scheduler2 = Thread.VirtualThreadScheduler.adapt(threadPool2); 68 } 69 70 @AfterAll 71 static void shutdown() { 72 threadPool1.shutdown(); 73 threadPool2.shutdown(); 74 } 75 76 /** 77 * Test platform thread creating a virtual thread that uses a custom scheduler. 78 */ 79 @Test 80 void testCustomScheduler1() throws Exception { 81 var ref = new AtomicReference<Thread.VirtualThreadScheduler>(); 82 Thread thread = Thread.ofVirtual().scheduler(scheduler1).start(() -> { 83 ref.set(Thread.VirtualThreadScheduler.current()); 84 }); 85 thread.join(); 86 assertTrue(ref.get() == scheduler1); 87 } 88 89 /** 90 * Test virtual thread creating a virtual thread that uses a custom scheduler. 91 */ 92 @Test 93 void testCustomScheduler2() throws Exception { 94 VThreadRunner.run(this::testCustomScheduler1); 95 } 96 97 /** 98 * Test virtual thread using custom scheduler creating a virtual thread that uses 99 * the default scheduler. 100 */ 101 @Test 102 void testCustomScheduler3() throws Exception { 103 var ref = new AtomicReference<Thread.VirtualThreadScheduler>(); 104 Thread thread = Thread.ofVirtual().scheduler(scheduler1).start(() -> { 105 try { 106 Thread.ofVirtual().start(() -> { 107 ref.set(Thread.VirtualThreadScheduler.current()); 108 }).join(); 109 } catch (Exception e) { 110 e.printStackTrace(); 111 } 112 }); 113 thread.join(); 114 assertTrue(ref.get() == defaultScheduler); 115 } 116 117 /** 118 * Test virtual thread using custom scheduler creating a virtual thread 119 * that uses a different custom scheduler. 120 */ 121 @Test 122 void testCustomScheduler4() throws Exception { 123 var ref = new AtomicReference<Thread.VirtualThreadScheduler>(); 124 Thread thread1 = Thread.ofVirtual().scheduler(scheduler1).start(() -> { 125 try { 126 Thread thread2 = Thread.ofVirtual().scheduler(scheduler2).start(() -> { 127 ref.set(Thread.VirtualThreadScheduler.current()); 128 }); 129 thread2.join(); 130 } catch (Exception e) { 131 e.printStackTrace(); 132 } 133 }); 134 thread1.join(); 135 assertTrue(ref.get() == scheduler2); 136 } 137 138 /** 139 * Test running task on a virtual thread, should thrown WrongThreadException. 140 */ 141 @Test 142 void testBadCarrier() { 143 Thread.VirtualThreadScheduler scheduler = (_, task) -> { 144 var exc = new AtomicReference<Throwable>(); 145 try { 146 Thread.ofVirtual().start(() -> { 147 try { 148 task.run(); 149 fail(); 150 } catch (Throwable e) { 151 exc.set(e); 152 } 153 }).join(); 154 } catch (InterruptedException e) { 155 fail(); 156 } 157 assertTrue(exc.get() instanceof WrongThreadException); 158 }; 159 Thread.ofVirtual().scheduler(scheduler).start(LockSupport::park); 160 } 161 162 /** 163 * Test parking with the virtual thread interrupt set, should not leak to the 164 * carrier thread when the task completes. 165 */ 166 @Test 167 void testParkWithInterruptSet() { 168 Thread carrier = Thread.currentThread(); 169 assumeFalse(carrier.isVirtual(), "Main thread is a virtual thread"); 170 try { 171 var scheduler = Thread.VirtualThreadScheduler.adapt(Runnable::run); 172 Thread vthread = Thread.ofVirtual().scheduler(scheduler).start(() -> { 173 Thread.currentThread().interrupt(); 174 Thread.yield(); 175 }); 176 assertTrue(vthread.isInterrupted()); 177 assertFalse(carrier.isInterrupted()); 178 } finally { 179 Thread.interrupted(); 180 } 181 } 182 183 /** 184 * Test terminating with the virtual thread interrupt set, should not leak to 185 * the carrier thread when the task completes. 186 */ 187 @Test 188 void testTerminateWithInterruptSet() { 189 Thread carrier = Thread.currentThread(); 190 assumeFalse(carrier.isVirtual(), "Main thread is a virtual thread"); 191 try { 192 var scheduler = Thread.VirtualThreadScheduler.adapt(Runnable::run); 193 Thread vthread = Thread.ofVirtual().scheduler(scheduler).start(() -> { 194 Thread.currentThread().interrupt(); 195 }); 196 assertTrue(vthread.isInterrupted()); 197 assertFalse(carrier.isInterrupted()); 198 } finally { 199 Thread.interrupted(); 200 } 201 } 202 203 /** 204 * Test running task with the carrier interrupt status set. 205 */ 206 @Test 207 void testRunWithInterruptSet() throws Exception { 208 assumeFalse(Thread.currentThread().isVirtual(), "Main thread is a virtual thread"); 209 var scheduler = Thread.VirtualThreadScheduler.adapt(task -> { 210 Thread.currentThread().interrupt(); 211 task.run(); 212 }); 213 try { 214 AtomicBoolean interrupted = new AtomicBoolean(); 215 Thread vthread = Thread.ofVirtual().scheduler(scheduler).start(() -> { 216 interrupted.set(Thread.currentThread().isInterrupted()); 217 }); 218 assertFalse(vthread.isInterrupted()); 219 } finally { 220 Thread.interrupted(); 221 } 222 } 223 224 /** 225 * Test custom scheduler throwing OOME when starting a thread. 226 */ 227 @Test 228 void testThreadStartOOME() throws Exception { 229 var scheduler = Thread.VirtualThreadScheduler.adapt(task -> { 230 System.err.println("OutOfMemoryError"); 231 throw new OutOfMemoryError(); 232 }); 233 Thread thread = Thread.ofVirtual().scheduler(scheduler).unstarted(() -> { }); 234 assertThrows(OutOfMemoryError.class, thread::start); 235 } 236 237 /** 238 * Test custom scheduler throwing OOME when unparking a thread. 239 */ 240 @Test 241 void testThreadUnparkOOME() throws Exception { 242 try (ExecutorService executor = Executors.newFixedThreadPool(1)) { 243 AtomicInteger counter = new AtomicInteger(); 244 var scheduler = Thread.VirtualThreadScheduler.adapt(task -> { 245 switch (counter.getAndIncrement()) { 246 case 0 -> executor.execute(task); // Thread.start 247 case 1, 2 -> { // unpark attempt 1+2 248 System.err.println("OutOfMemoryError"); 249 throw new OutOfMemoryError(); 250 } 251 default -> executor.execute(task); 252 } 253 executor.execute(task); 254 }); 255 256 // start thread and wait for it to park 257 var thread = Thread.ofVirtual().scheduler(scheduler).start(LockSupport::park); 258 await(thread, Thread.State.WAITING); 259 260 // unpark thread, this should retry until OOME is not thrown 261 LockSupport.unpark(thread); 262 thread.join(); 263 } 264 265 } 266 267 /** 268 * Waits for the given thread to reach a given state. 269 */ 270 private void await(Thread thread, Thread.State expectedState) throws InterruptedException { 271 Thread.State state = thread.getState(); 272 while (state != expectedState) { 273 assertTrue(state != Thread.State.TERMINATED, "Thread has terminated"); 274 Thread.sleep(10); 275 state = thread.getState(); 276 } 277 } 278 }