121 );
122
123 // timed park with native frame on stack
124 var finish2 = new AtomicBoolean();
125 var timedParkWhenPinned = Arguments.of(
126 "LockSupport.parkNanos when pinned",
127 (ThrowingRunnable<Exception>) () -> {
128 VThreadPinner.runPinned(() -> {
129 while (!finish2.get()) {
130 LockSupport.parkNanos(Long.MAX_VALUE);
131 }
132 });
133 },
134 Thread.State.TIMED_WAITING,
135 (Consumer<Thread>) t -> {
136 finish2.set(true);
137 LockSupport.unpark(t);
138 }
139 );
140
141 return Stream.of(parkWhenPinned, timedParkWhenPinned);
142 }
143
144 /**
145 * Test jdk.VirtualThreadPinned event.
146 */
147 @ParameterizedTest
148 @MethodSource("pinnedCases")
149 void testVirtualThreadPinned(String label,
150 ThrowingRunnable<Exception> parker,
151 Thread.State expectedState,
152 Consumer<Thread> unparker) throws Exception {
153
154 try (Recording recording = new Recording()) {
155 recording.enable("jdk.VirtualThreadPinned");
156
157 recording.start();
158 try {
159 var exception = new AtomicReference<Throwable>();
160 var thread = Thread.ofVirtual().start(() -> {
161 try {
162 parker.run();
163 } catch (Throwable e) {
164 exception.set(e);
165 }
166 });
167 try {
168 // wait for thread to park/wait
169 Thread.State state = thread.getState();
170 while (state != expectedState) {
171 assertTrue(state != Thread.State.TERMINATED, thread.toString());
172 Thread.sleep(10);
173 state = thread.getState();
174 }
175 } finally {
176 unparker.accept(thread);
177 thread.join();
178 assertNull(exception.get());
179 }
180 } finally {
181 recording.stop();
182 }
183
184 Map<String, Integer> events = sumEvents(recording);
185 System.err.println(events);
186
187 // should have at least one pinned event
188 int pinnedCount = events.getOrDefault("jdk.VirtualThreadPinned", 0);
189 assertTrue(pinnedCount >= 1, "Expected one or more events");
190 }
191 }
192
193 /**
194 * Test jdk.VirtualThreadSubmitFailed event.
195 */
196 @Test
197 void testVirtualThreadSubmitFailed() throws Exception {
198 try (Recording recording = new Recording()) {
199 recording.enable("jdk.VirtualThreadSubmitFailed");
200
201 recording.start();
202 try (ExecutorService pool = Executors.newCachedThreadPool()) {
203 Executor scheduler = task -> pool.execute(task);
204
205 // create virtual thread that uses custom scheduler
206 ThreadFactory factory = ThreadBuilders.virtualThreadBuilder(scheduler).factory();
207
208 // start a thread
209 Thread thread = factory.newThread(LockSupport::park);
210 thread.start();
211
212 // wait for thread to park
213 while (thread.getState() != Thread.State.WAITING) {
214 Thread.sleep(10);
215 }
216
217 // shutdown scheduler
218 pool.shutdown();
219
220 // unpark, the submit should fail
221 try {
222 LockSupport.unpark(thread);
223 fail();
224 } catch (RejectedExecutionException expected) { }
225
226 // start another thread, it should fail and an event should be recorded
227 try {
228 factory.newThread(LockSupport::park).start();
229 throw new RuntimeException("RejectedExecutionException expected");
230 } catch (RejectedExecutionException expected) { }
231 } finally {
232 recording.stop();
233 }
234
235 Map<String, Integer> events = sumEvents(recording);
247 Path recordingFile = recordingFile(recording);
248 List<RecordedEvent> events = RecordingFile.readAllEvents(recordingFile);
249 return events.stream()
250 .map(RecordedEvent::getEventType)
251 .collect(Collectors.groupingBy(EventType::getName,
252 Collectors.summingInt(x -> 1)));
253 }
254
255 /**
256 * Return the file path to the recording file.
257 */
258 private static Path recordingFile(Recording recording) throws IOException {
259 Path recordingFile = recording.getDestination();
260 if (recordingFile == null) {
261 ProcessHandle h = ProcessHandle.current();
262 recordingFile = Path.of("recording-" + recording.getId() + "-pid" + h.pid() + ".jfr");
263 recording.dump(recordingFile);
264 }
265 return recordingFile;
266 }
267 }
|
121 );
122
123 // timed park with native frame on stack
124 var finish2 = new AtomicBoolean();
125 var timedParkWhenPinned = Arguments.of(
126 "LockSupport.parkNanos when pinned",
127 (ThrowingRunnable<Exception>) () -> {
128 VThreadPinner.runPinned(() -> {
129 while (!finish2.get()) {
130 LockSupport.parkNanos(Long.MAX_VALUE);
131 }
132 });
133 },
134 Thread.State.TIMED_WAITING,
135 (Consumer<Thread>) t -> {
136 finish2.set(true);
137 LockSupport.unpark(t);
138 }
139 );
140
141 // block on monitorenter with native frame on stack
142 var blockWhenPinned = Arguments.of(
143 "Contended monitorenter when pinned",
144 (ThrowingRunnable<Exception>) () -> {
145 var readyToEnter = new AtomicBoolean();
146 var thread = Thread.ofVirtual().unstarted(() -> {
147 VThreadPinner.runPinned(() -> {
148 readyToEnter.set(true);
149 synchronized (lock) { }
150 });
151 });
152 try {
153 synchronized (lock) {
154 thread.start();
155 while (!readyToEnter.get()) {
156 Thread.sleep(10);
157 }
158 await(thread, Thread.State.BLOCKED);
159 }
160 } finally {
161 thread.join();
162 }
163 },
164 Thread.State.TERMINATED,
165 (Consumer<Thread>) t -> { }
166 );
167
168 // untimed Object.wait
169 var waitWhenPinned = Arguments.of(
170 "Object.wait",
171 (ThrowingRunnable<Exception>) () -> {
172 synchronized (lock) {
173 lock.wait();
174 }
175 },
176 Thread.State.WAITING,
177 (Consumer<Thread>) t -> {
178 synchronized (lock) {
179 lock.notifyAll();
180 }
181 }
182 );
183
184 // timed Object.wait
185 var timedWaitWhenPinned = Arguments.of(
186 "Object.wait(millis)",
187 (ThrowingRunnable<Exception>) () -> {
188 synchronized (lock) {
189 lock.wait(Long.MAX_VALUE);
190 }
191 },
192 Thread.State.TIMED_WAITING,
193 (Consumer<Thread>) t -> {
194 synchronized (lock) {
195 lock.notifyAll();
196 }
197 }
198 );
199
200 return Stream.of(parkWhenPinned,
201 timedParkWhenPinned,
202 blockWhenPinned,
203 waitWhenPinned,
204 timedWaitWhenPinned);
205 }
206
207 /**
208 * Test jdk.VirtualThreadPinned event.
209 */
210 @ParameterizedTest
211 @MethodSource("pinnedCases")
212 void testVirtualThreadPinned(String label,
213 ThrowingRunnable<Exception> parker,
214 Thread.State expectedState,
215 Consumer<Thread> unparker) throws Exception {
216
217 try (Recording recording = new Recording()) {
218 recording.enable("jdk.VirtualThreadPinned");
219
220 recording.start();
221 try {
222 var exception = new AtomicReference<Throwable>();
223 var thread = Thread.ofVirtual().start(() -> {
224 try {
225 parker.run();
226 } catch (Throwable e) {
227 exception.set(e);
228 }
229 });
230 try {
231 // wait for thread to park/wait
232 await(thread, expectedState);
233 } finally {
234 unparker.accept(thread);
235 thread.join();
236 assertNull(exception.get());
237 }
238 } finally {
239 recording.stop();
240 }
241
242 Map<String, Integer> events = sumEvents(recording);
243 System.err.println(events);
244
245 // should have at least one pinned event
246 int pinnedCount = events.getOrDefault("jdk.VirtualThreadPinned", 0);
247 assertTrue(pinnedCount >= 1, "Expected one or more events");
248 }
249 }
250
251 /**
252 * Test jdk.VirtualThreadSubmitFailed event.
253 */
254 @Test
255 void testVirtualThreadSubmitFailed() throws Exception {
256 try (Recording recording = new Recording()) {
257 recording.enable("jdk.VirtualThreadSubmitFailed");
258
259 recording.start();
260 try (ExecutorService pool = Executors.newCachedThreadPool()) {
261 Executor scheduler = task -> pool.execute(task);
262
263 // create virtual thread that uses custom scheduler
264 ThreadFactory factory = ThreadBuilders.virtualThreadBuilder(scheduler).factory();
265
266 // start a thread
267 Thread thread = factory.newThread(LockSupport::park);
268 thread.start();
269
270 // wait for thread to park
271 await(thread, Thread.State.WAITING);
272
273 // shutdown scheduler
274 pool.shutdown();
275
276 // unpark, the submit should fail
277 try {
278 LockSupport.unpark(thread);
279 fail();
280 } catch (RejectedExecutionException expected) { }
281
282 // start another thread, it should fail and an event should be recorded
283 try {
284 factory.newThread(LockSupport::park).start();
285 throw new RuntimeException("RejectedExecutionException expected");
286 } catch (RejectedExecutionException expected) { }
287 } finally {
288 recording.stop();
289 }
290
291 Map<String, Integer> events = sumEvents(recording);
303 Path recordingFile = recordingFile(recording);
304 List<RecordedEvent> events = RecordingFile.readAllEvents(recordingFile);
305 return events.stream()
306 .map(RecordedEvent::getEventType)
307 .collect(Collectors.groupingBy(EventType::getName,
308 Collectors.summingInt(x -> 1)));
309 }
310
311 /**
312 * Return the file path to the recording file.
313 */
314 private static Path recordingFile(Recording recording) throws IOException {
315 Path recordingFile = recording.getDestination();
316 if (recordingFile == null) {
317 ProcessHandle h = ProcessHandle.current();
318 recordingFile = Path.of("recording-" + recording.getId() + "-pid" + h.pid() + ".jfr");
319 recording.dump(recordingFile);
320 }
321 return recordingFile;
322 }
323
324 /**
325 * Waits for the given thread to reach a given state.
326 */
327 private static void await(Thread thread, Thread.State expectedState) throws InterruptedException {
328 Thread.State state = thread.getState();
329 while (state != expectedState) {
330 Thread.sleep(10);
331 state = thread.getState();
332 }
333 }
334 }
|