1 /* 2 * Copyright (c) 2019, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=default 26 * @summary Test virtual threads using Object.wait/notifyAll 27 * @modules java.base/java.lang:+open jdk.management 28 * @library /test/lib 29 * @run junit/othervm --enable-native-access=ALL-UNNAMED MonitorWaitNotify 30 */ 31 32 import java.util.ArrayList; 33 import java.util.List; 34 import java.util.Set; 35 import java.util.concurrent.CountDownLatch; 36 import java.util.concurrent.Executors; 37 import java.util.concurrent.ExecutorService; 38 import java.util.concurrent.ThreadFactory; 39 import java.util.concurrent.TimeUnit; 40 import java.util.concurrent.atomic.AtomicBoolean; 41 import java.util.concurrent.atomic.AtomicInteger; 42 import java.util.concurrent.atomic.AtomicReference; 43 import java.util.concurrent.locks.LockSupport; 44 import java.util.stream.IntStream; 45 import java.util.stream.Stream; 46 import java.util.stream.Collectors; 47 48 import jdk.test.lib.thread.VThreadScheduler; 49 import jdk.test.lib.thread.VThreadRunner; // ensureParallelism requires jdk.management 50 import jdk.test.lib.thread.VThreadPinner; 51 import org.junit.jupiter.api.Test; 52 import org.junit.jupiter.api.BeforeAll; 53 import org.junit.jupiter.params.ParameterizedTest; 54 import org.junit.jupiter.params.provider.ValueSource; 55 import static org.junit.jupiter.api.Assertions.*; 56 import static org.junit.jupiter.api.Assumptions.*; 57 58 class MonitorWaitNotify { 59 60 @BeforeAll 61 static void setup() { 62 // need >=2 carriers for testing pinning 63 VThreadRunner.ensureParallelism(2); 64 } 65 66 /** 67 * Test virtual thread waits, notified by platform thread. 68 */ 69 @ParameterizedTest 70 @ValueSource(booleans = { true, false }) 71 void testWaitNotify1(boolean pinned) throws Exception { 72 var lock = new Object(); 73 var ready = new AtomicBoolean(); 74 var thread = Thread.ofVirtual().start(() -> { 75 synchronized (lock) { 76 try { 77 if (pinned) { 78 VThreadPinner.runPinned(() -> { 79 ready.set(true); 80 lock.wait(); 81 }); 82 } else { 83 ready.set(true); 84 lock.wait(); 85 } 86 } catch (InterruptedException e) { } 87 } 88 }); 89 awaitTrue(ready); 90 91 // notify, thread should block waiting to reenter 92 synchronized (lock) { 93 lock.notifyAll(); 94 await(thread, Thread.State.BLOCKED); 95 } 96 thread.join(); 97 } 98 99 /** 100 * Test platform thread waits, notified by virtual thread. 101 */ 102 @Test 103 void testWaitNotify2() throws Exception { 104 var lock = new Object(); 105 var thread = Thread.ofVirtual().unstarted(() -> { 106 synchronized (lock) { 107 lock.notifyAll(); 108 } 109 }); 110 synchronized (lock) { 111 thread.start(); 112 lock.wait(); 113 } 114 thread.join(); 115 } 116 117 /** 118 * Test virtual thread waits, notified by another virtual thread. 119 */ 120 @ParameterizedTest 121 @ValueSource(booleans = { true, false }) 122 void testWaitNotify3(boolean pinned) throws Exception { 123 var lock = new Object(); 124 var ready = new AtomicBoolean(); 125 var thread1 = Thread.ofVirtual().start(() -> { 126 synchronized (lock) { 127 try { 128 if (pinned) { 129 VThreadPinner.runPinned(() -> { 130 ready.set(true); 131 lock.wait(); 132 }); 133 } else { 134 ready.set(true); 135 lock.wait(); 136 } 137 } catch (InterruptedException e) { 138 e.printStackTrace(); 139 } 140 } 141 }); 142 var thread2 = Thread.ofVirtual().start(() -> { 143 try { 144 awaitTrue(ready); 145 146 // notify, thread should block waiting to reenter 147 synchronized (lock) { 148 lock.notifyAll(); 149 await(thread1, Thread.State.BLOCKED); 150 } 151 } catch (InterruptedException e) { 152 e.printStackTrace(); 153 } 154 }); 155 thread1.join(); 156 thread2.join(); 157 } 158 159 /** 160 * Test notifyAll when there are no threads waiting. 161 */ 162 @ParameterizedTest 163 @ValueSource(ints = { 0, 30000, Integer.MAX_VALUE }) 164 void testNotifyBeforeWait(int timeout) throws Exception { 165 var lock = new Object(); 166 167 // no threads waiting 168 synchronized (lock) { 169 lock.notifyAll(); 170 } 171 172 var ready = new AtomicBoolean(); 173 var thread = Thread.ofVirtual().start(() -> { 174 try { 175 synchronized (lock) { 176 ready.set(true); 177 178 // thread should wait 179 if (timeout > 0) { 180 lock.wait(timeout); 181 } else { 182 lock.wait(); 183 } 184 } 185 } catch (InterruptedException e) { } 186 }); 187 188 try { 189 // wait for thread to start and wait 190 awaitTrue(ready); 191 Thread.State expectedState = timeout > 0 192 ? Thread.State.TIMED_WAITING 193 : Thread.State.WAITING; 194 await(thread, expectedState); 195 196 // poll thread state again, it should still be waiting 197 Thread.sleep(10); 198 assertEquals(thread.getState(), expectedState); 199 } finally { 200 synchronized (lock) { 201 lock.notifyAll(); 202 } 203 thread.join(); 204 } 205 } 206 /** 207 * Test duration of timed Object.wait. 208 */ 209 @Test 210 void testTimedWaitDuration1() throws Exception { 211 var lock = new Object(); 212 213 var durationRef = new AtomicReference<Long>(); 214 var thread = Thread.ofVirtual().start(() -> { 215 try { 216 synchronized (lock) { 217 long start = millisTime(); 218 lock.wait(2000); 219 durationRef.set(millisTime() - start); 220 } 221 } catch (InterruptedException e) { } 222 }); 223 224 thread.join(); 225 226 long duration = durationRef.get(); 227 checkDuration(duration, 1900, 20_000); 228 } 229 230 /** 231 * Test duration of timed Object.wait. This test invokes wait twice, first with a short 232 * timeout, the second with a longer timeout. The test scenario ensures that the 233 * timeout from the first wait doesn't interfere with the second wait. 234 */ 235 @Test 236 void testTimedWaitDuration2() throws Exception { 237 var lock = new Object(); 238 239 var ready = new AtomicBoolean(); 240 var waited = new AtomicBoolean(); 241 var durationRef = new AtomicReference<Long>(); 242 var thread = Thread.ofVirtual().start(() -> { 243 try { 244 synchronized (lock) { 245 ready.set(true); 246 lock.wait(200); 247 waited.set(true); 248 249 long start = millisTime(); 250 lock.wait(2000); 251 durationRef.set(millisTime() - start); 252 } 253 } catch (InterruptedException e) { } 254 }); 255 256 awaitTrue(ready); 257 synchronized (lock) { 258 // wake thread if waiting in first wait 259 if (!waited.get()) { 260 lock.notifyAll(); 261 } 262 } 263 264 thread.join(); 265 266 long duration = durationRef.get(); 267 checkDuration(duration, 1900, 20_000); 268 } 269 270 /** 271 * Testing invoking Object.wait with interrupt status set. 272 */ 273 @ParameterizedTest 274 @ValueSource(ints = { 0, 30000, Integer.MAX_VALUE }) 275 void testWaitWithInterruptSet(int timeout) throws Exception { 276 VThreadRunner.run(() -> { 277 Object lock = new Object(); 278 synchronized (lock) { 279 Thread.currentThread().interrupt(); 280 if (timeout > 0) { 281 assertThrows(InterruptedException.class, () -> lock.wait(timeout)); 282 } else { 283 assertThrows(InterruptedException.class, lock::wait); 284 } 285 assertFalse(Thread.currentThread().isInterrupted()); 286 } 287 }); 288 } 289 290 /** 291 * Test interrupting a virtual thread waiting in Object.wait. 292 */ 293 @ParameterizedTest 294 @ValueSource(ints = { 0, 30000, Integer.MAX_VALUE }) 295 void testInterruptWait(int timeout) throws Exception { 296 var lock = new Object(); 297 var ready = new AtomicBoolean(); 298 var interruptedException = new AtomicBoolean(); 299 var vthread = Thread.ofVirtual().start(() -> { 300 synchronized (lock) { 301 try { 302 ready.set(true); 303 if (timeout > 0) { 304 lock.wait(timeout); 305 } else { 306 lock.wait(); 307 } 308 } catch (InterruptedException e) { 309 // check stack trace has the expected frames 310 Set<String> expected = Set.of("wait0", "wait", "run"); 311 Set<String> methods = Stream.of(e.getStackTrace()) 312 .map(StackTraceElement::getMethodName) 313 .collect(Collectors.toSet()); 314 assertTrue(methods.containsAll(expected)); 315 316 interruptedException.set(true); 317 } 318 } 319 }); 320 321 // wait for thread to start and wait 322 awaitTrue(ready); 323 await(vthread, timeout > 0 ? Thread.State.TIMED_WAITING : Thread.State.WAITING); 324 325 // interrupt thread, should block, then throw InterruptedException 326 synchronized (lock) { 327 vthread.interrupt(); 328 await(vthread, Thread.State.BLOCKED); 329 } 330 vthread.join(); 331 assertTrue(interruptedException.get()); 332 } 333 334 /** 335 * Test interrupting a virtual thread blocked waiting to reenter after waiting. 336 */ 337 @ParameterizedTest 338 @ValueSource(ints = { 0, 30000, Integer.MAX_VALUE }) 339 void testInterruptReenterAfterWait(int timeout) throws Exception { 340 var lock = new Object(); 341 var ready = new AtomicBoolean(); 342 var interruptedException = new AtomicBoolean(); 343 var vthread = Thread.ofVirtual().start(() -> { 344 synchronized (lock) { 345 try { 346 ready.set(true); 347 if (timeout > 0) { 348 lock.wait(timeout); 349 } else { 350 lock.wait(); 351 } 352 } catch (InterruptedException e) { 353 interruptedException.set(true); 354 } 355 } 356 }); 357 358 // wait for thread to start and wait 359 awaitTrue(ready); 360 await(vthread, timeout > 0 ? Thread.State.TIMED_WAITING : Thread.State.WAITING); 361 362 // notify, thread should block waiting to reenter 363 synchronized (lock) { 364 lock.notifyAll(); 365 await(vthread, Thread.State.BLOCKED); 366 367 // interrupt when blocked 368 vthread.interrupt(); 369 } 370 371 vthread.join(); 372 assertFalse(interruptedException.get()); 373 assertTrue(vthread.isInterrupted()); 374 } 375 376 /** 377 * Test Object.wait when the monitor entry count > 1. 378 */ 379 @ParameterizedTest 380 @ValueSource(ints = { 0, 30000, Integer.MAX_VALUE }) 381 void testWaitWhenEnteredManyTimes(int timeout) throws Exception { 382 var lock = new Object(); 383 var ready = new AtomicBoolean(); 384 var vthread = Thread.ofVirtual().start(() -> { 385 synchronized (lock) { 386 synchronized (lock) { 387 synchronized (lock) { 388 try { 389 ready.set(true); 390 if (timeout > 0) { 391 lock.wait(timeout); 392 } else { 393 lock.wait(); 394 } 395 } catch (InterruptedException e) { } 396 } 397 } 398 } 399 }); 400 401 // wait for thread to start and wait 402 awaitTrue(ready); 403 await(vthread, timeout > 0 ? Thread.State.TIMED_WAITING : Thread.State.WAITING); 404 405 // notify, thread should block waiting to reenter 406 synchronized (lock) { 407 lock.notifyAll(); 408 await(vthread, Thread.State.BLOCKED); 409 } 410 vthread.join(); 411 } 412 413 /** 414 * Test that Object.wait does not consume the thread's parking permit. 415 */ 416 @Test 417 void testParkingPermitNotConsumed() throws Exception { 418 var lock = new Object(); 419 var started = new CountDownLatch(1); 420 var completed = new AtomicBoolean(); 421 var vthread = Thread.ofVirtual().start(() -> { 422 started.countDown(); 423 LockSupport.unpark(Thread.currentThread()); 424 synchronized (lock) { 425 try { 426 lock.wait(); 427 } catch (InterruptedException e) { 428 fail("wait interrupted"); 429 } 430 } 431 LockSupport.park(); // should not park 432 completed.set(true); 433 }); 434 435 // wait for thread to start and wait 436 started.await(); 437 await(vthread, Thread.State.WAITING); 438 439 // wakeup thread 440 synchronized (lock) { 441 lock.notifyAll(); 442 } 443 444 // thread should terminate 445 vthread.join(); 446 assertTrue(completed.get()); 447 } 448 449 /** 450 * Test that Object.wait does not make available the thread's parking permit. 451 */ 452 @Test 453 void testParkingPermitNotOffered() throws Exception { 454 var lock = new Object(); 455 var started = new CountDownLatch(1); 456 var readyToPark = new CountDownLatch(1); 457 var completed = new AtomicBoolean(); 458 var vthread = Thread.ofVirtual().start(() -> { 459 started.countDown(); 460 synchronized (lock) { 461 try { 462 lock.wait(); 463 } catch (InterruptedException e) { 464 fail("wait interrupted"); 465 } 466 } 467 readyToPark.countDown(); 468 LockSupport.park(); // should park 469 completed.set(true); 470 }); 471 472 // wait for thread to start and wait 473 started.await(); 474 await(vthread, Thread.State.WAITING); 475 476 // wakeup thread 477 synchronized (lock) { 478 lock.notifyAll(); 479 } 480 481 // thread should park 482 readyToPark.await(); 483 await(vthread, Thread.State.WAITING); 484 485 LockSupport.unpark(vthread); 486 487 // thread should terminate 488 vthread.join(); 489 assertTrue(completed.get()); 490 } 491 492 /** 493 * Test that wait(long) throws IAE when timeout is negative. 494 */ 495 @Test 496 void testIllegalArgumentException() throws Exception { 497 VThreadRunner.run(() -> { 498 Object obj = new Object(); 499 synchronized (obj) { 500 assertThrows(IllegalArgumentException.class, () -> obj.wait(-1L)); 501 assertThrows(IllegalArgumentException.class, () -> obj.wait(-1000L)); 502 assertThrows(IllegalArgumentException.class, () -> obj.wait(Long.MIN_VALUE)); 503 } 504 }); 505 } 506 507 /** 508 * Test that wait throws IMSE when not owner. 509 */ 510 @Test 511 void testIllegalMonitorStateException() throws Exception { 512 VThreadRunner.run(() -> { 513 Object obj = new Object(); 514 assertThrows(IllegalMonitorStateException.class, () -> obj.wait()); 515 assertThrows(IllegalMonitorStateException.class, () -> obj.wait(0)); 516 assertThrows(IllegalMonitorStateException.class, () -> obj.wait(1000)); 517 assertThrows(IllegalMonitorStateException.class, () -> obj.wait(Long.MAX_VALUE)); 518 }); 519 } 520 521 /** 522 * Waits for the boolean value to become true. 523 */ 524 private static void awaitTrue(AtomicBoolean ref) throws InterruptedException { 525 while (!ref.get()) { 526 Thread.sleep(20); 527 } 528 } 529 530 /** 531 * Waits for the given thread to reach a given state. 532 */ 533 private void await(Thread thread, Thread.State expectedState) throws InterruptedException { 534 Thread.State state = thread.getState(); 535 while (state != expectedState) { 536 assertTrue(state != Thread.State.TERMINATED, "Thread has terminated"); 537 Thread.sleep(10); 538 state = thread.getState(); 539 } 540 } 541 542 /** 543 * Returns the current time in milliseconds. 544 */ 545 private static long millisTime() { 546 long now = System.nanoTime(); 547 return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS); 548 } 549 550 /** 551 * Check a duration is within expected bounds. 552 * @param duration, in milliseconds 553 * @param min minimum expected duration, in milliseconds 554 * @param max maximum expected duration, in milliseconds 555 * @return the duration (now - start), in milliseconds 556 */ 557 private static void checkDuration(long duration, long min, long max) { 558 assertTrue(duration >= min, 559 "Duration " + duration + "ms, expected >= " + min + "ms"); 560 assertTrue(duration <= max, 561 "Duration " + duration + "ms, expected <= " + max + "ms"); 562 } 563 }