1 /*
  2  * Copyright (c) 2019, 2024, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test id=default
 26  * @summary Test virtual threads using Object.wait/notifyAll
 27  * @modules java.base/java.lang:+open jdk.management
 28  * @library /test/lib
 29  * @run junit/othervm --enable-native-access=ALL-UNNAMED MonitorWaitNotify
 30  */
 31 
 32 import java.util.ArrayList;
 33 import java.util.List;
 34 import java.util.Set;
 35 import java.util.concurrent.CountDownLatch;
 36 import java.util.concurrent.Executors;
 37 import java.util.concurrent.ExecutorService;
 38 import java.util.concurrent.ThreadFactory;
 39 import java.util.concurrent.TimeUnit;
 40 import java.util.concurrent.atomic.AtomicBoolean;
 41 import java.util.concurrent.atomic.AtomicInteger;
 42 import java.util.concurrent.atomic.AtomicReference;
 43 import java.util.concurrent.locks.LockSupport;
 44 import java.util.stream.IntStream;
 45 import java.util.stream.Stream;
 46 import java.util.stream.Collectors;
 47 
 48 import jdk.test.lib.thread.VThreadScheduler;
 49 import jdk.test.lib.thread.VThreadRunner;   // ensureParallelism requires jdk.management
 50 import jdk.test.lib.thread.VThreadPinner;
 51 import org.junit.jupiter.api.Test;
 52 import org.junit.jupiter.api.BeforeAll;
 53 import org.junit.jupiter.params.ParameterizedTest;
 54 import org.junit.jupiter.params.provider.ValueSource;
 55 import static org.junit.jupiter.api.Assertions.*;
 56 import static org.junit.jupiter.api.Assumptions.*;
 57 
 58 class MonitorWaitNotify {
 59 
 60     @BeforeAll
 61     static void setup() {
 62         // need >=2 carriers for testing pinning
 63         VThreadRunner.ensureParallelism(2);
 64     }
 65 
 66     /**
 67      * Test virtual thread waits, notified by platform thread.
 68      */
 69     @ParameterizedTest
 70     @ValueSource(booleans = { true, false })
 71     void testWaitNotify1(boolean pinned) throws Exception {
 72         var lock = new Object();
 73         var ready = new AtomicBoolean();
 74         var thread = Thread.ofVirtual().start(() -> {
 75             synchronized (lock) {
 76                 try {
 77                     if (pinned) {
 78                         VThreadPinner.runPinned(() -> {
 79                             ready.set(true);
 80                             lock.wait();
 81                         });
 82                     } else {
 83                         ready.set(true);
 84                         lock.wait();
 85                     }
 86                 } catch (InterruptedException e) { }
 87             }
 88         });
 89         awaitTrue(ready);
 90 
 91         // notify, thread should block waiting to reenter
 92         synchronized (lock) {
 93             lock.notifyAll();
 94             await(thread, Thread.State.BLOCKED);
 95         }
 96         thread.join();
 97     }
 98 
 99     /**
100      * Test platform thread waits, notified by virtual thread.
101      */
102     @Test
103     void testWaitNotify2() throws Exception {
104         var lock = new Object();
105         var thread = Thread.ofVirtual().unstarted(() -> {
106             synchronized (lock) {
107                 lock.notifyAll();
108             }
109         });
110         synchronized (lock) {
111             thread.start();
112             lock.wait();
113         }
114         thread.join();
115     }
116 
117     /**
118      * Test virtual thread waits, notified by another virtual thread.
119      */
120     @ParameterizedTest
121     @ValueSource(booleans = { true, false })
122     void testWaitNotify3(boolean pinned) throws Exception {
123         var lock = new Object();
124         var ready = new AtomicBoolean();
125         var thread1 = Thread.ofVirtual().start(() -> {
126             synchronized (lock) {
127                 try {
128                     if (pinned) {
129                         VThreadPinner.runPinned(() -> {
130                             ready.set(true);
131                             lock.wait();
132                         });
133                     } else {
134                         ready.set(true);
135                         lock.wait();
136                     }
137                 } catch (InterruptedException e) {
138                     e.printStackTrace();
139                 }
140             }
141         });
142         var thread2 = Thread.ofVirtual().start(() -> {
143             try {
144                 awaitTrue(ready);
145 
146                 // notify, thread should block waiting to reenter
147                 synchronized (lock) {
148                     lock.notifyAll();
149                     await(thread1, Thread.State.BLOCKED);
150                 }
151             } catch (InterruptedException e) {
152                 e.printStackTrace();
153             }
154         });
155         thread1.join();
156         thread2.join();
157     }
158 
159     /**
160      * Test notifyAll when there are no threads waiting.
161      */
162     @ParameterizedTest
163     @ValueSource(ints = { 0, 30000, Integer.MAX_VALUE })
164     void testNotifyBeforeWait(int timeout) throws Exception {
165         var lock = new Object();
166 
167         // no threads waiting
168         synchronized (lock) {
169             lock.notifyAll();
170         }
171 
172         var ready = new AtomicBoolean();
173         var thread = Thread.ofVirtual().start(() -> {
174             try {
175                 synchronized (lock) {
176                     ready.set(true);
177 
178                     // thread should wait
179                     if (timeout > 0) {
180                         lock.wait(timeout);
181                     } else {
182                         lock.wait();
183                     }
184                 }
185             } catch (InterruptedException e) { }
186         });
187 
188         try {
189             // wait for thread to start and wait
190             awaitTrue(ready);
191             Thread.State expectedState = timeout > 0
192                     ? Thread.State.TIMED_WAITING
193                     : Thread.State.WAITING;
194             await(thread, expectedState);
195 
196             // poll thread state again, it should still be waiting
197             Thread.sleep(10);
198             assertEquals(thread.getState(), expectedState);
199         } finally {
200             synchronized (lock) {
201                 lock.notifyAll();
202             }
203             thread.join();
204         }
205     }
206     /**
207      * Test duration of timed Object.wait.
208      */
209     @Test
210     void testTimedWaitDuration1() throws Exception {
211         var lock = new Object();
212 
213         var durationRef = new AtomicReference<Long>();
214         var thread = Thread.ofVirtual().start(() -> {
215             try {
216                 synchronized (lock) {
217                     long start = millisTime();
218                     lock.wait(2000);
219                     durationRef.set(millisTime() - start);
220                 }
221             } catch (InterruptedException e) { }
222         });
223 
224         thread.join();
225 
226         long duration = durationRef.get();
227         checkDuration(duration, 1900, 20_000);
228     }
229 
230     /**
231      * Test duration of timed Object.wait. This test invokes wait twice, first with a short
232      * timeout, the second with a longer timeout. The test scenario ensures that the
233      * timeout from the first wait doesn't interfere with the second wait.
234      */
235     @Test
236     void testTimedWaitDuration2() throws Exception {
237         var lock = new Object();
238 
239         var ready = new AtomicBoolean();
240         var waited = new AtomicBoolean();
241         var durationRef = new AtomicReference<Long>();
242         var thread = Thread.ofVirtual().start(() -> {
243             try {
244                 synchronized (lock) {
245                     ready.set(true);
246                     lock.wait(200);
247                     waited.set(true);
248 
249                     long start = millisTime();
250                     lock.wait(2000);
251                     durationRef.set(millisTime() - start);
252                 }
253             } catch (InterruptedException e) { }
254         });
255 
256         awaitTrue(ready);
257         synchronized (lock) {
258             // wake thread if waiting in first wait
259             if (!waited.get()) {
260                 lock.notifyAll();
261             }
262         }
263 
264         thread.join();
265 
266         long duration = durationRef.get();
267         checkDuration(duration, 1900, 20_000);
268     }
269 
270     /**
271      * Testing invoking Object.wait with interrupt status set.
272      */
273     @ParameterizedTest
274     @ValueSource(ints = { 0, 30000, Integer.MAX_VALUE })
275     void testWaitWithInterruptSet(int timeout) throws Exception {
276         VThreadRunner.run(() -> {
277             Object lock = new Object();
278             synchronized (lock) {
279                 Thread.currentThread().interrupt();
280                 if (timeout > 0) {
281                     assertThrows(InterruptedException.class, () -> lock.wait(timeout));
282                 } else {
283                     assertThrows(InterruptedException.class, lock::wait);
284                 }
285                 assertFalse(Thread.currentThread().isInterrupted());
286             }
287         });
288     }
289 
290     /**
291      * Test interrupting a virtual thread waiting in Object.wait.
292      */
293     @ParameterizedTest
294     @ValueSource(ints = { 0, 30000, Integer.MAX_VALUE })
295     void testInterruptWait(int timeout) throws Exception {
296         var lock = new Object();
297         var ready = new AtomicBoolean();
298         var interruptedException = new AtomicBoolean();
299         var vthread = Thread.ofVirtual().start(() -> {
300             synchronized (lock) {
301                 try {
302                     ready.set(true);
303                     if (timeout > 0) {
304                         lock.wait(timeout);
305                     } else {
306                         lock.wait();
307                     }
308                 } catch (InterruptedException e) {
309                     // check stack trace has the expected frames
310                     Set<String> expected = Set.of("wait0", "wait", "run");
311                     Set<String> methods = Stream.of(e.getStackTrace())
312                             .map(StackTraceElement::getMethodName)
313                             .collect(Collectors.toSet());
314                     assertTrue(methods.containsAll(expected));
315 
316                     interruptedException.set(true);
317                 }
318             }
319         });
320 
321         // wait for thread to start and wait
322         awaitTrue(ready);
323         await(vthread, timeout > 0 ? Thread.State.TIMED_WAITING : Thread.State.WAITING);
324 
325         // interrupt thread, should block, then throw InterruptedException
326         synchronized (lock) {
327             vthread.interrupt();
328             await(vthread, Thread.State.BLOCKED);
329         }
330         vthread.join();
331         assertTrue(interruptedException.get());
332     }
333 
334     /**
335      * Test interrupting a virtual thread blocked waiting to reenter after waiting.
336      */
337     @ParameterizedTest
338     @ValueSource(ints = { 0, 30000, Integer.MAX_VALUE })
339     void testInterruptReenterAfterWait(int timeout) throws Exception {
340         var lock = new Object();
341         var ready = new AtomicBoolean();
342         var interruptedException = new AtomicBoolean();
343         var vthread = Thread.ofVirtual().start(() -> {
344             synchronized (lock) {
345                 try {
346                     ready.set(true);
347                     if (timeout > 0) {
348                         lock.wait(timeout);
349                     } else {
350                         lock.wait();
351                     }
352                 } catch (InterruptedException e) {
353                     interruptedException.set(true);
354                 }
355             }
356         });
357 
358         // wait for thread to start and wait
359         awaitTrue(ready);
360         await(vthread, timeout > 0 ? Thread.State.TIMED_WAITING : Thread.State.WAITING);
361 
362         // notify, thread should block waiting to reenter
363         synchronized (lock) {
364             lock.notifyAll();
365             await(vthread, Thread.State.BLOCKED);
366 
367             // interrupt when blocked
368             vthread.interrupt();
369         }
370 
371         vthread.join();
372         assertFalse(interruptedException.get());
373         assertTrue(vthread.isInterrupted());
374     }
375 
376     /**
377      * Test Object.wait when the monitor entry count > 1.
378      */
379     @ParameterizedTest
380     @ValueSource(ints = { 0, 30000, Integer.MAX_VALUE })
381     void testWaitWhenEnteredManyTimes(int timeout) throws Exception {
382         var lock = new Object();
383         var ready = new AtomicBoolean();
384         var vthread = Thread.ofVirtual().start(() -> {
385             synchronized (lock) {
386                 synchronized (lock) {
387                     synchronized (lock) {
388                         try {
389                             ready.set(true);
390                             if (timeout > 0) {
391                                 lock.wait(timeout);
392                             } else {
393                                 lock.wait();
394                             }
395                         } catch (InterruptedException e) { }
396                     }
397                 }
398             }
399         });
400 
401         // wait for thread to start and wait
402         awaitTrue(ready);
403         await(vthread, timeout > 0 ? Thread.State.TIMED_WAITING : Thread.State.WAITING);
404 
405         // notify, thread should block waiting to reenter
406         synchronized (lock) {
407             lock.notifyAll();
408             await(vthread, Thread.State.BLOCKED);
409         }
410         vthread.join();
411     }
412 
413     /**
414      * Test that Object.wait does not consume the thread's parking permit.
415      */
416     @Test
417     void testParkingPermitNotConsumed() throws Exception {
418         var lock = new Object();
419         var started = new CountDownLatch(1);
420         var completed = new AtomicBoolean();
421         var vthread = Thread.ofVirtual().start(() -> {
422             started.countDown();
423             LockSupport.unpark(Thread.currentThread());
424             synchronized (lock) {
425                 try {
426                     lock.wait();
427                 } catch (InterruptedException e) {
428                     fail("wait interrupted");
429                 }
430             }
431             LockSupport.park();      // should not park
432             completed.set(true);
433         });
434 
435         // wait for thread to start and wait
436         started.await();
437         await(vthread, Thread.State.WAITING);
438 
439         // wakeup thread
440         synchronized (lock) {
441             lock.notifyAll();
442         }
443 
444         // thread should terminate
445         vthread.join();
446         assertTrue(completed.get());
447     }
448 
449     /**
450      * Test that Object.wait does not make available the thread's parking permit.
451      */
452     @Test
453     void testParkingPermitNotOffered() throws Exception {
454         var lock = new Object();
455         var started = new CountDownLatch(1);
456         var readyToPark = new CountDownLatch(1);
457         var completed = new AtomicBoolean();
458         var vthread = Thread.ofVirtual().start(() -> {
459             started.countDown();
460             synchronized (lock) {
461                 try {
462                     lock.wait();
463                 } catch (InterruptedException e) {
464                     fail("wait interrupted");
465                 }
466             }
467             readyToPark.countDown();
468             LockSupport.park();      // should park
469             completed.set(true);
470         });
471 
472         // wait for thread to start and wait
473         started.await();
474         await(vthread, Thread.State.WAITING);
475 
476         // wakeup thread
477         synchronized (lock) {
478             lock.notifyAll();
479         }
480 
481         // thread should park
482         readyToPark.await();
483         await(vthread, Thread.State.WAITING);
484 
485         LockSupport.unpark(vthread);
486 
487         // thread should terminate
488         vthread.join();
489         assertTrue(completed.get());
490     }
491 
492     /**
493      * Test that wait(long) throws IAE when timeout is negative.
494      */
495     @Test
496     void testIllegalArgumentException() throws Exception {
497         VThreadRunner.run(() -> {
498             Object obj = new Object();
499             synchronized (obj) {
500                 assertThrows(IllegalArgumentException.class, () -> obj.wait(-1L));
501                 assertThrows(IllegalArgumentException.class, () -> obj.wait(-1000L));
502                 assertThrows(IllegalArgumentException.class, () -> obj.wait(Long.MIN_VALUE));
503             }
504         });
505     }
506 
507     /**
508      * Test that wait throws IMSE when not owner.
509      */
510     @Test
511     void testIllegalMonitorStateException() throws Exception {
512         VThreadRunner.run(() -> {
513             Object obj = new Object();
514             assertThrows(IllegalMonitorStateException.class, () -> obj.wait());
515             assertThrows(IllegalMonitorStateException.class, () -> obj.wait(0));
516             assertThrows(IllegalMonitorStateException.class, () -> obj.wait(1000));
517             assertThrows(IllegalMonitorStateException.class, () -> obj.wait(Long.MAX_VALUE));
518         });
519     }
520 
521     /**
522      * Waits for the boolean value to become true.
523      */
524     private static void awaitTrue(AtomicBoolean ref) throws InterruptedException {
525         while (!ref.get()) {
526             Thread.sleep(20);
527         }
528     }
529 
530     /**
531      * Waits for the given thread to reach a given state.
532      */
533     private void await(Thread thread, Thread.State expectedState) throws InterruptedException {
534         Thread.State state = thread.getState();
535         while (state != expectedState) {
536             assertTrue(state != Thread.State.TERMINATED, "Thread has terminated");
537             Thread.sleep(10);
538             state = thread.getState();
539         }
540     }
541 
542     /**
543      * Returns the current time in milliseconds.
544      */
545     private static long millisTime() {
546         long now = System.nanoTime();
547         return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
548     }
549 
550     /**
551      * Check a duration is within expected bounds.
552      * @param duration, in milliseconds
553      * @param min minimum expected duration, in milliseconds
554      * @param max maximum expected duration, in milliseconds
555      * @return the duration (now - start), in milliseconds
556      */
557     private static void checkDuration(long duration, long min, long max) {
558         assertTrue(duration >= min,
559                 "Duration " + duration + "ms, expected >= " + min + "ms");
560         assertTrue(duration <= max,
561                 "Duration " + duration + "ms, expected <= " + max + "ms");
562     }
563 }