1 /*
  2  * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /**
 25  * @test
 26  * @summary Basic test for JFR jdk.VirtualThreadXXX events
 27  * @requires vm.continuations
 28  * @modules jdk.jfr java.base/java.lang:+open
 29  * @library /test/lib
 30  * @run junit/othervm --enable-native-access=ALL-UNNAMED JfrEvents
 31  */
 32 
 33 import java.io.IOException;
 34 import java.nio.file.Path;
 35 import java.util.List;
 36 import java.util.Map;
 37 import java.util.concurrent.Executor;
 38 import java.util.concurrent.ExecutorService;
 39 import java.util.concurrent.Executors;
 40 import java.util.concurrent.RejectedExecutionException;
 41 import java.util.concurrent.ThreadFactory;
 42 import java.util.concurrent.atomic.AtomicBoolean;
 43 import java.util.concurrent.atomic.AtomicReference;
 44 import java.util.concurrent.locks.LockSupport;
 45 import java.util.function.Consumer;
 46 import java.util.stream.Collectors;
 47 import java.util.stream.Stream;
 48 
 49 import jdk.jfr.EventType;
 50 import jdk.jfr.Recording;
 51 import jdk.jfr.consumer.RecordedEvent;
 52 import jdk.jfr.consumer.RecordingFile;
 53 
 54 import jdk.test.lib.thread.VThreadPinner;
 55 import jdk.test.lib.thread.VThreadRunner.ThrowingRunnable;
 56 import org.junit.jupiter.api.Test;
 57 import org.junit.jupiter.params.ParameterizedTest;
 58 import org.junit.jupiter.params.provider.Arguments;
 59 import org.junit.jupiter.params.provider.MethodSource;
 60 import static org.junit.jupiter.api.Assertions.*;
 61 
 62 class JfrEvents {
 63 
 64     /**
 65      * Test jdk.VirtualThreadStart and jdk.VirtualThreadEnd events.
 66      */
 67     @Test
 68     void testVirtualThreadStartAndEnd() throws Exception {
 69         try (Recording recording = new Recording()) {
 70             recording.enable("jdk.VirtualThreadStart");
 71             recording.enable("jdk.VirtualThreadEnd");
 72 
 73             // execute 100 tasks, each in their own virtual thread
 74             recording.start();
 75             ThreadFactory factory = Thread.ofVirtual().factory();
 76             try (var executor = Executors.newThreadPerTaskExecutor(factory)) {
 77                 for (int i = 0; i < 100; i++) {
 78                     executor.submit(() -> { });
 79                 }
 80                 Thread.sleep(1000); // give time for thread end events to be recorded
 81             } finally {
 82                 recording.stop();
 83             }
 84 
 85             Map<String, Integer> events = sumEvents(recording);
 86             System.err.println(events);
 87 
 88             int startCount = events.getOrDefault("jdk.VirtualThreadStart", 0);
 89             int endCount = events.getOrDefault("jdk.VirtualThreadEnd", 0);
 90             assertEquals(100, startCount);
 91             assertEquals(100, endCount);
 92         }
 93     }
 94 
 95     /**
 96      * Arguments for testVirtualThreadPinned to test jdk.VirtualThreadPinned event.
 97      *   [0] label/description
 98      *   [1] the operation to park/wait
 99      *   [2] the Thread.State when parked/waiting
100      *   [3] the action to unpark/notify the thread
101      */
102     static Stream<Arguments> pinnedCases() {
103         Object lock = new Object();
104 
105         // park with native frame on stack
106         var finish1 = new AtomicBoolean();
107         var parkWhenPinned = Arguments.of(
108             "LockSupport.park when pinned",
109             (ThrowingRunnable<Exception>) () -> {
110                 VThreadPinner.runPinned(() -> {
111                     while (!finish1.get()) {
112                         LockSupport.park();
113                     }
114                 });
115             },
116             Thread.State.WAITING,
117                 (Consumer<Thread>) t -> {
118                     finish1.set(true);
119                     LockSupport.unpark(t);
120                 }
121         );
122 
123         // timed park with native frame on stack
124         var finish2 = new AtomicBoolean();
125         var timedParkWhenPinned = Arguments.of(
126             "LockSupport.parkNanos when pinned",
127             (ThrowingRunnable<Exception>) () -> {
128                 VThreadPinner.runPinned(() -> {
129                     while (!finish2.get()) {
130                         LockSupport.parkNanos(Long.MAX_VALUE);
131                     }
132                 });
133             },
134             Thread.State.TIMED_WAITING,
135             (Consumer<Thread>) t -> {
136                 finish2.set(true);
137                 LockSupport.unpark(t);
138             }
139         );
140 
141         // block on monitorenter with native frame on stack
142         var blockWhenPinned = Arguments.of(
143             "Contended monitorenter when pinned",
144             (ThrowingRunnable<Exception>) () -> {
145                 var readyToEnter = new AtomicBoolean();
146                 var thread = Thread.ofVirtual().unstarted(() -> {
147                     VThreadPinner.runPinned(() -> {
148                         readyToEnter.set(true);
149                         synchronized (lock) { }
150                     });
151                 });
152                 try {
153                     synchronized (lock) {
154                         thread.start();
155                         while (!readyToEnter.get()) {
156                             Thread.sleep(10);
157                         }
158                         await(thread, Thread.State.BLOCKED);
159                     }
160                 } finally {
161                     thread.join();
162                 }
163             },
164             Thread.State.TERMINATED,
165             (Consumer<Thread>) t -> { }
166         );
167 
168         // untimed Object.wait
169         var waitWhenPinned = Arguments.of(
170             "Object.wait",
171             (ThrowingRunnable<Exception>) () -> {
172                 synchronized (lock) {
173                     lock.wait();
174                 }
175             },
176             Thread.State.WAITING,
177             (Consumer<Thread>) t -> {
178                 synchronized (lock) {
179                     lock.notifyAll();
180                 }
181             }
182         );
183 
184         // timed Object.wait
185         var timedWaitWhenPinned = Arguments.of(
186             "Object.wait(millis)",
187             (ThrowingRunnable<Exception>) () -> {
188                 synchronized (lock) {
189                     lock.wait(Long.MAX_VALUE);
190                 }
191             },
192             Thread.State.TIMED_WAITING,
193             (Consumer<Thread>) t -> {
194                 synchronized (lock) {
195                     lock.notifyAll();
196                 }
197             }
198         );
199 
200         return Stream.of(parkWhenPinned,
201                 timedParkWhenPinned,
202                 blockWhenPinned,
203                 waitWhenPinned,
204                 timedWaitWhenPinned);
205     }
206 
207     /**
208      * Test jdk.VirtualThreadPinned event.
209      */
210     @ParameterizedTest
211     @MethodSource("pinnedCases")
212     void testVirtualThreadPinned(String label,
213                                  ThrowingRunnable<Exception> parker,
214                                  Thread.State expectedState,
215                                  Consumer<Thread> unparker) throws Exception {
216 
217         try (Recording recording = new Recording()) {
218             recording.enable("jdk.VirtualThreadPinned");
219 
220             recording.start();
221             try {
222                 var exception = new AtomicReference<Throwable>();
223                 var thread = Thread.ofVirtual().start(() -> {
224                     try {
225                         parker.run();
226                     } catch (Throwable e) {
227                         exception.set(e);
228                     }
229                 });
230                 try {
231                     // wait for thread to park/wait
232                     await(thread, expectedState);
233                 } finally {
234                     unparker.accept(thread);
235                     thread.join();
236                     assertNull(exception.get());
237                 }
238             } finally {
239                 recording.stop();
240             }
241 
242             Map<String, Integer> events = sumEvents(recording);
243             System.err.println(events);
244 
245             // should have at least one pinned event
246             int pinnedCount = events.getOrDefault("jdk.VirtualThreadPinned", 0);
247             assertTrue(pinnedCount >= 1, "Expected one or more events");
248         }
249     }
250 
251     /**
252      * Test jdk.VirtualThreadSubmitFailed event.
253      */
254     @Test
255     void testVirtualThreadSubmitFailed() throws Exception {
256         try (Recording recording = new Recording()) {
257             recording.enable("jdk.VirtualThreadSubmitFailed");
258 
259             recording.start();
260             try (ExecutorService pool = Executors.newCachedThreadPool()) {
261                 Executor scheduler = task -> pool.execute(task);
262 
263                 // create virtual thread that uses custom scheduler
264                 ThreadFactory factory = ThreadBuilders.virtualThreadBuilder(scheduler).factory();
265 
266                 // start a thread
267                 Thread thread = factory.newThread(LockSupport::park);
268                 thread.start();
269 
270                 // wait for thread to park
271                 await(thread, Thread.State.WAITING);
272 
273                 // shutdown scheduler
274                 pool.shutdown();
275 
276                 // unpark, the submit should fail
277                 try {
278                     LockSupport.unpark(thread);
279                     fail();
280                 } catch (RejectedExecutionException expected) { }
281 
282                 // start another thread, it should fail and an event should be recorded
283                 try {
284                     factory.newThread(LockSupport::park).start();
285                     throw new RuntimeException("RejectedExecutionException expected");
286                 } catch (RejectedExecutionException expected) { }
287             } finally {
288                 recording.stop();
289             }
290 
291             Map<String, Integer> events = sumEvents(recording);
292             System.err.println(events);
293 
294             int count = events.getOrDefault("jdk.VirtualThreadSubmitFailed", 0);
295             assertEquals(2, count);
296         }
297     }
298 
299     /**
300      * Read the events from the recording and return a map of event name to count.
301      */
302     private static Map<String, Integer> sumEvents(Recording recording) throws IOException {
303         Path recordingFile = recordingFile(recording);
304         List<RecordedEvent> events = RecordingFile.readAllEvents(recordingFile);
305         return events.stream()
306                 .map(RecordedEvent::getEventType)
307                 .collect(Collectors.groupingBy(EventType::getName,
308                                                Collectors.summingInt(x -> 1)));
309     }
310 
311     /**
312      * Return the file path to the recording file.
313      */
314     private static Path recordingFile(Recording recording) throws IOException {
315         Path recordingFile = recording.getDestination();
316         if (recordingFile == null) {
317             ProcessHandle h = ProcessHandle.current();
318             recordingFile = Path.of("recording-" + recording.getId() + "-pid" + h.pid() + ".jfr");
319             recording.dump(recordingFile);
320         }
321         return recordingFile;
322     }
323 
324     /**
325      * Waits for the given thread to reach a given state.
326      */
327     private static void await(Thread thread, Thread.State expectedState) throws InterruptedException {
328         Thread.State state = thread.getState();
329         while (state != expectedState) {
330             Thread.sleep(10);
331             state = thread.getState();
332         }
333     }
334 }