1 /* 2 * Copyright (c) 2020, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /** 25 * @test 26 * @summary Test virtual threads using a custom scheduler 27 * @requires vm.continuations 28 * @modules java.base/java.lang:+open 29 * @library /test/lib 30 * @run junit CustomScheduler 31 */ 32 33 import java.lang.reflect.Field; 34 import java.time.Duration; 35 import java.util.ArrayList; 36 import java.util.List; 37 import java.util.concurrent.*; 38 import java.util.concurrent.atomic.AtomicBoolean; 39 import java.util.concurrent.atomic.AtomicInteger; 40 import java.util.concurrent.atomic.AtomicReference; 41 import java.util.concurrent.locks.LockSupport; 42 43 import jdk.test.lib.thread.VThreadScheduler; 44 import jdk.test.lib.thread.VThreadRunner; 45 import org.junit.jupiter.api.Test; 46 import org.junit.jupiter.api.BeforeAll; 47 import org.junit.jupiter.api.AfterAll; 48 import static org.junit.jupiter.api.Assertions.*; 49 import static org.junit.jupiter.api.Assumptions.*; 50 51 class CustomScheduler { 52 private static ExecutorService scheduler1; 53 private static ExecutorService scheduler2; 54 55 @BeforeAll 56 static void setup() { 57 scheduler1 = Executors.newFixedThreadPool(1); 58 scheduler2 = Executors.newFixedThreadPool(1); 59 } 60 61 @AfterAll 62 static void shutdown() { 63 scheduler1.shutdown(); 64 scheduler2.shutdown(); 65 } 66 67 /** 68 * Test platform thread creating a virtual thread that uses a custom scheduler. 69 */ 70 @Test 71 void testCustomScheduler1() throws Exception { 72 var ref = new AtomicReference<Executor>(); 73 ThreadFactory factory = VThreadScheduler.virtualThreadFactory(scheduler1); 74 Thread thread = factory.newThread(() -> { 75 ref.set(VThreadScheduler.scheduler(Thread.currentThread())); 76 }); 77 thread.start(); 78 thread.join(); 79 assertTrue(ref.get() == scheduler1); 80 } 81 82 /** 83 * Test virtual thread creating a virtual thread that uses a custom scheduler. 84 */ 85 @Test 86 void testCustomScheduler2() throws Exception { 87 VThreadRunner.run(this::testCustomScheduler1); 88 } 89 90 /** 91 * Test virtual thread using custom scheduler creating a virtual thread that uses 92 * the default scheduler. 93 */ 94 @Test 95 void testCustomScheduler3() throws Exception { 96 var ref = new AtomicReference<Executor>(); 97 ThreadFactory factory = VThreadScheduler.virtualThreadFactory(scheduler1); 98 Thread thread = factory.newThread(() -> { 99 try { 100 Thread.ofVirtual().start(() -> { 101 ref.set(VThreadScheduler.scheduler(Thread.currentThread())); 102 }).join(); 103 } catch (Exception e) { 104 e.printStackTrace(); 105 } 106 }); 107 thread.start(); 108 thread.join(); 109 assertTrue(ref.get() == VThreadScheduler.defaultScheduler()); 110 } 111 112 /** 113 * Test virtual thread using custom scheduler creating a virtual thread 114 * that uses a different custom scheduler. 115 */ 116 @Test 117 void testCustomScheduler4() throws Exception { 118 var ref = new AtomicReference<Executor>(); 119 ThreadFactory factory1 = VThreadScheduler.virtualThreadFactory(scheduler1); 120 ThreadFactory factory2 = VThreadScheduler.virtualThreadFactory(scheduler2); 121 Thread thread1 = factory1.newThread(() -> { 122 try { 123 Thread thread2 = factory2.newThread(() -> { 124 ref.set(VThreadScheduler.scheduler(Thread.currentThread())); 125 }); 126 thread2.start(); 127 thread2.join(); 128 } catch (Exception e) { 129 e.printStackTrace(); 130 } 131 }); 132 thread1.start(); 133 thread1.join(); 134 assertTrue(ref.get() == scheduler2); 135 } 136 137 /** 138 * Test running task on a virtual thread, should thrown WrongThreadException. 139 */ 140 @Test 141 void testBadCarrier() { 142 Executor scheduler = (task) -> { 143 var exc = new AtomicReference<Throwable>(); 144 try { 145 Thread.ofVirtual().start(() -> { 146 try { 147 task.run(); 148 fail(); 149 } catch (Throwable e) { 150 exc.set(e); 151 } 152 }).join(); 153 } catch (InterruptedException e) { 154 fail(); 155 } 156 assertTrue(exc.get() instanceof WrongThreadException); 157 }; 158 ThreadFactory factory = VThreadScheduler.virtualThreadFactory(scheduler); 159 Thread thread = factory.newThread(LockSupport::park); 160 thread.start(); 161 } 162 163 /** 164 * Test parking with the virtual thread interrupt set, should not leak to the 165 * carrier thread when the task completes. 166 */ 167 @Test 168 void testParkWithInterruptSet() { 169 Thread carrier = Thread.currentThread(); 170 assumeFalse(carrier.isVirtual(), "Main thread is a virtual thread"); 171 try { 172 ThreadFactory factory = VThreadScheduler.virtualThreadFactory(Runnable::run); 173 Thread vthread = factory.newThread(() -> { 174 Thread.currentThread().interrupt(); 175 Thread.yield(); 176 }); 177 vthread.start(); 178 assertTrue(vthread.isInterrupted()); 179 assertFalse(carrier.isInterrupted()); 180 } finally { 181 Thread.interrupted(); 182 } 183 } 184 185 /** 186 * Test terminating with the virtual thread interrupt set, should not leak to 187 * the carrier thread when the task completes. 188 */ 189 @Test 190 void testTerminateWithInterruptSet() { 191 Thread carrier = Thread.currentThread(); 192 assumeFalse(carrier.isVirtual(), "Main thread is a virtual thread"); 193 try { 194 ThreadFactory factory = VThreadScheduler.virtualThreadFactory(Runnable::run); 195 Thread vthread = factory.newThread(() -> { 196 Thread.currentThread().interrupt(); 197 }); 198 vthread.start(); 199 assertTrue(vthread.isInterrupted()); 200 assertFalse(carrier.isInterrupted()); 201 } finally { 202 Thread.interrupted(); 203 } 204 } 205 206 /** 207 * Test running task with the carrier interrupt status set. 208 */ 209 @Test 210 void testRunWithInterruptSet() throws Exception { 211 assumeFalse(Thread.currentThread().isVirtual(), "Main thread is a virtual thread"); 212 Executor scheduler = (task) -> { 213 Thread.currentThread().interrupt(); 214 task.run(); 215 }; 216 ThreadFactory factory = VThreadScheduler.virtualThreadFactory(scheduler); 217 try { 218 AtomicBoolean interrupted = new AtomicBoolean(); 219 Thread vthread = factory.newThread(() -> { 220 interrupted.set(Thread.currentThread().isInterrupted()); 221 }); 222 vthread.start(); 223 assertFalse(vthread.isInterrupted()); 224 } finally { 225 Thread.interrupted(); 226 } 227 } 228 229 /** 230 * Test custom scheduler throwing OOME when starting a thread. 231 */ 232 @Test 233 void testThreadStartOOME() throws Exception { 234 Executor scheduler = task -> { 235 System.err.println("OutOfMemoryError"); 236 throw new OutOfMemoryError(); 237 }; 238 ThreadFactory factory = VThreadScheduler.virtualThreadFactory(scheduler); 239 Thread thread = factory.newThread(() -> { }); 240 assertThrows(OutOfMemoryError.class, thread::start); 241 } 242 243 /** 244 * Test custom scheduler throwing OOME when unparking a thread. 245 */ 246 @Test 247 void testThreadUnparkOOME() throws Exception { 248 try (ExecutorService executor = Executors.newFixedThreadPool(1)) { 249 AtomicInteger counter = new AtomicInteger(); 250 Executor scheduler = task -> { 251 switch (counter.getAndIncrement()) { 252 case 0 -> executor.execute(task); // Thread.start 253 case 1, 2 -> { // unpark attempt 1+2 254 System.err.println("OutOfMemoryError"); 255 throw new OutOfMemoryError(); 256 } 257 default -> executor.execute(task); 258 } 259 executor.execute(task); 260 }; 261 262 // start thread and wait for it to park 263 ThreadFactory factory = VThreadScheduler.virtualThreadFactory(scheduler); 264 var thread = factory.newThread(LockSupport::park); 265 thread.start(); 266 await(thread, Thread.State.WAITING); 267 268 // unpark thread, this should retry until OOME is not thrown 269 LockSupport.unpark(thread); 270 thread.join(); 271 } 272 273 } 274 275 /** 276 * Waits for the given thread to reach a given state. 277 */ 278 private void await(Thread thread, Thread.State expectedState) throws InterruptedException { 279 Thread.State state = thread.getState(); 280 while (state != expectedState) { 281 assertTrue(state != Thread.State.TERMINATED, "Thread has terminated"); 282 Thread.sleep(10); 283 state = thread.getState(); 284 } 285 } 286 }