1 /*
2 * Copyright (c) 2021, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=platform
26 * @bug 8284199 8296779 8306647
27 * @summary Basic tests for StructuredTaskScope
28 * @enablePreview
29 * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest
30 */
31
32 /*
33 * @test id=virtual
34 * @enablePreview
35 * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest
36 */
37
38 import java.time.Duration;
39 import java.util.Arrays;
40 import java.util.ArrayList;
41 import java.util.List;
42 import java.util.Set;
43 import java.util.NoSuchElementException;
44 import java.util.concurrent.Callable;
45 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.Executors;
48 import java.util.concurrent.Future;
49 import java.util.concurrent.LinkedTransferQueue;
50 import java.util.concurrent.ThreadFactory;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.RejectedExecutionException;
53 import java.util.concurrent.ScheduledExecutorService;
54 import java.util.concurrent.StructuredTaskScope;
55 import java.util.concurrent.StructuredTaskScope.TimeoutException;
56 import java.util.concurrent.StructuredTaskScope.Configuration;
57 import java.util.concurrent.StructuredTaskScope.FailedException;
58 import java.util.concurrent.StructuredTaskScope.Joiner;
59 import java.util.concurrent.StructuredTaskScope.Subtask;
60 import java.util.concurrent.StructureViolationException;
61 import java.util.concurrent.atomic.AtomicBoolean;
62 import java.util.concurrent.atomic.AtomicInteger;
63 import java.util.concurrent.atomic.AtomicReference;
64 import java.util.function.Predicate;
65 import java.util.function.UnaryOperator;
66 import java.util.stream.Stream;
67 import static java.lang.Thread.State.*;
68
69 import org.junit.jupiter.api.Test;
70 import org.junit.jupiter.api.BeforeAll;
71 import org.junit.jupiter.api.AfterAll;
72 import org.junit.jupiter.params.ParameterizedTest;
73 import org.junit.jupiter.params.provider.MethodSource;
74 import static org.junit.jupiter.api.Assertions.*;
75
76 class StructuredTaskScopeTest {
77 private static ScheduledExecutorService scheduler;
78 private static List<ThreadFactory> threadFactories;
79
80 @BeforeAll
81 static void setup() throws Exception {
82 scheduler = Executors.newSingleThreadScheduledExecutor();
83
84 // thread factories
85 String value = System.getProperty("threadFactory");
86 List<ThreadFactory> list = new ArrayList<>();
87 if (value == null || value.equals("platform"))
88 list.add(Thread.ofPlatform().factory());
89 if (value == null || value.equals("virtual"))
90 list.add(Thread.ofVirtual().factory());
91 assertTrue(list.size() > 0, "No thread factories for tests");
92 threadFactories = list;
93 }
94
95 @AfterAll
96 static void shutdown() {
97 scheduler.shutdown();
98 }
99
100 private static Stream<ThreadFactory> factories() {
101 return threadFactories.stream();
102 }
103
104 /**
105 * Test that fork creates virtual threads when no ThreadFactory is configured.
106 */
107 @Test
108 void testForkCreatesVirtualThread() throws Exception {
109 Set<Thread> threads = ConcurrentHashMap.newKeySet();
110 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
111 for (int i = 0; i < 50; i++) {
112 // runnable
113 scope.fork(() -> {
114 threads.add(Thread.currentThread());
115 });
116
117 // callable
118 scope.fork(() -> {
119 threads.add(Thread.currentThread());
120 return null;
121 });
122 }
123 scope.join();
124 }
125 assertEquals(100, threads.size());
126 threads.forEach(t -> assertTrue(t.isVirtual()));
127 }
128
129 /**
130 * Test that fork create threads with the configured ThreadFactory.
131 */
132 @ParameterizedTest
133 @MethodSource("factories")
134 void testForkUsesThreadFactory(ThreadFactory factory) throws Exception {
135 // TheadFactory that keeps reference to all threads it creates
136 class RecordingThreadFactory implements ThreadFactory {
137 final ThreadFactory delegate;
138 final Set<Thread> threads = ConcurrentHashMap.newKeySet();
139 RecordingThreadFactory(ThreadFactory delegate) {
140 this.delegate = delegate;
141 }
142 @Override
143 public Thread newThread(Runnable task) {
144 Thread thread = delegate.newThread(task);
145 threads.add(thread);
146 return thread;
147 }
148 Set<Thread> threads() {
149 return threads;
150 }
151 }
152 var recordingThreadFactory = new RecordingThreadFactory(factory);
153 Set<Thread> threads = ConcurrentHashMap.newKeySet();
154 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
155 cf -> cf.withThreadFactory(recordingThreadFactory))) {
156
157 for (int i = 0; i < 50; i++) {
158 // runnable
159 scope.fork(() -> {
160 threads.add(Thread.currentThread());
161 });
162
163 // callable
164 scope.fork(() -> {
165 threads.add(Thread.currentThread());
166 return null;
167 });
168 }
169 scope.join();
170 }
171 assertEquals(100, threads.size());
172 assertEquals(recordingThreadFactory.threads(), threads);
173 }
174
175 /**
176 * Test fork method is owner confined.
177 */
178 @ParameterizedTest
179 @MethodSource("factories")
180 void testForkConfined(ThreadFactory factory) throws Exception {
181 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
182 cf -> cf.withThreadFactory(factory))) {
183
184 // random thread cannot fork
185 try (var pool = Executors.newSingleThreadExecutor()) {
186 Future<Void> future = pool.submit(() -> {
187 assertThrows(WrongThreadException.class, () -> {
188 scope.fork(() -> null);
189 });
190 return null;
191 });
192 future.get();
193 }
194
195 // subtask cannot fork
196 Subtask<Boolean> subtask = scope.fork(() -> {
197 assertThrows(WrongThreadException.class, () -> {
198 scope.fork(() -> null);
199 });
200 return true;
201 });
202 scope.join();
203 assertTrue(subtask.get());
204 }
205 }
206
207 /**
208 * Test fork after join, no subtasks forked before join.
209 */
210 @ParameterizedTest
211 @MethodSource("factories")
212 void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception {
213 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
214 cf -> cf.withThreadFactory(factory))) {
215 scope.join();
216 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
217 }
218 }
219
220 /**
221 * Test fork after join, subtasks forked before join.
222 */
223 @ParameterizedTest
224 @MethodSource("factories")
225 void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception {
226 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
227 cf -> cf.withThreadFactory(factory))) {
228 scope.fork(() -> "foo");
229 scope.join();
230 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
231 }
232 }
233
234 /**
235 * Test fork after join interrupted.
236 */
237 @ParameterizedTest
238 @MethodSource("factories")
239 void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception {
240 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
241 cf -> cf.withThreadFactory(factory))) {
242 var subtask1 = scope.fork(() -> {
243 Thread.sleep(Duration.ofDays(1));
244 return "foo";
245 });
246
247 // join throws
248 Thread.currentThread().interrupt();
249 assertThrows(InterruptedException.class, scope::join);
250
251 // fork should throw
252 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
253 }
254 }
255
256 /**
257 * Test fork after join timeout.
258 */
259 @ParameterizedTest
260 @MethodSource("factories")
261 void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception {
262 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
263 cf -> cf.withThreadFactory(factory)
264 .withTimeout(Duration.ofMillis(100)))) {
265 awaitCancelled(scope);
266
267 // join throws
268 assertThrows(TimeoutException.class, scope::join);
269
270 // fork should throw
271 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
272 }
273 }
274
275 /**
276 * Test fork after task scope is cancelled. This test uses a custom Joiner to
277 * cancel execution.
278 */
279 @ParameterizedTest
280 @MethodSource("factories")
281 void testForkAfterCancel2(ThreadFactory factory) throws Exception {
282 var countingThreadFactory = new CountingThreadFactory(factory);
283 var testJoiner = new CancelAfterOneJoiner<String>();
284
285 try (var scope = StructuredTaskScope.open(testJoiner,
286 cf -> cf.withThreadFactory(countingThreadFactory))) {
287
288 // fork subtask, the scope should be cancelled when the subtask completes
289 var subtask1 = scope.fork(() -> "foo");
290 awaitCancelled(scope);
291
292 assertEquals(1, countingThreadFactory.threadCount());
293 assertEquals(1, testJoiner.onForkCount());
294 assertEquals(1, testJoiner.onCompleteCount());
295
296 // fork second subtask, it should not run
297 var subtask2 = scope.fork(() -> "bar");
298
299 // onFork should be invoked, newThread and onComplete should not be invoked
300 assertEquals(1, countingThreadFactory.threadCount());
301 assertEquals(2, testJoiner.onForkCount());
302 assertEquals(1, testJoiner.onCompleteCount());
303
304 scope.join();
305
306 assertEquals(1, countingThreadFactory.threadCount());
307 assertEquals(2, testJoiner.onForkCount());
308 assertEquals(1, testJoiner.onCompleteCount());
309 assertEquals("foo", subtask1.get());
310 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
311 }
312 }
313
314 /**
315 * Test fork after task scope is closed.
316 */
317 @ParameterizedTest
318 @MethodSource("factories")
319 void testForkAfterClose(ThreadFactory factory) {
320 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
321 cf -> cf.withThreadFactory(factory))) {
322 scope.close();
323 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
324 }
325 }
326
327 /**
328 * Test fork with a ThreadFactory that rejects creating a thread.
329 */
330 @Test
331 void testForkRejectedExecutionException() {
332 ThreadFactory factory = task -> null;
333 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
334 cf -> cf.withThreadFactory(factory))) {
335 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
336 }
337 }
338
339 /**
340 * Test join with no subtasks.
341 */
342 @Test
343 void testJoinWithNoSubtasks() throws Exception {
344 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
345 scope.join();
346 }
347 }
348
349 /**
350 * Test join with a remaining subtask.
351 */
352 @ParameterizedTest
353 @MethodSource("factories")
354 void testJoinWithRemainingSubtasks(ThreadFactory factory) throws Exception {
355 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
356 cf -> cf.withThreadFactory(factory))) {
357 Subtask<String> subtask = scope.fork(() -> {
358 Thread.sleep(Duration.ofMillis(100));
359 return "foo";
360 });
361 scope.join();
362 assertEquals("foo", subtask.get());
363 }
364 }
365
366 /**
367 * Test join after join completed with a result.
368 */
369 @Test
370 void testJoinAfterJoin1() throws Exception {
371 var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz"));
372 Joiner<Object, String> joiner = results::take;
373 try (var scope = StructuredTaskScope.open(joiner)) {
374 scope.fork(() -> "foo");
375 assertEquals("foo", scope.join());
376
377 // join already called
378 for (int i = 0 ; i < 3; i++) {
379 assertThrows(IllegalStateException.class, scope::join);
380 }
381 }
382 }
383
384 /**
385 * Test join after join completed with an exception.
386 */
387 @Test
388 void testJoinAfterJoin2() throws Exception {
389 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow())) {
390 scope.fork(() -> { throw new FooException(); });
391 Throwable ex = assertThrows(FailedException.class, scope::join);
392 assertTrue(ex.getCause() instanceof FooException);
393
394 // join already called
395 for (int i = 0 ; i < 3; i++) {
396 assertThrows(IllegalStateException.class, scope::join);
397 }
398 }
399 }
400
401 /**
402 * Test join after join interrupted.
403 */
404 @Test
405 void testJoinAfterJoinInterrupted() throws Exception {
406 try (var scope = StructuredTaskScope.open()) {
407 var latch = new CountDownLatch(1);
408 var subtask = scope.fork(() -> {
409 latch.await();
410 return "foo";
411 });
412
413 // join throws InterruptedException
414 Thread.currentThread().interrupt();
415 assertThrows(InterruptedException.class, scope::join);
416
417 latch.countDown();
418
419 // retry join to get result
420 scope.join();
421 assertEquals("foo", subtask.get());
422
423 // retry after otbaining result
424 assertThrows(IllegalStateException.class, scope::join);
425 }
426 }
427
428 /**
429 * Test join after join completed with a timeout.
430 */
431 @Test
432 void testJoinAfterJoinTimeout() throws Exception {
433 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(),
434 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
435 // wait for scope to be cancelled by timeout
436 awaitCancelled(scope);
437 assertThrows(TimeoutException.class, scope::join);
438
439 // join already called
440 for (int i = 0 ; i < 3; i++) {
441 assertThrows(IllegalStateException.class, scope::join);
442 }
443 }
444 }
445
446 /**
447 * Test join invoked from Joiner.onTimeout.
448 */
449 @Test
450 void testJoinInOnTimeout() throws Exception {
451 Thread owner = Thread.currentThread();
452 var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
453
454 var joiner = new Joiner<String, Void>() {
455 @Override
456 public void onTimeout() {
457 assertTrue(Thread.currentThread() == owner);
458 var scope = scopeRef.get();
459 assertThrows(IllegalStateException.class, scope::join);
460 }
461 @Override
462 public Void result() {
463 return null;
464 }
465 };
466
467 try (var scope = StructuredTaskScope.open(joiner,
468 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
469 awaitCancelled(scope);
470 scopeRef.set(scope);
471 scope.join(); // invokes onTimeout
472 }
473 }
474
475 /**
476 * Test join method is owner confined.
477 */
478 @ParameterizedTest
479 @MethodSource("factories")
480 void testJoinConfined(ThreadFactory factory) throws Exception {
481 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
482 cf -> cf.withThreadFactory(factory))) {
483
484 // random thread cannot join
485 try (var pool = Executors.newSingleThreadExecutor()) {
486 Future<Void> future = pool.submit(() -> {
487 assertThrows(WrongThreadException.class, scope::join);
488 return null;
489 });
490 future.get();
491 }
492
493 // subtask cannot join
494 Subtask<Boolean> subtask = scope.fork(() -> {
495 assertThrows(WrongThreadException.class, () -> { scope.join(); });
496 return true;
497 });
498 scope.join();
499 assertTrue(subtask.get());
500 }
501 }
502
503 /**
504 * Test join with interrupt status set.
505 */
506 @ParameterizedTest
507 @MethodSource("factories")
508 void testInterruptJoin1(ThreadFactory factory) throws Exception {
509 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
510 cf -> cf.withThreadFactory(factory))) {
511
512 Subtask<String> subtask = scope.fork(() -> {
513 Thread.sleep(Duration.ofDays(1));
514 return "foo";
515 });
516
517 // join should throw
518 Thread.currentThread().interrupt();
519 try {
520 scope.join();
521 fail("join did not throw");
522 } catch (InterruptedException expected) {
523 assertFalse(Thread.interrupted()); // interrupt status should be cleared
524 }
525 }
526 }
527
528 /**
529 * Test interrupt of thread blocked in join.
530 */
531 @ParameterizedTest
532 @MethodSource("factories")
533 void testInterruptJoin2(ThreadFactory factory) throws Exception {
534 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
535 cf -> cf.withThreadFactory(factory))) {
536 Subtask<String> subtask = scope.fork(() -> {
537 Thread.sleep(Duration.ofDays(1));
538 return "foo";
539 });
540
541 // interrupt main thread when it blocks in join
542 scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
543 try {
544 scope.join();
545 fail("join did not throw");
546 } catch (InterruptedException expected) {
547 assertFalse(Thread.interrupted()); // interrupt status should be clear
548 }
549 }
550 }
551
552 /**
553 * Test join when scope is cancelled.
554 */
555 @ParameterizedTest
556 @MethodSource("factories")
557 void testJoinWhenCancelled(ThreadFactory factory) throws Exception {
558 var countingThreadFactory = new CountingThreadFactory(factory);
559 var testJoiner = new CancelAfterOneJoiner<String>();
560
561 try (var scope = StructuredTaskScope.open(testJoiner,
562 cf -> cf.withThreadFactory(countingThreadFactory))) {
563
564 // fork subtask, the scope should be cancelled when the subtask completes
565 var subtask1 = scope.fork(() -> "foo");
566 awaitCancelled(scope);
567
568 // fork second subtask, it should not run
569 var subtask2 = scope.fork(() -> {
570 Thread.sleep(Duration.ofDays(1));
571 return "bar";
572 });
573
574 scope.join();
575
576 assertEquals("foo", subtask1.get());
577 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
578 }
579 }
580
581 /**
582 * Test join after scope is closed.
583 */
584 @Test
585 void testJoinAfterClose() throws Exception {
586 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
587 scope.close();
588 assertThrows(IllegalStateException.class, () -> scope.join());
589 }
590 }
591
592 /**
593 * Test join with timeout, subtasks finish before timeout expires.
594 */
595 @ParameterizedTest
596 @MethodSource("factories")
597 void testJoinWithTimeout1(ThreadFactory factory) throws Exception {
598 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
599 cf -> cf.withThreadFactory(factory)
600 .withTimeout(Duration.ofDays(1)))) {
601
602 Subtask<String> subtask = scope.fork(() -> {
603 Thread.sleep(Duration.ofSeconds(1));
604 return "foo";
605 });
606
607 scope.join();
608
609 assertFalse(scope.isCancelled());
610 assertEquals("foo", subtask.get());
611 }
612 }
613
614 /**
615 * Test join with timeout, timeout expires before subtasks finish.
616 */
617 @ParameterizedTest
618 @MethodSource("factories")
619 void testJoinWithTimeout2(ThreadFactory factory) throws Exception {
620 long startMillis = millisTime();
621 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
622 cf -> cf.withThreadFactory(factory)
623 .withTimeout(Duration.ofSeconds(2)))) {
624
625 Subtask<Void> subtask = scope.fork(() -> {
626 Thread.sleep(Duration.ofDays(1));
627 return null;
628 });
629
630 assertThrows(TimeoutException.class, scope::join);
631 expectDuration(startMillis, /*min*/1900, /*max*/20_000);
632
633 assertTrue(scope.isCancelled());
634 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
635 }
636 }
637
638 /**
639 * Test join with timeout that has already expired.
640 */
641 @ParameterizedTest
642 @MethodSource("factories")
643 void testJoinWithTimeout3(ThreadFactory factory) throws Exception {
644 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
645 cf -> cf.withThreadFactory(factory)
646 .withTimeout(Duration.ofSeconds(-1)))) {
647
648 Subtask<Void> subtask = scope.fork(() -> {
649 Thread.sleep(Duration.ofDays(1));
650 return null;
651 });
652
653 assertThrows(TimeoutException.class, scope::join);
654
655 assertTrue(scope.isCancelled());
656 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
657 }
658 }
659
660 /**
661 * Test that cancelling execution interrupts unfinished threads. This test uses
662 * a custom Joiner to cancel execution.
663 */
664 @ParameterizedTest
665 @MethodSource("factories")
666 void testCancelInterruptsThreads2(ThreadFactory factory) throws Exception {
667 var testJoiner = new CancelAfterOneJoiner<String>();
668
669 try (var scope = StructuredTaskScope.open(testJoiner,
670 cf -> cf.withThreadFactory(factory))) {
671
672 // fork subtask1 that runs for a long time
673 var started = new CountDownLatch(1);
674 var interrupted = new CountDownLatch(1);
675 var subtask1 = scope.fork(() -> {
676 started.countDown();
677 try {
678 Thread.sleep(Duration.ofDays(1));
679 } catch (InterruptedException e) {
680 interrupted.countDown();
681 }
682 });
683 started.await();
684
685 // fork subtask2, the scope should be cancelled when the subtask completes
686 var subtask2 = scope.fork(() -> "bar");
687 awaitCancelled(scope);
688
689 // subtask1 should be interrupted
690 interrupted.await();
691
692 scope.join();
693 assertEquals(Subtask.State.UNAVAILABLE, subtask1.state());
694 assertEquals("bar", subtask2.get());
695 }
696 }
697
698 /**
699 * Test that timeout interrupts unfinished threads.
700 */
701 @ParameterizedTest
702 @MethodSource("factories")
703 void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception {
704 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
705 cf -> cf.withThreadFactory(factory)
706 .withTimeout(Duration.ofSeconds(2)))) {
707
708 var started = new AtomicBoolean();
709 var interrupted = new CountDownLatch(1);
710 Subtask<Void> subtask = scope.fork(() -> {
711 started.set(true);
712 try {
713 Thread.sleep(Duration.ofDays(1));
714 } catch (InterruptedException e) {
715 interrupted.countDown();
716 }
717 return null;
718 });
719
720 // wait for scope to be cancelled by timeout
721 awaitCancelled(scope);
722
723 // if subtask started then it should be interrupted
724 if (started.get()) {
725 interrupted.await();
726 }
727
728 assertThrows(TimeoutException.class, scope::join);
729
730 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
731 }
732 }
733
734 /**
735 * Test close without join, no subtasks forked.
736 */
737 @Test
738 void testCloseWithoutJoin1() {
739 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
740 // do nothing
741 }
742 }
743
744 /**
745 * Test close without join, subtasks forked.
746 */
747 @ParameterizedTest
748 @MethodSource("factories")
749 void testCloseWithoutJoin2(ThreadFactory factory) {
750 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
751 cf -> cf.withThreadFactory(factory))) {
752 Subtask<String> subtask = scope.fork(() -> {
753 Thread.sleep(Duration.ofDays(1));
754 return null;
755 });
756
757 // first call to close should throw
758 assertThrows(IllegalStateException.class, scope::close);
759
760 // subsequent calls to close should not throw
761 for (int i = 0; i < 3; i++) {
762 scope.close();
763 }
764
765 // subtask result/exception not available
766 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
767 assertThrows(IllegalStateException.class, subtask::get);
768 assertThrows(IllegalStateException.class, subtask::exception);
769 }
770 }
771
772 /**
773 * Test close after join throws. Close should not throw as join attempted.
774 */
775 @ParameterizedTest
776 @MethodSource("factories")
777 void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception {
778 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
779 cf -> cf.withThreadFactory(factory))) {
780 var subtask = scope.fork(() -> {
781 Thread.sleep(Duration.ofDays(1));
782 return null;
783 });
784
785 // join throws
786 Thread.currentThread().interrupt();
787 assertThrows(InterruptedException.class, scope::join);
788 assertThrows(IllegalStateException.class, subtask::get);
789
790 } // close should not throw
791 }
792
793 /**
794 * Test close method is owner confined.
795 */
796 @ParameterizedTest
797 @MethodSource("factories")
798 void testCloseConfined(ThreadFactory factory) throws Exception {
799 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
800 cf -> cf.withThreadFactory(factory))) {
801
802 // random thread cannot close scope
803 try (var pool = Executors.newCachedThreadPool(factory)) {
804 Future<Boolean> future = pool.submit(() -> {
805 assertThrows(WrongThreadException.class, scope::close);
806 return null;
807 });
808 future.get();
809 }
810
811 // subtask cannot close
812 Subtask<Boolean> subtask = scope.fork(() -> {
813 assertThrows(WrongThreadException.class, scope::close);
814 return true;
815 });
816 scope.join();
817 assertTrue(subtask.get());
818 }
819 }
820
821 /**
822 * Test close with interrupt status set.
823 */
824 @ParameterizedTest
825 @MethodSource("factories")
826 void testInterruptClose1(ThreadFactory factory) throws Exception {
827 var testJoiner = new CancelAfterOneJoiner<String>();
828 try (var scope = StructuredTaskScope.open(testJoiner,
829 cf -> cf.withThreadFactory(factory))) {
830
831 // fork first subtask, a straggler as it continues after being interrupted
832 var started = new CountDownLatch(1);
833 var done = new AtomicBoolean();
834 scope.fork(() -> {
835 started.countDown();
836 try {
837 Thread.sleep(Duration.ofDays(1));
838 } catch (InterruptedException e) {
839 // interrupted by cancel, expected
840 }
841 Thread.sleep(Duration.ofMillis(100)); // force close to wait
842 done.set(true);
843 return null;
844 });
845 started.await();
846
847 // fork second subtask, the scope should be cancelled when this subtask completes
848 scope.fork(() -> "bar");
849 awaitCancelled(scope);
850
851 scope.join();
852
853 // invoke close with interrupt status set
854 Thread.currentThread().interrupt();
855 try {
856 scope.close();
857 } finally {
858 assertTrue(Thread.interrupted()); // clear interrupt status
859 assertTrue(done.get());
860 }
861 }
862 }
863
864 /**
865 * Test interrupting thread waiting in close.
866 */
867 @ParameterizedTest
868 @MethodSource("factories")
869 void testInterruptClose2(ThreadFactory factory) throws Exception {
870 var testJoiner = new CancelAfterOneJoiner<String>();
871 try (var scope = StructuredTaskScope.open(testJoiner,
872 cf -> cf.withThreadFactory(factory))) {
873
874 Thread mainThread = Thread.currentThread();
875
876 // fork first subtask, a straggler as it continues after being interrupted
877 var started = new CountDownLatch(1);
878 var done = new AtomicBoolean();
879 scope.fork(() -> {
880 started.countDown();
881 try {
882 Thread.sleep(Duration.ofDays(1));
883 } catch (InterruptedException e) {
884 // interrupted by cancel, expected
885 }
886
887 // interrupt main thread when it blocks in close
888 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScopeImpl.close");
889
890 Thread.sleep(Duration.ofMillis(100)); // force close to wait
891 done.set(true);
892 return null;
893 });
894 started.await();
895
896 // fork second subtask, the scope should be cancelled when this subtask completes
897 scope.fork(() -> "bar");
898 awaitCancelled(scope);
899
900 scope.join();
901
902 // main thread will be interrupted while blocked in close
903 try {
904 scope.close();
905 } finally {
906 assertTrue(Thread.interrupted()); // clear interrupt status
907 assertTrue(done.get());
908 }
909 }
910 }
911
912 /**
913 * Test that closing an enclosing scope closes the thread flock of a nested scope.
914 */
915 @Test
916 void testCloseThrowsStructureViolation() throws Exception {
917 try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll())) {
918 try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll())) {
919
920 // close enclosing scope
921 try {
922 scope1.close();
923 fail("close did not throw");
924 } catch (StructureViolationException expected) { }
925
926 // underlying flock should be closed
927 var executed = new AtomicBoolean();
928 Subtask<?> subtask = scope2.fork(() -> executed.set(true));
929 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
930 scope2.join();
931 assertFalse(executed.get());
932 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
933 }
934 }
935 }
936
937 /**
938 * Test that isCancelled returns true after close.
939 */
940 @Test
941 void testIsCancelledAfterClose() throws Exception {
942 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
943 assertFalse(scope.isCancelled());
944 scope.close();
945 assertTrue(scope.isCancelled());
946 }
947 }
948
949 /**
950 * Test Joiner.onFork throwing exception.
951 */
952 @Test
953 void testOnForkThrows() throws Exception {
954 var joiner = new Joiner<String, Void>() {
955 @Override
956 public boolean onFork(Subtask<String> subtask) {
957 throw new FooException();
958 }
959 @Override
960 public Void result() {
961 return null;
962 }
963 };
964 try (var scope = StructuredTaskScope.open(joiner)) {
965 assertThrows(FooException.class, () -> scope.fork(() -> "foo"));
966 }
967 }
968
969 /**
970 * Test Joiner.onFork returning true to cancel execution.
971 */
972 @Test
973 void testOnForkCancelsExecution() throws Exception {
974 var joiner = new Joiner<String, Void>() {
975 @Override
976 public boolean onFork(Subtask<String> subtask) {
977 return true;
978 }
979 @Override
980 public Void result() {
981 return null;
982 }
983 };
984 try (var scope = StructuredTaskScope.open(joiner)) {
985 assertFalse(scope.isCancelled());
986 scope.fork(() -> "foo");
987 assertTrue(scope.isCancelled());
988 scope.join();
989 }
990 }
991
992 /**
993 * Test Joiner.onComplete throwing exception causes UHE to be invoked.
994 */
995 @Test
996 void testOnCompleteThrows() throws Exception {
997 var joiner = new Joiner<String, Void>() {
998 @Override
999 public boolean onComplete(Subtask<String> subtask) {
1000 throw new FooException();
1001 }
1002 @Override
1003 public Void result() {
1004 return null;
1005 }
1006 };
1007 var excRef = new AtomicReference<Throwable>();
1008 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1009 ThreadFactory factory = Thread.ofVirtual()
1010 .uncaughtExceptionHandler(uhe)
1011 .factory();
1012 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1013 scope.fork(() -> "foo");
1014 scope.join();
1015 assertInstanceOf(FooException.class, excRef.get());
1016 }
1017 }
1018
1019 /**
1020 * Test Joiner.onComplete returning true to cancel execution.
1021 */
1022 @Test
1023 void testOnCompleteCancelsExecution() throws Exception {
1024 var joiner = new Joiner<String, Void>() {
1025 @Override
1026 public boolean onComplete(Subtask<String> subtask) {
1027 return true;
1028 }
1029 @Override
1030 public Void result() {
1031 return null;
1032 }
1033 };
1034 try (var scope = StructuredTaskScope.open(joiner)) {
1035 assertFalse(scope.isCancelled());
1036 scope.fork(() -> "foo");
1037 awaitCancelled(scope);
1038 scope.join();
1039 }
1040 }
1041
1042 /**
1043 * Test Joiner.onTimeout invoked by owner thread when timeout expires.
1044 */
1045 @Test
1046 void testOnTimeoutInvoked() throws Exception {
1047 var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
1048 Thread owner = Thread.currentThread();
1049 var invokeCount = new AtomicInteger();
1050 var joiner = new Joiner<String, Void>() {
1051 @Override
1052 public void onTimeout() {
1053 assertTrue(Thread.currentThread() == owner);
1054 assertTrue(scopeRef.get().isCancelled());
1055 invokeCount.incrementAndGet();
1056 }
1057 @Override
1058 public Void result() {
1059 return null;
1060 }
1061 };
1062 try (var scope = StructuredTaskScope.open(joiner,
1063 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1064 scopeRef.set(scope);
1065 scope.fork(() -> {
1066 Thread.sleep(Duration.ofDays(1));
1067 return null;
1068 });
1069 scope.join();
1070 assertEquals(1, invokeCount.get());
1071 }
1072 }
1073
1074 /**
1075 * Test Joiner.onTimeout throwing an excepiton.
1076 */
1077 @Test
1078 void testOnTimeoutThrows() throws Exception {
1079 var joiner = new Joiner<String, Void>() {
1080 @Override
1081 public void onTimeout() {
1082 throw new FooException();
1083 }
1084 @Override
1085 public Void result() {
1086 return null;
1087 }
1088 };
1089 try (var scope = StructuredTaskScope.open(joiner,
1090 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1091 // wait for scope to be cancelled by timeout
1092 awaitCancelled(scope);
1093
1094 // join should throw FooException on first usage
1095 assertThrows(FooException.class, scope::join);
1096
1097 // retry after onTimeout fails
1098 assertThrows(IllegalStateException.class, scope::join);
1099 }
1100 }
1101
1102 /**
1103 * Test toString.
1104 */
1105 @Test
1106 void testToString() throws Exception {
1107 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1108 cf -> cf.withName("duke"))) {
1109
1110 // open
1111 assertTrue(scope.toString().contains("duke"));
1112
1113 // closed
1114 scope.close();
1115 assertTrue(scope.toString().contains("duke"));
1116 }
1117 }
1118
1119 /**
1120 * Test Subtask with task that completes successfully.
1121 */
1122 @ParameterizedTest
1123 @MethodSource("factories")
1124 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
1125 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1126 cf -> cf.withThreadFactory(factory))) {
1127 Subtask<String> subtask = scope.fork(() -> "foo");
1128
1129 // before join, owner thread
1130 assertThrows(IllegalStateException.class, subtask::get);
1131 assertThrows(IllegalStateException.class, subtask::exception);
1132
1133 // before join, another thread
1134 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1135 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1136
1137 scope.join();
1138
1139 assertEquals(Subtask.State.SUCCESS, subtask.state());
1140
1141 // after join, owner thread
1142 assertEquals("foo", subtask.get());
1143 assertThrows(IllegalStateException.class, subtask::exception);
1144
1145 // after join, another thread
1146 assertEquals("foo", callInOtherThread(subtask::get));
1147 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1148 }
1149 }
1150
1151 /**
1152 * Test Subtask with task that fails.
1153 */
1154 @ParameterizedTest
1155 @MethodSource("factories")
1156 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
1157 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1158 cf -> cf.withThreadFactory(factory))) {
1159
1160 Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
1161
1162 // before join, owner thread
1163 assertThrows(IllegalStateException.class, subtask::get);
1164 assertThrows(IllegalStateException.class, subtask::exception);
1165
1166 // before join, another thread
1167 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1168 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1169
1170 scope.join();
1171
1172 assertEquals(Subtask.State.FAILED, subtask.state());
1173
1174 // after join, owner thread
1175 assertThrows(IllegalStateException.class, subtask::get);
1176 assertTrue(subtask.exception() instanceof FooException);
1177
1178 // after join, another thread
1179 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1180 assertTrue(callInOtherThread(subtask::exception) instanceof FooException);
1181 }
1182 }
1183
1184 /**
1185 * Test Subtask with a task that has not completed.
1186 */
1187 @ParameterizedTest
1188 @MethodSource("factories")
1189 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
1190 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1191 cf -> cf.withThreadFactory(factory))) {
1192 Subtask<Void> subtask = scope.fork(() -> {
1193 Thread.sleep(Duration.ofDays(1));
1194 return null;
1195 });
1196 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1197
1198 // before join, owner thread
1199 assertThrows(IllegalStateException.class, subtask::get);
1200 assertThrows(IllegalStateException.class, subtask::exception);
1201
1202 // before join, another thread
1203 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1204 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1205
1206 // attempt join, join throws
1207 Thread.currentThread().interrupt();
1208 assertThrows(InterruptedException.class, scope::join);
1209
1210 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1211
1212 // after join, owner thread
1213 assertThrows(IllegalStateException.class, subtask::get);
1214 assertThrows(IllegalStateException.class, subtask::exception);
1215
1216 // before join, another thread
1217 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1218 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1219 }
1220 }
1221
1222 /**
1223 * Test Subtask forked after execution cancelled.
1224 */
1225 @ParameterizedTest
1226 @MethodSource("factories")
1227 void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception {
1228 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1229 scope.fork(() -> "foo");
1230 awaitCancelled(scope);
1231
1232 var subtask = scope.fork(() -> "foo");
1233
1234 // before join, owner thread
1235 assertThrows(IllegalStateException.class, subtask::get);
1236 assertThrows(IllegalStateException.class, subtask::exception);
1237
1238 // before join, another thread
1239 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1240 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1241
1242 scope.join();
1243
1244 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1245
1246 // after join, owner thread
1247 assertThrows(IllegalStateException.class, subtask::get);
1248 assertThrows(IllegalStateException.class, subtask::exception);
1249
1250 // before join, another thread
1251 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1252 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1253 }
1254 }
1255
1256 /**
1257 * Test Subtask::toString.
1258 */
1259 @Test
1260 void testSubtaskToString() throws Exception {
1261 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1262 var latch = new CountDownLatch(1);
1263 var subtask1 = scope.fork(() -> {
1264 latch.await();
1265 return "foo";
1266 });
1267 var subtask2 = scope.fork(() -> { throw new FooException(); });
1268
1269 // subtask1 result is unavailable
1270 assertTrue(subtask1.toString().contains("Unavailable"));
1271 latch.countDown();
1272
1273 scope.join();
1274
1275 assertTrue(subtask1.toString().contains("Completed successfully"));
1276 assertTrue(subtask2.toString().contains("Failed"));
1277 }
1278 }
1279
1280 /**
1281 * Test Joiner.allSuccessfulOrThrow() with no subtasks.
1282 */
1283 @Test
1284 void testAllSuccessfulOrThrow1() throws Throwable {
1285 try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
1286 var results = scope.join();
1287 assertTrue(results.isEmpty());
1288 }
1289 }
1290
1291 /**
1292 * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully.
1293 */
1294 @ParameterizedTest
1295 @MethodSource("factories")
1296 void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1297 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1298 cf -> cf.withThreadFactory(factory))) {
1299 scope.fork(() -> "foo");
1300 scope.fork(() -> "bar");
1301 var results = scope.join();
1302 assertEquals(List.of("foo", "bar"), results);
1303 }
1304 }
1305
1306 /**
1307 * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
1308 * a subtask that fails.
1309 */
1310 @ParameterizedTest
1311 @MethodSource("factories")
1312 void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1313 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1314 cf -> cf.withThreadFactory(factory))) {
1315 scope.fork(() -> "foo");
1316 scope.fork(() -> { throw new FooException(); });
1317 try {
1318 scope.join();
1319 } catch (FailedException e) {
1320 assertTrue(e.getCause() instanceof FooException);
1321 }
1322 }
1323 }
1324
1325 /**
1326 * Test Joiner.allSuccessfulOrThrow() with a timeout.
1327 */
1328 @Test
1329 void testAllSuccessfulOrThrow4() throws Exception {
1330 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1331 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1332 scope.fork(() -> "foo");
1333 scope.fork(() -> {
1334 Thread.sleep(Duration.ofDays(1));
1335 return "bar";
1336 });
1337 assertThrows(TimeoutException.class, scope::join);
1338
1339 // retry after join throws TimeoutException
1340 assertThrows(IllegalStateException.class, scope::join);
1341 }
1342 }
1343
1344 /**
1345 * Test Joiner.allSuccessfulOrThrow() yields an unmodifiable list.
1346 */
1347 @Test
1348 void testAllSuccessfulOrThrow5() throws Exception {
1349 // empty list
1350 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow())) {
1351 var results = scope.join();
1352 assertEquals(0, results.size());
1353 assertThrows(UnsupportedOperationException.class, () -> results.add("foo"));
1354 }
1355
1356 // non-empty list
1357 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow())) {
1358 scope.fork(() -> "foo");
1359 var results = scope.join();
1360 assertEquals(1, results.size());
1361 assertThrows(UnsupportedOperationException.class, () -> results.add("foo"));
1362 assertThrows(UnsupportedOperationException.class, () -> results.add("bar"));
1363 }
1364 }
1365
1366 /**
1367 * Test Joiner.anySuccessfulOrThrow() with no subtasks.
1368 */
1369 @Test
1370 void testAnySuccessfulOrThrow1() throws Exception {
1371 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow())) {
1372 try {
1373 scope.join();
1374 } catch (FailedException e) {
1375 assertTrue(e.getCause() instanceof NoSuchElementException);
1376 }
1377 }
1378 }
1379
1380 /**
1381 * Test Joiner.anySuccessfulOrThrow() with a subtask that completes successfully.
1382 */
1383 @ParameterizedTest
1384 @MethodSource("factories")
1385 void testAnySuccessfulOrThrow2(ThreadFactory factory) throws Exception {
1386 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
1387 cf -> cf.withThreadFactory(factory))) {
1388 scope.fork(() -> "foo");
1389 String result = scope.join();
1390 assertEquals("foo", result);
1391 }
1392 }
1393
1394 /**
1395 * Test Joiner.anySuccessfulOrThrow() with a subtask that completes successfully
1396 * with a null result.
1397 */
1398 @ParameterizedTest
1399 @MethodSource("factories")
1400 void testAnySuccessfulOrThrow3(ThreadFactory factory) throws Exception {
1401 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
1402 cf -> cf.withThreadFactory(factory))) {
1403 scope.fork(() -> null);
1404 String result = scope.join();
1405 assertNull(result);
1406 }
1407 }
1408
1409 /**
1410 * Test Joiner.anySuccessfulOrThrow() with a subtask that complete succcessfully
1411 * and a subtask that fails.
1412 */
1413 @ParameterizedTest
1414 @MethodSource("factories")
1415 void testAnySuccessfulOrThrow4(ThreadFactory factory) throws Exception {
1416 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
1417 cf -> cf.withThreadFactory(factory))) {
1418 scope.fork(() -> "foo");
1419 scope.fork(() -> { throw new FooException(); });
1420 String first = scope.join();
1421 assertEquals("foo", first);
1422 }
1423 }
1424
1425 /**
1426 * Test Joiner.anySuccessfulOrThrow() with a subtask that fails.
1427 */
1428 @ParameterizedTest
1429 @MethodSource("factories")
1430 void testAnySuccessfulOrThrow5(ThreadFactory factory) throws Exception {
1431 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(),
1432 cf -> cf.withThreadFactory(factory))) {
1433 scope.fork(() -> { throw new FooException(); });
1434 Throwable ex = assertThrows(FailedException.class, scope::join);
1435 assertTrue(ex.getCause() instanceof FooException);
1436 }
1437 }
1438
1439 /**
1440 * Test Joiner.anySuccessfulOrThrow() with a timeout.
1441 */
1442 @Test
1443 void anySuccessfulOrThrow6() throws Exception {
1444 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
1445 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1446 scope.fork(() -> { throw new FooException(); });
1447 scope.fork(() -> {
1448 Thread.sleep(Duration.ofDays(1));
1449 return "bar";
1450 });
1451 assertThrows(TimeoutException.class, scope::join);
1452
1453 // retry after join throws TimeoutException
1454 assertThrows(IllegalStateException.class, scope::join);
1455 }
1456 }
1457
1458 /**
1459 * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
1460 */
1461 @Test
1462 void testAwaitSuccessfulOrThrow1() throws Throwable {
1463 try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
1464 var result = scope.join();
1465 assertNull(result);
1466 }
1467 }
1468
1469 /**
1470 * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
1471 */
1472 @ParameterizedTest
1473 @MethodSource("factories")
1474 void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1475 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1476 cf -> cf.withThreadFactory(factory))) {
1477 var subtask1 = scope.fork(() -> "foo");
1478 var subtask2 = scope.fork(() -> "bar");
1479 var result = scope.join();
1480 assertNull(result);
1481 assertEquals("foo", subtask1.get());
1482 assertEquals("bar", subtask2.get());
1483 }
1484 }
1485
1486 /**
1487 * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
1488 * a subtask that fails.
1489 */
1490 @ParameterizedTest
1491 @MethodSource("factories")
1492 void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1493 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1494 cf -> cf.withThreadFactory(factory))) {
1495 scope.fork(() -> "foo");
1496 scope.fork(() -> { throw new FooException(); });
1497 try {
1498 scope.join();
1499 } catch (FailedException e) {
1500 assertTrue(e.getCause() instanceof FooException);
1501 }
1502 }
1503 }
1504
1505 /**
1506 * Test Joiner.awaitAllSuccessfulOrThrow() with a timeout.
1507 */
1508 @Test
1509 void testAwaitSuccessfulOrThrow4() throws Exception {
1510 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1511 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1512 scope.fork(() -> "foo");
1513 scope.fork(() -> {
1514 Thread.sleep(Duration.ofDays(1));
1515 return "bar";
1516 });
1517 assertThrows(TimeoutException.class, scope::join);
1518
1519 // retry after join throws TimeoutException
1520 assertThrows(IllegalStateException.class, scope::join);
1521 }
1522 }
1523
1524 /**
1525 * Test Joiner.awaitAll() with no subtasks.
1526 */
1527 @Test
1528 void testAwaitAll1() throws Throwable {
1529 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1530 var result = scope.join();
1531 assertNull(result);
1532 }
1533 }
1534
1535 /**
1536 * Test Joiner.awaitAll() with subtasks that complete successfully.
1537 */
1538 @ParameterizedTest
1539 @MethodSource("factories")
1540 void testAwaitAll2(ThreadFactory factory) throws Throwable {
1541 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1542 cf -> cf.withThreadFactory(factory))) {
1543 var subtask1 = scope.fork(() -> "foo");
1544 var subtask2 = scope.fork(() -> "bar");
1545 var result = scope.join();
1546 assertNull(result);
1547 assertEquals("foo", subtask1.get());
1548 assertEquals("bar", subtask2.get());
1549 }
1550 }
1551
1552 /**
1553 * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
1554 * that fails.
1555 */
1556 @ParameterizedTest
1557 @MethodSource("factories")
1558 void testAwaitAll3(ThreadFactory factory) throws Throwable {
1559 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1560 cf -> cf.withThreadFactory(factory))) {
1561 var subtask1 = scope.fork(() -> "foo");
1562 var subtask2 = scope.fork(() -> { throw new FooException(); });
1563 var result = scope.join();
1564 assertNull(result);
1565 assertEquals("foo", subtask1.get());
1566 assertTrue(subtask2.exception() instanceof FooException);
1567 }
1568 }
1569
1570 /**
1571 * Test Joiner.awaitAll() with a timeout.
1572 */
1573 @Test
1574 void testAwaitAll4() throws Exception {
1575 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1576 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1577 scope.fork(() -> "foo");
1578 scope.fork(() -> {
1579 Thread.sleep(Duration.ofDays(1));
1580 return "bar";
1581 });
1582 assertThrows(TimeoutException.class, scope::join);
1583
1584 // retry after join throws TimeoutException
1585 assertThrows(IllegalStateException.class, scope::join);
1586 }
1587 }
1588
1589 /**
1590 * Test Joiner.allUntil(Predicate) with no subtasks.
1591 */
1592 @Test
1593 void testAllUntil1() throws Throwable {
1594 try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
1595 var subtasks = scope.join();
1596 assertEquals(0, subtasks.size());
1597 }
1598 }
1599
1600 /**
1601 * Test Joiner.allUntil(Predicate) with no cancellation.
1602 */
1603 @ParameterizedTest
1604 @MethodSource("factories")
1605 void testAllUntil2(ThreadFactory factory) throws Exception {
1606 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1607 cf -> cf.withThreadFactory(factory))) {
1608
1609 var subtask1 = scope.fork(() -> "foo");
1610 var subtask2 = scope.fork(() -> { throw new FooException(); });
1611
1612 var subtasks = scope.join();
1613 assertEquals(List.of(subtask1, subtask2), subtasks);
1614
1615 assertEquals("foo", subtask1.get());
1616 assertTrue(subtask2.exception() instanceof FooException);
1617 }
1618 }
1619
1620 /**
1621 * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes.
1622 */
1623 @ParameterizedTest
1624 @MethodSource("factories")
1625 void testAllUntil3(ThreadFactory factory) throws Exception {
1626 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true),
1627 cf -> cf.withThreadFactory(factory))) {
1628
1629 var subtask1 = scope.fork(() -> "foo");
1630 var subtask2 = scope.fork(() -> {
1631 Thread.sleep(Duration.ofDays(1));
1632 return "bar";
1633 });
1634
1635 var subtasks = scope.join();
1636 assertEquals(List.of(subtask1, subtask2), subtasks);
1637
1638 assertEquals("foo", subtask1.get());
1639 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1640 }
1641 }
1642
1643 /**
1644 * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete.
1645 */
1646 @ParameterizedTest
1647 @MethodSource("factories")
1648 void testAllUntil4(ThreadFactory factory) throws Exception {
1649
1650 // cancel execution after two or more failures
1651 class CancelAfterTwoFailures<T> implements Predicate<Subtask<T>> {
1652 final AtomicInteger failedCount = new AtomicInteger();
1653 @Override
1654 public boolean test(Subtask<T> subtask) {
1655 return subtask.state() == Subtask.State.FAILED
1656 && failedCount.incrementAndGet() >= 2;
1657 }
1658 }
1659 var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
1660
1661 try (var scope = StructuredTaskScope.open(joiner)) {
1662 int forkCount = 0;
1663
1664 // fork subtasks until execution cancelled
1665 while (!scope.isCancelled()) {
1666 scope.fork(() -> "foo");
1667 scope.fork(() -> { throw new FooException(); });
1668 forkCount += 2;
1669 Thread.sleep(Duration.ofMillis(20));
1670 }
1671
1672 var subtasks = scope.join();
1673 assertEquals(forkCount, subtasks.size());
1674
1675 long failedCount = subtasks.stream()
1676 .filter(s -> s.state() == Subtask.State.FAILED)
1677 .count();
1678 assertTrue(failedCount >= 2);
1679 }
1680 }
1681
1682 /**
1683 * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws.
1684 */
1685 @Test
1686 void testAllUntil5() throws Exception {
1687 var joiner = Joiner.allUntil(_ -> { throw new FooException(); });
1688 var excRef = new AtomicReference<Throwable>();
1689 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1690 ThreadFactory factory = Thread.ofVirtual()
1691 .uncaughtExceptionHandler(uhe)
1692 .factory();
1693 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1694 scope.fork(() -> "foo");
1695 scope.join();
1696 assertInstanceOf(FooException.class, excRef.get());
1697 }
1698 }
1699
1700 /**
1701 * Test Joiner.allUntil(Predicate) with a timeout.
1702 */
1703 @Test
1704 void testAllUntil6() throws Exception {
1705 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1706 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1707 var subtask1 = scope.fork(() -> "foo");
1708 var subtask2 = scope.fork(() -> {
1709 Thread.sleep(Duration.ofDays(1));
1710 return "bar";
1711 });
1712
1713 // TimeoutException should not be thrown
1714 var subtasks = scope.join();
1715
1716 // stream should have two elements, subtask1 may or may not have completed
1717 assertEquals(List.of(subtask1, subtask2), subtasks);
1718 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1719
1720 // retry after join throws TimeoutException
1721 assertThrows(IllegalStateException.class, scope::join);
1722 }
1723 }
1724
1725 /**
1726 * Test Joiner.allUntil(Predicate) yields an unmodifiable list.
1727 */
1728 @Test
1729 void testAllUntil7() throws Exception {
1730 Subtask<String> subtask1;
1731 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
1732 subtask1 = scope.fork(() -> "?");
1733 scope.join();
1734 }
1735
1736 // empty list
1737 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
1738 var subtasks = scope.join();
1739 assertEquals(0, subtasks.size());
1740 assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask1));
1741 }
1742
1743 // non-empty list
1744 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
1745 var subtask2 = scope.fork(() -> "foo");
1746 var subtasks = scope.join();
1747 assertEquals(1, subtasks.size());
1748 assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask1));
1749 assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask2));
1750 }
1751 }
1752
1753 /**
1754 * Test Joiner default methods.
1755 */
1756 @Test
1757 void testJoinerDefaultMethods() throws Exception {
1758 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1759
1760 // need subtasks to test default methods
1761 var subtask1 = scope.fork(() -> "foo");
1762 awaitCancelled(scope);
1763 var subtask2 = scope.fork(() -> "bar");
1764 scope.join();
1765
1766 assertEquals(Subtask.State.SUCCESS, subtask1.state());
1767 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1768
1769 // Joiner that does not override default methods
1770 Joiner<String, Void> joiner = () -> null;
1771 assertThrows(NullPointerException.class, () -> joiner.onFork(null));
1772 assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
1773 assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
1774 assertFalse(joiner.onFork(subtask2));
1775 assertFalse(joiner.onComplete(subtask1));
1776 assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
1777 assertThrows(TimeoutException.class, joiner::onTimeout);
1778 }
1779 }
1780
1781 /**
1782 * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
1783 */
1784 @Test
1785 void testJoinersWithUnavailableResult() throws Exception {
1786 try (var scope = StructuredTaskScope.open()) {
1787 var done = new CountDownLatch(1);
1788 var subtask = scope.fork(() -> {
1789 done.await();
1790 return null;
1791 });
1792
1793 // onComplete with uncompleted task should throw IAE
1794 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1795 assertThrows(IllegalArgumentException.class,
1796 () -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
1797 assertThrows(IllegalArgumentException.class,
1798 () -> Joiner.anySuccessfulOrThrow().onComplete(subtask));
1799 assertThrows(IllegalArgumentException.class,
1800 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask));
1801 assertThrows(IllegalArgumentException.class,
1802 () -> Joiner.awaitAll().onComplete(subtask));
1803 assertThrows(IllegalArgumentException.class,
1804 () -> Joiner.allUntil(_ -> false).onComplete(subtask));
1805
1806 done.countDown();
1807 scope.join();
1808
1809 // onFork with completed task should throw IAE
1810 assertEquals(Subtask.State.SUCCESS, subtask.state());
1811 assertThrows(IllegalArgumentException.class,
1812 () -> Joiner.allSuccessfulOrThrow().onFork(subtask));
1813 assertThrows(IllegalArgumentException.class,
1814 () -> Joiner.anySuccessfulOrThrow().onFork(subtask));
1815 assertThrows(IllegalArgumentException.class,
1816 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask));
1817 assertThrows(IllegalArgumentException.class,
1818 () -> Joiner.awaitAll().onFork(subtask));
1819 assertThrows(IllegalArgumentException.class,
1820 () -> Joiner.allUntil(_ -> false).onFork(subtask));
1821 }
1822
1823 }
1824
1825 /**
1826 * Test the Configuration function apply method throwing an exception.
1827 */
1828 @Test
1829 void testConfigFunctionThrows() throws Exception {
1830 assertThrows(FooException.class,
1831 () -> StructuredTaskScope.open(Joiner.awaitAll(),
1832 cf -> { throw new FooException(); }));
1833 }
1834
1835 /**
1836 * Test Configuration equals/hashCode/toString
1837 */
1838 @Test
1839 void testConfigMethods() throws Exception {
1840 UnaryOperator<Configuration> configOperator = cf -> {
1841 var name = "duke";
1842 var threadFactory = Thread.ofPlatform().factory();
1843 var timeout = Duration.ofSeconds(10);
1844
1845 assertEquals(cf, cf);
1846 assertEquals(cf.withName(name), cf.withName(name));
1847 assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1848 assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1849
1850 assertNotEquals(cf, cf.withName(name));
1851 assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1852 assertNotEquals(cf, cf.withTimeout(timeout));
1853
1854 assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1855 assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1856 cf.withThreadFactory(threadFactory).hashCode());
1857 assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1858
1859 assertTrue(cf.withName(name).toString().contains(name));
1860 assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1861 assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1862
1863 return cf;
1864 };
1865 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) {
1866 // do nothing
1867 }
1868 }
1869
1870 /**
1871 * Test for NullPointerException.
1872 */
1873 @Test
1874 void testNulls() throws Exception {
1875 assertThrows(NullPointerException.class,
1876 () -> StructuredTaskScope.open(null));
1877 assertThrows(NullPointerException.class,
1878 () -> StructuredTaskScope.open(null, cf -> cf));
1879 assertThrows(NullPointerException.class,
1880 () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
1881
1882 assertThrows(NullPointerException.class, () -> Joiner.allUntil(null));
1883
1884 // fork
1885 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1886 assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null));
1887 assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null));
1888 }
1889
1890 // Configuration and withXXX methods
1891 assertThrows(NullPointerException.class,
1892 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> null));
1893 assertThrows(NullPointerException.class,
1894 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null)));
1895 assertThrows(NullPointerException.class,
1896 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null)));
1897 assertThrows(NullPointerException.class,
1898 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null)));
1899
1900 // Joiner.onFork/onComplete
1901 assertThrows(NullPointerException.class,
1902 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null));
1903 assertThrows(NullPointerException.class,
1904 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null));
1905 assertThrows(NullPointerException.class,
1906 () -> Joiner.awaitAll().onFork(null));
1907 assertThrows(NullPointerException.class,
1908 () -> Joiner.awaitAll().onComplete(null));
1909 assertThrows(NullPointerException.class,
1910 () -> Joiner.allSuccessfulOrThrow().onFork(null));
1911 assertThrows(NullPointerException.class,
1912 () -> Joiner.allSuccessfulOrThrow().onComplete(null));
1913 assertThrows(NullPointerException.class,
1914 () -> Joiner.anySuccessfulOrThrow().onFork(null));
1915 assertThrows(NullPointerException.class,
1916 () -> Joiner.anySuccessfulOrThrow().onComplete(null));
1917 }
1918
1919 /**
1920 * ThreadFactory that counts usage.
1921 */
1922 private static class CountingThreadFactory implements ThreadFactory {
1923 final ThreadFactory delegate;
1924 final AtomicInteger threadCount = new AtomicInteger();
1925 CountingThreadFactory(ThreadFactory delegate) {
1926 this.delegate = delegate;
1927 }
1928 @Override
1929 public Thread newThread(Runnable task) {
1930 threadCount.incrementAndGet();
1931 return delegate.newThread(task);
1932 }
1933 int threadCount() {
1934 return threadCount.get();
1935 }
1936 }
1937
1938 /**
1939 * A joiner that counts that counts the number of subtasks that are forked and the
1940 * number of subtasks that complete.
1941 */
1942 private static class CountingJoiner<T> implements Joiner<T, Void> {
1943 final AtomicInteger onForkCount = new AtomicInteger();
1944 final AtomicInteger onCompleteCount = new AtomicInteger();
1945 @Override
1946 public boolean onFork(Subtask<T> subtask) {
1947 onForkCount.incrementAndGet();
1948 return false;
1949 }
1950 @Override
1951 public boolean onComplete(Subtask<T> subtask) {
1952 onCompleteCount.incrementAndGet();
1953 return false;
1954 }
1955 @Override
1956 public Void result() {
1957 return null;
1958 }
1959 int onForkCount() {
1960 return onForkCount.get();
1961 }
1962 int onCompleteCount() {
1963 return onCompleteCount.get();
1964 }
1965 }
1966
1967 /**
1968 * A joiner that cancels execution when a subtask completes. It also keeps a count
1969 * of the number of subtasks that are forked and the number of subtasks that complete.
1970 */
1971 private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> {
1972 final AtomicInteger onForkCount = new AtomicInteger();
1973 final AtomicInteger onCompleteCount = new AtomicInteger();
1974 @Override
1975 public boolean onFork(Subtask<T> subtask) {
1976 onForkCount.incrementAndGet();
1977 return false;
1978 }
1979 @Override
1980 public boolean onComplete(Subtask<T> subtask) {
1981 onCompleteCount.incrementAndGet();
1982 return true;
1983 }
1984 @Override
1985 public Void result() {
1986 return null;
1987 }
1988 int onForkCount() {
1989 return onForkCount.get();
1990 }
1991 int onCompleteCount() {
1992 return onCompleteCount.get();
1993 }
1994 }
1995
1996 /**
1997 * A runtime exception for tests.
1998 */
1999 private static class FooException extends RuntimeException {
2000 FooException() { }
2001 FooException(Throwable cause) { super(cause); }
2002 }
2003
2004 /**
2005 * Returns the current time in milliseconds.
2006 */
2007 private long millisTime() {
2008 long now = System.nanoTime();
2009 return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
2010 }
2011
2012 /**
2013 * Check the duration of a task
2014 * @param start start time, in milliseconds
2015 * @param min minimum expected duration, in milliseconds
2016 * @param max maximum expected duration, in milliseconds
2017 * @return the duration (now - start), in milliseconds
2018 */
2019 private long expectDuration(long start, long min, long max) {
2020 long duration = millisTime() - start;
2021 assertTrue(duration >= min,
2022 "Duration " + duration + "ms, expected >= " + min + "ms");
2023 assertTrue(duration <= max,
2024 "Duration " + duration + "ms, expected <= " + max + "ms");
2025 return duration;
2026 }
2027
2028 /**
2029 * Wait for the given scope to be cancelled.
2030 */
2031 private static void awaitCancelled(StructuredTaskScope<?, ?> scope) throws InterruptedException {
2032 while (!scope.isCancelled()) {
2033 Thread.sleep(Duration.ofMillis(20));
2034 }
2035 }
2036
2037 /**
2038 * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}".
2039 * {@code c} is the fully qualified class name and {@code m} is the method name.
2040 */
2041 private void interruptThreadAt(Thread target, String location) throws InterruptedException {
2042 int index = location.lastIndexOf('.');
2043 String className = location.substring(0, index);
2044 String methodName = location.substring(index + 1);
2045
2046 boolean found = false;
2047 while (!found) {
2048 Thread.State state = target.getState();
2049 assertTrue(state != TERMINATED);
2050 if ((state == WAITING || state == TIMED_WAITING)
2051 && contains(target.getStackTrace(), className, methodName)) {
2052 found = true;
2053 } else {
2054 Thread.sleep(20);
2055 }
2056 }
2057 target.interrupt();
2058 }
2059
2060 /**
2061 * Schedules the current thread to be interrupted when it waits (timed or untimed)
2062 * at the given location.
2063 */
2064 private void scheduleInterruptAt(String location) {
2065 Thread target = Thread.currentThread();
2066 scheduler.submit(() -> {
2067 interruptThreadAt(target, location);
2068 return null;
2069 });
2070 }
2071
2072 /**
2073 * Calls a result returning task from another thread.
2074 */
2075 private <V> V callInOtherThread(Callable<V> task) throws Exception {
2076 var result = new AtomicReference<V>();
2077 var exc = new AtomicReference<Exception>();
2078 Thread thread = Thread.ofVirtual().start(() -> {
2079 try {
2080 result.set(task.call());
2081 } catch (Exception e) {
2082 exc.set(e);
2083 }
2084 });
2085 boolean interrupted = false;
2086 boolean terminated = false;
2087 while (!terminated) {
2088 try {
2089 thread.join();
2090 terminated = true;
2091 } catch (InterruptedException e) {
2092 interrupted = true;
2093 }
2094 }
2095 if (interrupted) {
2096 Thread.currentThread().interrupt();
2097 }
2098 Exception e = exc.get();
2099 if (e != null) {
2100 throw e;
2101 } else {
2102 return result.get();
2103 }
2104 }
2105
2106 /**
2107 * Returns true if the given stack trace contains an element for the given class
2108 * and method name.
2109 */
2110 private boolean contains(StackTraceElement[] stack, String className, String methodName) {
2111 return Arrays.stream(stack)
2112 .anyMatch(e -> className.equals(e.getClassName())
2113 && methodName.equals(e.getMethodName()));
2114 }
2115 }