1 /* 2 * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /** 25 * @test 26 * @summary Basic test for JFR jdk.VirtualThreadXXX events 27 * @requires vm.continuations 28 * @modules jdk.jfr java.base/java.lang:+open 29 * @library /test/lib 30 * @run junit/othervm --enable-native-access=ALL-UNNAMED JfrEvents 31 */ 32 33 import java.io.IOException; 34 import java.nio.file.Path; 35 import java.util.List; 36 import java.util.Map; 37 import java.util.concurrent.Executor; 38 import java.util.concurrent.ExecutorService; 39 import java.util.concurrent.Executors; 40 import java.util.concurrent.RejectedExecutionException; 41 import java.util.concurrent.ThreadFactory; 42 import java.util.concurrent.atomic.AtomicBoolean; 43 import java.util.concurrent.atomic.AtomicReference; 44 import java.util.concurrent.locks.LockSupport; 45 import java.util.function.Consumer; 46 import java.util.stream.Collectors; 47 import java.util.stream.Stream; 48 49 import jdk.jfr.EventType; 50 import jdk.jfr.Recording; 51 import jdk.jfr.consumer.RecordedEvent; 52 import jdk.jfr.consumer.RecordingFile; 53 54 import jdk.test.lib.thread.VThreadPinner; 55 import jdk.test.lib.thread.VThreadRunner.ThrowingRunnable; 56 import org.junit.jupiter.api.Test; 57 import org.junit.jupiter.params.ParameterizedTest; 58 import org.junit.jupiter.params.provider.Arguments; 59 import org.junit.jupiter.params.provider.MethodSource; 60 import static org.junit.jupiter.api.Assertions.*; 61 62 class JfrEvents { 63 64 /** 65 * Test jdk.VirtualThreadStart and jdk.VirtualThreadEnd events. 66 */ 67 @Test 68 void testVirtualThreadStartAndEnd() throws Exception { 69 try (Recording recording = new Recording()) { 70 recording.enable("jdk.VirtualThreadStart"); 71 recording.enable("jdk.VirtualThreadEnd"); 72 73 // execute 100 tasks, each in their own virtual thread 74 recording.start(); 75 ThreadFactory factory = Thread.ofVirtual().factory(); 76 try (var executor = Executors.newThreadPerTaskExecutor(factory)) { 77 for (int i = 0; i < 100; i++) { 78 executor.submit(() -> { }); 79 } 80 Thread.sleep(1000); // give time for thread end events to be recorded 81 } finally { 82 recording.stop(); 83 } 84 85 Map<String, Integer> events = sumEvents(recording); 86 System.err.println(events); 87 88 int startCount = events.getOrDefault("jdk.VirtualThreadStart", 0); 89 int endCount = events.getOrDefault("jdk.VirtualThreadEnd", 0); 90 assertEquals(100, startCount); 91 assertEquals(100, endCount); 92 } 93 } 94 95 /** 96 * Arguments for testVirtualThreadPinned to test jdk.VirtualThreadPinned event. 97 * [0] label/description 98 * [1] the operation to park/wait 99 * [2] the Thread.State when parked/waiting 100 * [3] the action to unpark/notify the thread 101 */ 102 static Stream<Arguments> pinnedCases() { 103 Object lock = new Object(); 104 105 // park with native frame on stack 106 var finish1 = new AtomicBoolean(); 107 var parkWhenPinned = Arguments.of( 108 "LockSupport.park when pinned", 109 (ThrowingRunnable<Exception>) () -> { 110 VThreadPinner.runPinned(() -> { 111 while (!finish1.get()) { 112 LockSupport.park(); 113 } 114 }); 115 }, 116 Thread.State.WAITING, 117 (Consumer<Thread>) t -> { 118 finish1.set(true); 119 LockSupport.unpark(t); 120 } 121 ); 122 123 // timed park with native frame on stack 124 var finish2 = new AtomicBoolean(); 125 var timedParkWhenPinned = Arguments.of( 126 "LockSupport.parkNanos when pinned", 127 (ThrowingRunnable<Exception>) () -> { 128 VThreadPinner.runPinned(() -> { 129 while (!finish2.get()) { 130 LockSupport.parkNanos(Long.MAX_VALUE); 131 } 132 }); 133 }, 134 Thread.State.TIMED_WAITING, 135 (Consumer<Thread>) t -> { 136 finish2.set(true); 137 LockSupport.unpark(t); 138 } 139 ); 140 141 // block on monitorenter with native frame on stack 142 var blockWhenPinned = Arguments.of( 143 "Contended monitorenter when pinned", 144 (ThrowingRunnable<Exception>) () -> { 145 var readyToEnter = new AtomicBoolean(); 146 var thread = Thread.ofVirtual().unstarted(() -> { 147 VThreadPinner.runPinned(() -> { 148 readyToEnter.set(true); 149 synchronized (lock) { } 150 }); 151 }); 152 try { 153 synchronized (lock) { 154 thread.start(); 155 while (!readyToEnter.get()) { 156 Thread.sleep(10); 157 } 158 await(thread, Thread.State.BLOCKED); 159 } 160 } finally { 161 thread.join(); 162 } 163 }, 164 Thread.State.TERMINATED, 165 (Consumer<Thread>) t -> { } 166 ); 167 168 // untimed Object.wait 169 var waitWhenPinned = Arguments.of( 170 "Object.wait", 171 (ThrowingRunnable<Exception>) () -> { 172 synchronized (lock) { 173 lock.wait(); 174 } 175 }, 176 Thread.State.WAITING, 177 (Consumer<Thread>) t -> { 178 synchronized (lock) { 179 lock.notifyAll(); 180 } 181 } 182 ); 183 184 // timed Object.wait 185 var timedWaitWhenPinned = Arguments.of( 186 "Object.wait(millis)", 187 (ThrowingRunnable<Exception>) () -> { 188 synchronized (lock) { 189 lock.wait(Long.MAX_VALUE); 190 } 191 }, 192 Thread.State.TIMED_WAITING, 193 (Consumer<Thread>) t -> { 194 synchronized (lock) { 195 lock.notifyAll(); 196 } 197 } 198 ); 199 200 return Stream.of(parkWhenPinned, 201 timedParkWhenPinned, 202 blockWhenPinned, 203 waitWhenPinned, 204 timedWaitWhenPinned); 205 } 206 207 /** 208 * Test jdk.VirtualThreadPinned event. 209 */ 210 @ParameterizedTest 211 @MethodSource("pinnedCases") 212 void testVirtualThreadPinned(String label, 213 ThrowingRunnable<Exception> parker, 214 Thread.State expectedState, 215 Consumer<Thread> unparker) throws Exception { 216 217 try (Recording recording = new Recording()) { 218 recording.enable("jdk.VirtualThreadPinned"); 219 220 recording.start(); 221 try { 222 var exception = new AtomicReference<Throwable>(); 223 var thread = Thread.ofVirtual().start(() -> { 224 try { 225 parker.run(); 226 } catch (Throwable e) { 227 exception.set(e); 228 } 229 }); 230 try { 231 // wait for thread to park/wait 232 await(thread, expectedState); 233 } finally { 234 unparker.accept(thread); 235 thread.join(); 236 assertNull(exception.get()); 237 } 238 } finally { 239 recording.stop(); 240 } 241 242 Map<String, Integer> events = sumEvents(recording); 243 System.err.println(events); 244 245 // should have at least one pinned event 246 int pinnedCount = events.getOrDefault("jdk.VirtualThreadPinned", 0); 247 assertTrue(pinnedCount >= 1, "Expected one or more events"); 248 } 249 } 250 251 /** 252 * Test jdk.VirtualThreadSubmitFailed event. 253 */ 254 @Test 255 void testVirtualThreadSubmitFailed() throws Exception { 256 try (Recording recording = new Recording()) { 257 recording.enable("jdk.VirtualThreadSubmitFailed"); 258 259 recording.start(); 260 try (ExecutorService pool = Executors.newCachedThreadPool()) { 261 Executor scheduler = task -> pool.execute(task); 262 263 // create virtual thread that uses custom scheduler 264 ThreadFactory factory = ThreadBuilders.virtualThreadBuilder(scheduler).factory(); 265 266 // start a thread 267 Thread thread = factory.newThread(LockSupport::park); 268 thread.start(); 269 270 // wait for thread to park 271 await(thread, Thread.State.WAITING); 272 273 // shutdown scheduler 274 pool.shutdown(); 275 276 // unpark, the submit should fail 277 try { 278 LockSupport.unpark(thread); 279 fail(); 280 } catch (RejectedExecutionException expected) { } 281 282 // start another thread, it should fail and an event should be recorded 283 try { 284 factory.newThread(LockSupport::park).start(); 285 throw new RuntimeException("RejectedExecutionException expected"); 286 } catch (RejectedExecutionException expected) { } 287 } finally { 288 recording.stop(); 289 } 290 291 Map<String, Integer> events = sumEvents(recording); 292 System.err.println(events); 293 294 int count = events.getOrDefault("jdk.VirtualThreadSubmitFailed", 0); 295 assertEquals(2, count); 296 } 297 } 298 299 /** 300 * Read the events from the recording and return a map of event name to count. 301 */ 302 private static Map<String, Integer> sumEvents(Recording recording) throws IOException { 303 Path recordingFile = recordingFile(recording); 304 List<RecordedEvent> events = RecordingFile.readAllEvents(recordingFile); 305 return events.stream() 306 .map(RecordedEvent::getEventType) 307 .collect(Collectors.groupingBy(EventType::getName, 308 Collectors.summingInt(x -> 1))); 309 } 310 311 /** 312 * Return the file path to the recording file. 313 */ 314 private static Path recordingFile(Recording recording) throws IOException { 315 Path recordingFile = recording.getDestination(); 316 if (recordingFile == null) { 317 ProcessHandle h = ProcessHandle.current(); 318 recordingFile = Path.of("recording-" + recording.getId() + "-pid" + h.pid() + ".jfr"); 319 recording.dump(recordingFile); 320 } 321 return recordingFile; 322 } 323 324 /** 325 * Waits for the given thread to reach a given state. 326 */ 327 private static void await(Thread thread, Thread.State expectedState) throws InterruptedException { 328 Thread.State state = thread.getState(); 329 while (state != expectedState) { 330 Thread.sleep(10); 331 state = thread.getState(); 332 } 333 } 334 }