1 /*
2 * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=platform
26 * @bug 8284199 8296779 8306647
27 * @summary Basic tests for StructuredTaskScope
28 * @enablePreview
29 * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest
30 */
31
32 /*
33 * @test id=virtual
34 * @enablePreview
35 * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest
36 */
37
38 import java.time.Duration;
39 import java.io.IOException;
40 import java.time.Instant;
41 import java.util.Arrays;
42 import java.util.ArrayList;
43 import java.util.List;
44 import java.util.Set;
45 import java.util.concurrent.Callable;
46 import java.util.concurrent.ConcurrentHashMap;
47 import java.util.concurrent.CountDownLatch;
48 import java.util.concurrent.Executors;
49 import java.util.concurrent.ExecutionException;
50 import java.util.concurrent.Future;
51 import java.util.concurrent.ThreadFactory;
52 import java.util.concurrent.TimeoutException;
53 import java.util.concurrent.TimeUnit;
54 import java.util.concurrent.RejectedExecutionException;
55 import java.util.concurrent.ScheduledExecutorService;
56 import java.util.concurrent.StructuredTaskScope;
57 import java.util.concurrent.StructuredTaskScope.Subtask;
58 import java.util.concurrent.StructuredTaskScope.ShutdownOnSuccess;
59 import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure;
60 import java.util.concurrent.StructureViolationException;
61 import java.util.concurrent.atomic.AtomicBoolean;
62 import java.util.concurrent.atomic.AtomicInteger;
63 import java.util.concurrent.atomic.AtomicReference;
64 import java.util.function.Supplier;
65 import java.util.stream.Stream;
66 import static java.lang.Thread.State.*;
67
68 import org.junit.jupiter.api.Test;
69 import org.junit.jupiter.api.BeforeAll;
70 import org.junit.jupiter.api.AfterAll;
71 import org.junit.jupiter.params.ParameterizedTest;
72 import org.junit.jupiter.params.provider.MethodSource;
73 import static org.junit.jupiter.api.Assertions.*;
74
75 class StructuredTaskScopeTest {
76 private static ScheduledExecutorService scheduler;
77 private static List<ThreadFactory> threadFactories;
78
79 @BeforeAll
80 static void setup() throws Exception {
81 scheduler = Executors.newSingleThreadScheduledExecutor();
82
83 // thread factories
84 String value = System.getProperty("threadFactory");
85 List<ThreadFactory> list = new ArrayList<>();
86 if (value == null || value.equals("platform"))
87 list.add(Thread.ofPlatform().factory());
88 if (value == null || value.equals("virtual"))
89 list.add(Thread.ofVirtual().factory());
90 assertTrue(list.size() > 0, "No thread factories for tests");
91 threadFactories = list;
92 }
93
94 @AfterAll
95 static void shutdown() {
96 scheduler.shutdown();
97 }
98
99 private static Stream<ThreadFactory> factories() {
100 return threadFactories.stream();
101 }
102
103 /**
104 * Test that fork creates a new thread for each task.
105 */
106 @ParameterizedTest
107 @MethodSource("factories")
108 void testForkCreatesThread(ThreadFactory factory) throws Exception {
109 Set<Long> tids = ConcurrentHashMap.newKeySet();
110 try (var scope = new StructuredTaskScope<Object>(null, factory)) {
111 for (int i = 0; i < 100; i++) {
112 scope.fork(() -> {
113 tids.add(Thread.currentThread().threadId());
114 return null;
115 });
116 }
117 scope.join();
118 }
119 assertEquals(100, tids.size());
120 }
121
122 /**
123 * Test that fork creates a new virtual thread for each task.
124 */
125 @Test
126 void testForkCreateVirtualThread() throws Exception {
127 Set<Thread> threads = ConcurrentHashMap.newKeySet();
128 try (var scope = new StructuredTaskScope<Object>()) {
129 for (int i = 0; i < 100; i++) {
130 scope.fork(() -> {
131 threads.add(Thread.currentThread());
132 return null;
133 });
134 }
135 scope.join();
136 }
137 assertEquals(100, threads.size());
138 threads.forEach(t -> assertTrue(t.isVirtual()));
139 }
140
141 /**
142 * Test that fork creates a new thread with the given thread factory.
143 */
144 @ParameterizedTest
145 @MethodSource("factories")
146 void testForkUsesFactory(ThreadFactory factory) throws Exception {
147 var count = new AtomicInteger();
148 ThreadFactory countingFactory = task -> {
149 count.incrementAndGet();
150 return factory.newThread(task);
151 };
152 try (var scope = new StructuredTaskScope<Object>(null, countingFactory)) {
153 for (int i = 0; i < 100; i++) {
154 scope.fork(() -> null);
155 }
156 scope.join();
157 }
158 assertEquals(100, count.get());
159 }
160
161 /**
162 * Test fork is confined to threads in the scope "tree".
163 */
164 @ParameterizedTest
165 @MethodSource("factories")
166 void testForkConfined(ThreadFactory factory) throws Exception {
167 try (var scope1 = new StructuredTaskScope<Boolean>();
168 var scope2 = new StructuredTaskScope<Boolean>()) {
169
170 // thread in scope1 cannot fork thread in scope2
171 Subtask<Boolean> subtask1 = scope1.fork(() -> {
172 assertThrows(WrongThreadException.class, () -> {
173 scope2.fork(() -> null);
174 });
175 return true;
176 });
177
178 // thread in scope2 can fork thread in scope1
179 Subtask<Boolean> subtask2 = scope2.fork(() -> {
180 scope1.fork(() -> null);
181 return true;
182 });
183
184 scope2.join();
185 scope1.join();
186
187 assertTrue(subtask1.get());
188 assertTrue(subtask2.get());
189
190 // random thread cannot fork
191 try (var pool = Executors.newSingleThreadExecutor()) {
192 Future<Void> future = pool.submit(() -> {
193 assertThrows(WrongThreadException.class, () -> {
194 scope1.fork(() -> null);
195 });
196 assertThrows(WrongThreadException.class, () -> {
197 scope2.fork(() -> null);
198 });
199 return null;
200 });
201 future.get();
202 }
203 }
204 }
205
206 /**
207 * Test fork after join completes.
208 */
209 @ParameterizedTest
210 @MethodSource("factories")
211 void testForkAfterJoin(ThreadFactory factory) throws Exception {
212 try (var scope = new StructuredTaskScope<String>(null, factory)) {
213 // round 1
214 var subtask1 = scope.fork(() -> "foo");
215 assertThrows(IllegalStateException.class, subtask1::get);
216 scope.join();
217 assertEquals("foo", subtask1.get());
218
219 // round 2
220 var subtask2 = scope.fork(() -> "bar");
221 assertEquals("foo", subtask1.get());
222 assertThrows(IllegalStateException.class, subtask2::get);
223 scope.join();
224 assertEquals("foo", subtask1.get());
225 assertEquals("bar", subtask2.get());
226
227 // round 3
228 var subtask3 = scope.fork(() -> "baz");
229 assertEquals("foo", subtask1.get());
230 assertEquals("bar", subtask2.get());
231 assertThrows(IllegalStateException.class, subtask3::get);
232 scope.join();
233 assertEquals("foo", subtask1.get());
234 assertEquals("bar", subtask2.get());
235 assertEquals("baz", subtask3.get());
236 }
237 }
238
239 /**
240 * Test fork after join throws.
241 */
242 @ParameterizedTest
243 @MethodSource("factories")
244 void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
245 try (var scope = new StructuredTaskScope<String>(null, factory)) {
246 var latch = new CountDownLatch(1);
247 var subtask1 = scope.fork(() -> {
248 latch.await();
249 return "foo";
250 });
251
252 // join throws
253 Thread.currentThread().interrupt();
254 assertThrows(InterruptedException.class, scope::join);
255
256 // allow subtask1 to finish
257 latch.countDown();
258
259 // continue to fork
260 var subtask2 = scope.fork(() -> "bar");
261 assertThrows(IllegalStateException.class, subtask1::get);
262 assertThrows(IllegalStateException.class, subtask2::get);
263 scope.join();
264 assertEquals("foo", subtask1.get());
265 assertEquals("bar", subtask2.get());
266 }
267 }
268
269 /**
270 * Test fork after scope is shutdown.
271 */
272 @ParameterizedTest
273 @MethodSource("factories")
274 void testForkAfterShutdown(ThreadFactory factory) throws Exception {
275 var executed = new AtomicBoolean();
276 try (var scope = new StructuredTaskScope<Object>(null, factory)) {
277 scope.shutdown();
278 Subtask<String> subtask = scope.fork(() -> {
279 executed.set(true);
280 return null;
281 });
282 scope.join();
283 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
284 assertThrows(IllegalStateException.class, subtask::get);
285 assertThrows(IllegalStateException.class, subtask::exception);
286 }
287 assertFalse(executed.get());
288 }
289
290 /**
291 * Test fork after scope is closed.
292 */
293 @ParameterizedTest
294 @MethodSource("factories")
295 void testForkAfterClose(ThreadFactory factory) throws Exception {
296 try (var scope = new StructuredTaskScope<Object>(null, factory)) {
297 scope.close();
298 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
299 }
300 }
301
302 /**
303 * Test fork when the thread factory rejects creating a thread.
304 */
305 @Test
306 void testForkRejectedExecutionException() throws Exception {
307 ThreadFactory factory = task -> null;
308 try (var scope = new StructuredTaskScope(null, factory)) {
309 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
310 scope.join();
311 }
312 }
313
314 /**
315 * Test join with no subtasks.
316 */
317 @Test
318 void testJoinWithNoSubtasks() throws Exception {
319 try (var scope = new StructuredTaskScope()) {
320 scope.join();
321 }
322 }
323
324 /**
325 * Test join with unfinished subtasks.
326 */
327 @ParameterizedTest
328 @MethodSource("factories")
329 void testJoinWithSubtasks(ThreadFactory factory) throws Exception {
330 try (var scope = new StructuredTaskScope(null, factory)) {
331 Subtask<String> subtask = scope.fork(() -> {
332 Thread.sleep(Duration.ofMillis(50));
333 return "foo";
334 });
335 scope.join();
336 assertEquals("foo", subtask.get());
337 }
338 }
339
340 /**
341 * Test join is owner confined.
342 */
343 @ParameterizedTest
344 @MethodSource("factories")
345 void testJoinConfined(ThreadFactory factory) throws Exception {
346 try (var scope = new StructuredTaskScope<Boolean>()) {
347
348 // thread in scope cannot join
349 Subtask<Boolean> subtask = scope.fork(() -> {
350 assertThrows(WrongThreadException.class, () -> { scope.join(); });
351 return true;
352 });
353
354 scope.join();
355
356 assertTrue(subtask.get());
357
358 // random thread cannot join
359 try (var pool = Executors.newSingleThreadExecutor()) {
360 Future<Void> future = pool.submit(() -> {
361 assertThrows(WrongThreadException.class, scope::join);
362 return null;
363 });
364 future.get();
365 }
366 }
367 }
368
369 /**
370 * Test join with interrupt status set.
371 */
372 @ParameterizedTest
373 @MethodSource("factories")
374 void testInterruptJoin1(ThreadFactory factory) throws Exception {
375 try (var scope = new StructuredTaskScope(null, factory)) {
376 var latch = new CountDownLatch(1);
377
378 Subtask<String> subtask = scope.fork(() -> {
379 latch.await();
380 return "foo";
381 });
382
383 // join should throw
384 Thread.currentThread().interrupt();
385 try {
386 scope.join();
387 fail("join did not throw");
388 } catch (InterruptedException expected) {
389 assertFalse(Thread.interrupted()); // interrupt status should be clear
390 } finally {
391 // let task continue
392 latch.countDown();
393 }
394
395 // join should complete
396 scope.join();
397 assertEquals("foo", subtask.get());
398 }
399 }
400
401 /**
402 * Test interrupt of thread blocked in join.
403 */
404 @ParameterizedTest
405 @MethodSource("factories")
406 void testInterruptJoin2(ThreadFactory factory) throws Exception {
407 try (var scope = new StructuredTaskScope(null, factory)) {
408 var latch = new CountDownLatch(1);
409 Subtask<String> subtask = scope.fork(() -> {
410 latch.await();
411 return "foo";
412 });
413
414 // join should throw
415 scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.join");
416 try {
417 scope.join();
418 fail("join did not throw");
419 } catch (InterruptedException expected) {
420 assertFalse(Thread.interrupted()); // interrupt status should be clear
421 } finally {
422 // let task continue
423 latch.countDown();
424 }
425
426 // join should complete
427 scope.join();
428 assertEquals("foo", subtask.get());
429 }
430 }
431
432 /**
433 * Test join when scope is shutdown.
434 */
435 @ParameterizedTest
436 @MethodSource("factories")
437 void testJoinWithShutdown1(ThreadFactory factory) throws Exception {
438 try (var scope = new StructuredTaskScope<String>(null, factory)) {
439 var interrupted = new CountDownLatch(1);
440 var finish = new CountDownLatch(1);
441
442 Subtask<String> subtask = scope.fork(() -> {
443 try {
444 Thread.sleep(Duration.ofDays(1));
445 } catch (InterruptedException e) {
446 interrupted.countDown();
447 }
448 finish.await();
449 return "foo";
450 });
451
452 scope.shutdown(); // should interrupt task
453
454 interrupted.await();
455
456 scope.join();
457
458 // signal task to finish
459 finish.countDown();
460 }
461 }
462
463 /**
464 * Test shutdown when owner is blocked in join.
465 */
466 @ParameterizedTest
467 @MethodSource("factories")
468 void testJoinWithShutdown2(ThreadFactory factory) throws Exception {
469 class MyScope<T> extends StructuredTaskScope<T> {
470 MyScope(ThreadFactory factory) {
471 super(null, factory);
472 }
473 @Override
474 protected void handleComplete(Subtask<? extends T> subtask) {
475 shutdown();
476 }
477 }
478
479 try (var scope = new MyScope<String>(factory)) {
480 Subtask<String> subtask1 = scope.fork(() -> {
481 Thread.sleep(Duration.ofMillis(50));
482 return "foo";
483 });
484 Subtask<String> subtask2 = scope.fork(() -> {
485 Thread.sleep(Duration.ofDays(1));
486 return "bar";
487 });
488
489 // join should wakeup when shutdown is called
490 scope.join();
491
492 // task1 should have completed successfully
493 assertEquals(Subtask.State.SUCCESS, subtask1.state());
494 assertEquals("foo", subtask1.get());
495 assertThrows(IllegalStateException.class, subtask1::exception);
496
497 // task2 result/exception not available
498 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
499 assertThrows(IllegalStateException.class, subtask2::get);
500 assertThrows(IllegalStateException.class, subtask2::exception);
501 }
502 }
503
504 /**
505 * Test join after scope is closed.
506 */
507 @Test
508 void testJoinAfterClose() throws Exception {
509 try (var scope = new StructuredTaskScope()) {
510 scope.join();
511 scope.close();
512 assertThrows(IllegalStateException.class, () -> scope.join());
513 assertThrows(IllegalStateException.class, () -> scope.joinUntil(Instant.now()));
514 }
515 }
516
517 /**
518 * Test joinUntil, subtasks finish before deadline expires.
519 */
520 @ParameterizedTest
521 @MethodSource("factories")
522 void testJoinUntil1(ThreadFactory factory) throws Exception {
523 try (var scope = new StructuredTaskScope<String>(null, factory)) {
524 Subtask<String> subtask = scope.fork(() -> {
525 try {
526 Thread.sleep(Duration.ofSeconds(2));
527 } catch (InterruptedException e) { }
528 return "foo";
529 });
530
531 long startMillis = millisTime();
532 scope.joinUntil(Instant.now().plusSeconds(30));
533 expectDuration(startMillis, /*min*/1900, /*max*/20_000);
534 assertEquals("foo", subtask.get());
535 }
536 }
537
538 /**
539 * Test joinUntil, deadline expires before subtasks finish.
540 */
541 @ParameterizedTest
542 @MethodSource("factories")
543 void testJoinUntil2(ThreadFactory factory) throws Exception {
544 try (var scope = new StructuredTaskScope<Object>(null, factory)) {
545 Subtask<Void> subtask = scope.fork(() -> {
546 Thread.sleep(Duration.ofDays(1));
547 return null;
548 });
549
550 long startMillis = millisTime();
551 try {
552 scope.joinUntil(Instant.now().plusSeconds(2));
553 } catch (TimeoutException e) {
554 expectDuration(startMillis, /*min*/1900, /*max*/20_000);
555 }
556 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
557 }
558 }
559
560 /**
561 * Test joinUntil many times.
562 */
563 @ParameterizedTest
564 @MethodSource("factories")
565 void testJoinUntil3(ThreadFactory factory) throws Exception {
566 try (var scope = new StructuredTaskScope<String>(null, factory)) {
567 Subtask<String> subtask = scope.fork(() -> {
568 Thread.sleep(Duration.ofDays(1));
569 return null;
570 });
571
572 for (int i = 0; i < 3; i++) {
573 try {
574 scope.joinUntil(Instant.now().plusMillis(50));
575 fail("joinUntil did not throw");
576 } catch (TimeoutException expected) {
577 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
578 }
579 }
580 }
581 }
582
583 /**
584 * Test joinUntil with a deadline that has already expired.
585 */
586 @ParameterizedTest
587 @MethodSource("factories")
588 void testJoinUntil4(ThreadFactory factory) throws Exception {
589 try (var scope = new StructuredTaskScope<Object>(null, factory)) {
590 Subtask<Void> subtask = scope.fork(() -> {
591 Thread.sleep(Duration.ofDays(1));
592 return null;
593 });
594
595 // now
596 try {
597 scope.joinUntil(Instant.now());
598 fail("joinUntil did not throw");
599 } catch (TimeoutException expected) {
600 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
601 }
602
603 // in the past
604 try {
605 scope.joinUntil(Instant.now().minusSeconds(1));
606 fail("joinUntil did not throw");
607 } catch (TimeoutException expected) {
608 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
609 }
610 }
611 }
612
613 /**
614 * Test joinUntil with interrupt status set.
615 */
616 @ParameterizedTest
617 @MethodSource("factories")
618 void testInterruptJoinUntil1(ThreadFactory factory) throws Exception {
619 try (var scope = new StructuredTaskScope<String>(null, factory)) {
620 var latch = new CountDownLatch(1);
621
622 Subtask<String> subtask = scope.fork(() -> {
623 latch.await();
624 return "foo";
625 });
626
627 // joinUntil should throw
628 Thread.currentThread().interrupt();
629 try {
630 scope.joinUntil(Instant.now().plusSeconds(30));
631 fail("joinUntil did not throw");
632 } catch (InterruptedException expected) {
633 assertFalse(Thread.interrupted()); // interrupt status should be clear
634 } finally {
635 // let task continue
636 latch.countDown();
637 }
638
639 // join should complete
640 scope.join();
641 assertEquals("foo", subtask.get());
642 }
643 }
644
645 /**
646 * Test interrupt of thread blocked in joinUntil.
647 */
648 @ParameterizedTest
649 @MethodSource("factories")
650 void testInterruptJoinUntil2(ThreadFactory factory) throws Exception {
651 try (var scope = new StructuredTaskScope(null, factory)) {
652 var latch = new CountDownLatch(1);
653
654 Subtask<String> subtask = scope.fork(() -> {
655 latch.await();
656 return "foo";
657 });
658
659 // joinUntil should throw
660 scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.joinUntil");
661 try {
662 scope.joinUntil(Instant.now().plusSeconds(30));
663 fail("joinUntil did not throw");
664 } catch (InterruptedException expected) {
665 assertFalse(Thread.interrupted()); // interrupt status should be clear
666 } finally {
667 // let task continue
668 latch.countDown();
669 }
670
671 // join should complete
672 scope.join();
673 assertEquals("foo", subtask.get());
674 }
675 }
676
677 /**
678 * Test that shutdown interrupts unfinished subtasks.
679 */
680 @ParameterizedTest
681 @MethodSource("factories")
682 void testShutdownInterruptsThreads1(ThreadFactory factory) throws Exception {
683 try (var scope = new StructuredTaskScope<Object>(null, factory)) {
684 var interrupted = new AtomicBoolean();
685 var latch = new CountDownLatch(1);
686 var subtask = scope.fork(() -> {
687 try {
688 Thread.sleep(Duration.ofDays(1));
689 } catch (InterruptedException e) {
690 interrupted.set(true);
691 } finally {
692 latch.countDown();
693 }
694 return null;
695 });
696
697 scope.shutdown();
698
699 // wait for task to complete
700 latch.await();
701 assertTrue(interrupted.get());
702
703 scope.join();
704
705 // subtask result/exception not available
706 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
707 assertThrows(IllegalStateException.class, subtask::get);
708 assertThrows(IllegalStateException.class, subtask::exception);
709 }
710 }
711
712 /**
713 * Test that shutdown does not interrupt current thread.
714 */
715 @ParameterizedTest
716 @MethodSource("factories")
717 void testShutdownInterruptsThreads2(ThreadFactory factory) throws Exception {
718 try (var scope = new StructuredTaskScope<Object>(null, factory)) {
719 var interrupted = new AtomicBoolean();
720 var latch = new CountDownLatch(1);
721 var subtask = scope.fork(() -> {
722 try {
723 scope.shutdown();
724 interrupted.set(Thread.currentThread().isInterrupted());
725 } finally {
726 latch.countDown();
727 }
728 return null;
729 });
730
731 // wait for task to complete
732 latch.await();
733 assertFalse(interrupted.get());
734
735 scope.join();
736 }
737 }
738
739 /**
740 * Test shutdown wakes join.
741 */
742 @ParameterizedTest
743 @MethodSource("factories")
744 void testShutdownWakesJoin(ThreadFactory factory) throws Exception {
745 try (var scope = new StructuredTaskScope<Object>(null, factory)) {
746 var latch = new CountDownLatch(1);
747 scope.fork(() -> {
748 Thread.sleep(Duration.ofMillis(100)); // give time for join to block
749 scope.shutdown();
750 latch.await();
751 return null;
752 });
753
754 scope.join();
755
756 // join woke up, allow task to complete
757 latch.countDown();
758 }
759 }
760
761 /**
762 * Test shutdown after scope is closed.
763 */
764 @Test
765 void testShutdownAfterClose() throws Exception {
766 try (var scope = new StructuredTaskScope<Object>()) {
767 scope.join();
768 scope.close();
769 assertThrows(IllegalStateException.class, scope::shutdown);
770 }
771 }
772
773 /**
774 * Test shutdown is confined to threads in the scope "tree".
775 */
776 @ParameterizedTest
777 @MethodSource("factories")
778 void testShutdownConfined(ThreadFactory factory) throws Exception {
779 try (var scope1 = new StructuredTaskScope<Boolean>();
780 var scope2 = new StructuredTaskScope<Boolean>()) {
781
782 // thread in scope1 cannot shutdown scope2
783 Subtask<Boolean> subtask1 = scope1.fork(() -> {
784 assertThrows(WrongThreadException.class, scope2::shutdown);
785 return true;
786 });
787
788 // wait for task in scope1 to complete to avoid racing with task in scope2
789 while (subtask1.state() == Subtask.State.UNAVAILABLE) {
790 Thread.sleep(10);
791 }
792
793 // thread in scope2 shutdown scope1
794 Subtask<Boolean> subtask2 = scope2.fork(() -> {
795 scope1.shutdown();
796 return true;
797 });
798
799 scope2.join();
800 scope1.join();
801
802 assertTrue(subtask1.get());
803 assertTrue(subtask1.get());
804
805 // random thread cannot shutdown
806 try (var pool = Executors.newSingleThreadExecutor()) {
807 Future<Void> future = pool.submit(() -> {
808 assertThrows(WrongThreadException.class, scope1::shutdown);
809 assertThrows(WrongThreadException.class, scope2::shutdown);
810 return null;
811 });
812 future.get();
813 }
814 }
815 }
816
817 /**
818 * Test isShutdown.
819 */
820 @Test
821 void testIsShutdown() {
822 try (var scope = new StructuredTaskScope<Object>()) {
823 assertFalse(scope.isShutdown()); // before shutdown
824 scope.shutdown();
825 assertTrue(scope.isShutdown()); // after shutdown
826 scope.close();
827 assertTrue(scope.isShutdown()); // after cose
828 }
829 }
830
831 /**
832 * Test close without join, no subtasks forked.
833 */
834 @Test
835 void testCloseWithoutJoin1() {
836 try (var scope = new StructuredTaskScope<Object>()) {
837 // do nothing
838 }
839 }
840
841 /**
842 * Test close without join, unfinished subtasks.
843 */
844 @ParameterizedTest
845 @MethodSource("factories")
846 void testCloseWithoutJoin2(ThreadFactory factory) {
847 try (var scope = new StructuredTaskScope<String>(null, factory)) {
848 Subtask<String> subtask = scope.fork(() -> {
849 Thread.sleep(Duration.ofDays(1));
850 return null;
851 });
852 assertThrows(IllegalStateException.class, scope::close);
853
854 // subtask result/exception not available
855 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
856 assertThrows(IllegalStateException.class, subtask::get);
857 assertThrows(IllegalStateException.class, subtask::exception);
858 }
859 }
860
861 /**
862 * Test close without join, unfinished subtasks forked after join.
863 */
864 @ParameterizedTest
865 @MethodSource("factories")
866 void testCloseWithoutJoin3(ThreadFactory factory) throws Exception {
867 try (var scope = new StructuredTaskScope(null, factory)) {
868 scope.fork(() -> "foo");
869 scope.join();
870
871 Subtask<String> subtask = scope.fork(() -> {
872 Thread.sleep(Duration.ofDays(1));
873 return null;
874 });
875 assertThrows(IllegalStateException.class, scope::close);
876
877 // subtask result/exception not available
878 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
879 assertThrows(IllegalStateException.class, subtask::get);
880 assertThrows(IllegalStateException.class, subtask::exception);
881 }
882 }
883
884 /**
885 * Test close after join throws. Close should not throw as join attempted.
886 */
887 @ParameterizedTest
888 @MethodSource("factories")
889 void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception {
890 try (var scope = new StructuredTaskScope<Object>()) {
891 var subtask = scope.fork(() -> {
892 Thread.sleep(Duration.ofDays(1));
893 return null;
894 });
895
896 // join throws
897 Thread.currentThread().interrupt();
898 assertThrows(InterruptedException.class, scope::join);
899 assertThrows(IllegalStateException.class, subtask::get);
900 }
901 }
902
903 /**
904 * Test close after joinUntil throws. Close should not throw as join attempted.
905 */
906 @ParameterizedTest
907 @MethodSource("factories")
908 void testCloseAfterJoinUntilThrows(ThreadFactory factory) throws Exception {
909 try (var scope = new StructuredTaskScope<Object>()) {
910 var subtask = scope.fork(() -> {
911 Thread.sleep(Duration.ofDays(1));
912 return null;
913 });
914
915 // joinUntil throws
916 assertThrows(TimeoutException.class, () -> scope.joinUntil(Instant.now()));
917 assertThrows(IllegalStateException.class, subtask::get);
918 }
919 }
920
921 /**
922 * Test close is owner confined.
923 */
924 @ParameterizedTest
925 @MethodSource("factories")
926 void testCloseConfined(ThreadFactory factory) throws Exception {
927 try (var scope = new StructuredTaskScope<Boolean>()) {
928
929 // attempt to close from thread in scope
930 Subtask<Boolean> subtask = scope.fork(() -> {
931 assertThrows(WrongThreadException.class, scope::close);
932 return true;
933 });
934
935 scope.join();
936 assertTrue(subtask.get());
937
938 // random thread cannot close scope
939 try (var pool = Executors.newCachedThreadPool(factory)) {
940 Future<Boolean> future = pool.submit(() -> {
941 assertThrows(WrongThreadException.class, scope::close);
942 return null;
943 });
944 future.get();
945 }
946 }
947 }
948
949 /**
950 * Test close with interrupt status set.
951 */
952 @ParameterizedTest
953 @MethodSource("factories")
954 void testInterruptClose1(ThreadFactory factory) throws Exception {
955 try (var scope = new StructuredTaskScope<Object>(null, factory)) {
956 var done = new AtomicBoolean();
957 scope.fork(() -> {
958 try {
959 Thread.sleep(Duration.ofDays(1));
960 } catch (InterruptedException e) {
961 // interrupted by shutdown, expected
962 }
963 Thread.sleep(Duration.ofMillis(100)); // force close to wait
964 done.set(true);
965 return null;
966 });
967
968 scope.shutdown();
969 scope.join();
970
971 // invoke close with interrupt status set
972 Thread.currentThread().interrupt();
973 try {
974 scope.close();
975 } finally {
976 assertTrue(Thread.interrupted()); // clear interrupt status
977 assertTrue(done.get());
978 }
979 }
980 }
981
982 /**
983 * Test interrupting thread waiting in close.
984 */
985 @ParameterizedTest
986 @MethodSource("factories")
987 void testInterruptClose2(ThreadFactory factory) throws Exception {
988 try (var scope = new StructuredTaskScope<Object>(null, factory)) {
989 var done = new AtomicBoolean();
990 Thread mainThread = Thread.currentThread();
991 scope.fork(() -> {
992 try {
993 Thread.sleep(Duration.ofDays(1));
994 } catch (InterruptedException e) {
995 // interrupted by shutdown, expected
996 }
997
998 // interrupt main thread when it blocks in close
999 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScope.close");
1000
1001 Thread.sleep(Duration.ofMillis(100)); // force close to wait
1002 done.set(true);
1003 return null;
1004 });
1005
1006 scope.shutdown(); // interrupts task
1007 scope.join();
1008 try {
1009 scope.close();
1010 } finally {
1011 assertTrue(Thread.interrupted()); // clear interrupt status
1012 assertTrue(done.get());
1013 }
1014 }
1015 }
1016
1017 /**
1018 * Test that closing an enclosing scope closes the thread flock of a nested scope.
1019 */
1020 @Test
1021 void testCloseThrowsStructureViolation() throws Exception {
1022 try (var scope1 = new StructuredTaskScope<Object>()) {
1023 try (var scope2 = new StructuredTaskScope<Object>()) {
1024
1025 // join + close enclosing scope
1026 scope1.join();
1027 try {
1028 scope1.close();
1029 fail("close did not throw");
1030 } catch (StructureViolationException expected) { }
1031
1032 // underlying flock should be closed, fork should return a cancelled task
1033 var executed = new AtomicBoolean();
1034 Subtask<Void> subtask = scope2.fork(() -> {
1035 executed.set(true);
1036 return null;
1037 });
1038 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1039 scope2.join();
1040 assertFalse(executed.get());
1041 }
1042 }
1043 }
1044
1045 /**
1046 * A StructuredTaskScope that collects the subtasks notified to the handleComplete method.
1047 */
1048 private static class CollectAll<T> extends StructuredTaskScope<T> {
1049 private final Set<Subtask<? extends T>> subtasks = ConcurrentHashMap.newKeySet();
1050
1051 CollectAll(ThreadFactory factory) {
1052 super(null, factory);
1053 }
1054
1055 @Override
1056 protected void handleComplete(Subtask<? extends T> subtask) {
1057 subtasks.add(subtask);
1058 }
1059
1060 Set<Subtask<? extends T>> subtasks() {
1061 return subtasks;
1062 }
1063
1064 Subtask<? extends T> find(Callable<T> task) {
1065 return subtasks.stream()
1066 .filter(h -> task.equals(h.task()))
1067 .findAny()
1068 .orElseThrow();
1069 }
1070 }
1071
1072 /**
1073 * Test that handleComplete method is invoked for tasks that complete before shutdown.
1074 */
1075 @ParameterizedTest
1076 @MethodSource("factories")
1077 void testHandleCompleteBeforeShutdown(ThreadFactory factory) throws Exception {
1078 try (var scope = new CollectAll<String>(factory)) {
1079 Callable<String> task1 = () -> "foo";
1080 Callable<String> task2 = () -> { throw new FooException(); };
1081 scope.fork(task1);
1082 scope.fork(task2);
1083 scope.join();
1084
1085 var subtask1 = scope.find(task1);
1086 assertEquals("foo", subtask1.get());
1087
1088 var subtask2 = scope.find(task2);
1089 assertTrue(subtask2.exception() instanceof FooException);
1090 }
1091 }
1092
1093 /**
1094 * Test that handleComplete method is not invoked for tasks that finish after shutdown
1095 * or are forked after shutdown.
1096 */
1097 @ParameterizedTest
1098 @MethodSource("factories")
1099 void testHandleCompleteAfterShutdown(ThreadFactory factory) throws Exception {
1100 try (var scope = new CollectAll<String>(factory)) {
1101 Callable<String> task1 = () -> {
1102 try {
1103 Thread.sleep(Duration.ofDays(1));
1104 } catch (InterruptedException ignore) { }
1105 return "foo";
1106 };
1107 Callable<String> task2 = () -> {
1108 Thread.sleep(Duration.ofDays(1));
1109 return "bar";
1110 };
1111 Callable<String> task3 = () -> "baz";
1112
1113 // forked before shutdown, will complete after shutdown
1114 var subtask1 = scope.fork(task1);
1115 var subtask2 = scope.fork(task2);
1116
1117 scope.shutdown();
1118
1119 // forked after shutdown
1120 var subtask3 = scope.fork(task3);
1121
1122 scope.join();
1123
1124 // handleComplete should not be called
1125 for (int i = 0; i < 3; i++) {
1126 assertEquals(0, scope.subtasks().size());
1127 Thread.sleep(20);
1128 }
1129
1130 assertEquals(Subtask.State.UNAVAILABLE, subtask1.state());
1131 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1132 assertEquals(Subtask.State.UNAVAILABLE, subtask3.state());
1133 }
1134 }
1135
1136 /**
1137 * Test that the default handleComplete throws IllegalArgumentException if called
1138 * with a running task.
1139 */
1140 @Test
1141 void testHandleCompleteThrows() throws Exception {
1142 class TestScope<T> extends StructuredTaskScope<T> {
1143 protected void handleComplete(Subtask<? extends T> subtask) {
1144 super.handleComplete(subtask);
1145 }
1146 }
1147
1148 try (var scope = new TestScope<String>()) {
1149 var subtask = scope.fork(() -> {
1150 Thread.sleep(Duration.ofDays(1));
1151 return "foo";
1152 });
1153
1154 // running task
1155 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1156 assertThrows(IllegalArgumentException.class, () -> scope.handleComplete(subtask));
1157 scope.shutdown();
1158
1159 // null task
1160 assertThrows(NullPointerException.class, () -> scope.handleComplete(null));
1161
1162 scope.join();
1163 }
1164 }
1165
1166 /**
1167 * Test ensureOwnerAndJoined.
1168 */
1169 @ParameterizedTest
1170 @MethodSource("factories")
1171 void testEnsureOwnerAndJoined(ThreadFactory factory) throws Exception {
1172 class MyScope<T> extends StructuredTaskScope<T> {
1173 MyScope(ThreadFactory factory) {
1174 super(null, factory);
1175 }
1176 void invokeEnsureOwnerAndJoined() {
1177 super.ensureOwnerAndJoined();
1178 }
1179 }
1180
1181 try (var scope = new MyScope<Boolean>(factory)) {
1182 // owner thread, before join
1183 scope.fork(() -> true);
1184 assertThrows(IllegalStateException.class, () -> {
1185 scope.invokeEnsureOwnerAndJoined();
1186 });
1187
1188 // owner thread, after join
1189 scope.join();
1190 scope.invokeEnsureOwnerAndJoined();
1191
1192 // thread in scope cannot invoke ensureOwnerAndJoined
1193 Subtask<Boolean> subtask = scope.fork(() -> {
1194 assertThrows(WrongThreadException.class, () -> {
1195 scope.invokeEnsureOwnerAndJoined();
1196 });
1197 return true;
1198 });
1199 scope.join();
1200 assertTrue(subtask.get());
1201
1202 // random thread cannot invoke ensureOwnerAndJoined
1203 try (var pool = Executors.newSingleThreadExecutor()) {
1204 Future<Void> future = pool.submit(() -> {
1205 assertThrows(WrongThreadException.class, () -> {
1206 scope.invokeEnsureOwnerAndJoined();
1207 });
1208 return null;
1209 });
1210 future.get();
1211 }
1212 }
1213 }
1214
1215 /**
1216 * Test ensureOwnerAndJoined after the task scope has been closed.
1217 */
1218 @ParameterizedTest
1219 @MethodSource("factories")
1220 void testEnsureOwnerAndJoinedAfterClose(ThreadFactory factory) throws Exception {
1221 class MyScope<T> extends StructuredTaskScope<T> {
1222 MyScope(ThreadFactory factory) {
1223 super(null, factory);
1224 }
1225 public void invokeEnsureOwnerAndJoined() {
1226 super.ensureOwnerAndJoined();
1227 }
1228 }
1229
1230 // ensureOwnerAndJoined after close, join invoked
1231 try (var scope = new MyScope<String>(factory)) {
1232 scope.fork(() -> "foo");
1233 scope.join();
1234 scope.close();
1235 scope.invokeEnsureOwnerAndJoined(); // should not throw
1236 }
1237
1238 // ensureOwnerAndJoined after close, join not invoked
1239 try (var scope = new MyScope<String>(factory)) {
1240 scope.fork(() -> "foo");
1241 assertThrows(IllegalStateException.class, scope::close);
1242 scope.invokeEnsureOwnerAndJoined(); // should not throw
1243 }
1244 }
1245
1246
1247 /**
1248 * Test toString.
1249 */
1250 @Test
1251 void testToString() throws Exception {
1252 ThreadFactory factory = Thread.ofVirtual().factory();
1253 try (var scope = new StructuredTaskScope<Object>("duke", factory)) {
1254 // open
1255 assertTrue(scope.toString().contains("duke"));
1256
1257 // shutdown
1258 scope.shutdown();
1259 assertTrue(scope.toString().contains("duke"));
1260
1261 // closed
1262 scope.join();
1263 scope.close();
1264 assertTrue(scope.toString().contains("duke"));
1265 }
1266 }
1267
1268 /**
1269 * Test Subtask with task that completes successfully.
1270 */
1271 @ParameterizedTest
1272 @MethodSource("factories")
1273 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
1274 try (var scope = new StructuredTaskScope<String>(null, factory)) {
1275 Callable<String> task = () -> "foo";
1276 Subtask<String> subtask = scope.fork(task);
1277
1278 // before join, owner thread
1279 assertEquals(task, subtask.task());
1280 assertThrows(IllegalStateException.class, subtask::get);
1281 assertThrows(IllegalStateException.class, subtask::exception);
1282
1283 scope.join();
1284
1285 // after join
1286 assertEquals(task, subtask.task());
1287 assertEquals(Subtask.State.SUCCESS, subtask.state());
1288 assertEquals("foo", subtask.get());
1289 assertThrows(IllegalStateException.class, subtask::exception);
1290 }
1291 }
1292
1293 /**
1294 * Test Subtask with task that fails.
1295 */
1296 @ParameterizedTest
1297 @MethodSource("factories")
1298 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
1299 try (var scope = new StructuredTaskScope<String>(null, factory)) {
1300 Callable<String> task = () -> { throw new FooException(); };
1301 Subtask<String> subtask = scope.fork(task);
1302
1303 // before join, owner thread
1304 assertEquals(task, subtask.task());
1305 assertThrows(IllegalStateException.class, subtask::get);
1306 assertThrows(IllegalStateException.class, subtask::exception);
1307
1308 scope.join();
1309
1310 // after join
1311 assertEquals(task, subtask.task());
1312 assertEquals(Subtask.State.FAILED, subtask.state());
1313 assertThrows(IllegalStateException.class, subtask::get);
1314 assertTrue(subtask.exception() instanceof FooException);
1315 }
1316 }
1317
1318 /**
1319 * Test Subtask with a task that has not completed.
1320 */
1321 @ParameterizedTest
1322 @MethodSource("factories")
1323 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
1324 try (var scope = new StructuredTaskScope<Object>(null, factory)) {
1325 Callable<Void> task = () -> {
1326 Thread.sleep(Duration.ofDays(1));
1327 return null;
1328 };
1329 Subtask<Void> subtask = scope.fork(task);
1330
1331 // before join
1332 assertEquals(task, subtask.task());
1333 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1334 assertThrows(IllegalStateException.class, subtask::get);
1335 assertThrows(IllegalStateException.class, subtask::exception);
1336
1337 // attempt join, join throws
1338 Thread.currentThread().interrupt();
1339 assertThrows(InterruptedException.class, scope::join);
1340
1341 // after join
1342 assertEquals(task, subtask.task());
1343 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1344 assertThrows(IllegalStateException.class, subtask::get);
1345 assertThrows(IllegalStateException.class, subtask::exception);
1346 }
1347 }
1348
1349 /**
1350 * Test Subtask when forked after shutdown.
1351 */
1352 @ParameterizedTest
1353 @MethodSource("factories")
1354 void testSubtaskWhenShutdown(ThreadFactory factory) throws Exception {
1355 try (var scope = new StructuredTaskScope<Object>(null, factory)) {
1356 Callable<Void> task = () -> {
1357 Thread.sleep(Duration.ofDays(1));
1358 return null;
1359 };
1360
1361 scope.shutdown();
1362
1363 // fork after shutdown
1364 Subtask<Void> subtask = scope.fork(task);
1365 scope.join();
1366 assertEquals(task, subtask.task());
1367 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1368 assertThrows(IllegalStateException.class, subtask::get);
1369 assertThrows(IllegalStateException.class, subtask::exception);
1370 }
1371 }
1372
1373 /**
1374 * Test Subtask::toString.
1375 */
1376 @Test
1377 void testSubtaskToString() throws Exception {
1378 try (var scope = new StructuredTaskScope<Object>()) {
1379 // success
1380 var subtask1 = scope.fork(() -> "foo");
1381 scope.join();
1382 assertTrue(subtask1.toString().contains("Completed successfully"));
1383
1384 // failed
1385 var subtask2 = scope.fork(() -> { throw new FooException(); });
1386 scope.join();
1387 assertTrue(subtask2.toString().contains("Failed"));
1388
1389 // not completed
1390 Callable<Void> sleepForDay = () -> {
1391 Thread.sleep(Duration.ofDays(1));
1392 return null;
1393 };
1394 var subtask3 = scope.fork(sleepForDay);
1395 assertTrue(subtask3.toString().contains("Unavailable"));
1396
1397 scope.shutdown();
1398
1399 // forked after shutdown
1400 var subtask4 = scope.fork(sleepForDay);
1401 assertTrue(subtask4.toString().contains("Unavailable"));
1402
1403 scope.join();
1404 }
1405 }
1406
1407 /**
1408 * Test ShutdownOnSuccess with no completed tasks.
1409 */
1410 @Test
1411 void testShutdownOnSuccess1() throws Exception {
1412 try (var scope = new ShutdownOnSuccess<Object>()) {
1413 assertThrows(IllegalStateException.class, () -> scope.result());
1414 assertThrows(IllegalStateException.class, () -> scope.result(e -> null));
1415 }
1416 }
1417
1418 /**
1419 * Test ShutdownOnSuccess with tasks that complete successfully.
1420 */
1421 @ParameterizedTest
1422 @MethodSource("factories")
1423 void testShutdownOnSuccess2(ThreadFactory factory) throws Exception {
1424 try (var scope = new ShutdownOnSuccess<String>(null, factory)) {
1425 scope.fork(() -> "foo");
1426 scope.join(); // ensures foo completes first
1427 scope.fork(() -> "bar");
1428 scope.join();
1429 assertEquals("foo", scope.result());
1430 assertEquals("foo", scope.result(e -> null));
1431 }
1432 }
1433
1434 /**
1435 * Test ShutdownOnSuccess with a task that completes successfully with a null result.
1436 */
1437 @ParameterizedTest
1438 @MethodSource("factories")
1439 void testShutdownOnSuccess3(ThreadFactory factory) throws Exception {
1440 try (var scope = new ShutdownOnSuccess<Object>(null, factory)) {
1441 scope.fork(() -> null);
1442 scope.join();
1443 assertNull(scope.result());
1444 assertNull(scope.result(e -> null));
1445 }
1446 }
1447
1448 /**
1449 * Test ShutdownOnSuccess with tasks that complete succcessfully and tasks that fail.
1450 */
1451 @ParameterizedTest
1452 @MethodSource("factories")
1453 void testShutdownOnSuccess4(ThreadFactory factory) throws Exception {
1454 try (var scope = new ShutdownOnSuccess<String>(null, factory)) {
1455 scope.fork(() -> "foo");
1456 scope.fork(() -> { throw new ArithmeticException(); });
1457 scope.join();
1458 assertEquals("foo", scope.result());
1459 assertEquals("foo", scope.result(e -> null));
1460 }
1461 }
1462
1463 /**
1464 * Test ShutdownOnSuccess with a task that fails.
1465 */
1466 @ParameterizedTest
1467 @MethodSource("factories")
1468 void testShutdownOnSuccess5(ThreadFactory factory) throws Exception {
1469 try (var scope = new ShutdownOnSuccess<Object>(null, factory)) {
1470 scope.fork(() -> { throw new ArithmeticException(); });
1471 scope.join();
1472 Throwable ex = assertThrows(ExecutionException.class, () -> scope.result());
1473 assertTrue(ex.getCause() instanceof ArithmeticException);
1474 ex = assertThrows(FooException.class, () -> scope.result(e -> new FooException(e)));
1475 assertTrue(ex.getCause() instanceof ArithmeticException);
1476 }
1477 }
1478
1479 /**
1480 * Test ShutdownOnSuccess methods are confined to the owner.
1481 */
1482 @ParameterizedTest
1483 @MethodSource("factories")
1484 void testShutdownOnSuccessConfined(ThreadFactory factory) throws Exception {
1485 // owner before join
1486 try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) {
1487 scope.fork(() -> { throw new FooException(); });
1488 assertThrows(IllegalStateException.class, scope::result);
1489 assertThrows(IllegalStateException.class, () -> {
1490 scope.result(e -> new RuntimeException(e));
1491 });
1492 scope.join();
1493 }
1494
1495 // non-owner
1496 try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) {
1497 Subtask<Boolean> subtask = scope.fork(() -> {
1498 assertThrows(WrongThreadException.class, scope::result);
1499 assertThrows(WrongThreadException.class, () -> {
1500 scope.result(e -> new RuntimeException(e));
1501 });
1502 return true;
1503 });
1504 scope.join();
1505 assertTrue(subtask.get());
1506 }
1507 }
1508
1509 /**
1510 * Test ShutdownOnFailure with no completed tasks.
1511 */
1512 @Test
1513 void testShutdownOnFailure1() throws Throwable {
1514 try (var scope = new ShutdownOnFailure()) {
1515 assertTrue(scope.exception().isEmpty());
1516 scope.throwIfFailed();
1517 scope.throwIfFailed(e -> new FooException(e));
1518 }
1519 }
1520
1521 /**
1522 * Test ShutdownOnFailure with tasks that complete successfully.
1523 */
1524 @ParameterizedTest
1525 @MethodSource("factories")
1526 void testShutdownOnFailure2(ThreadFactory factory) throws Throwable {
1527 try (var scope = new ShutdownOnFailure(null, factory)) {
1528 scope.fork(() -> "foo");
1529 scope.fork(() -> "bar");
1530 scope.join();
1531
1532 // no exception
1533 assertTrue(scope.exception().isEmpty());
1534 scope.throwIfFailed();
1535 scope.throwIfFailed(e -> new FooException(e));
1536 }
1537 }
1538
1539 /**
1540 * Test ShutdownOnFailure with tasks that complete succcessfully and tasks that fail.
1541 */
1542 @ParameterizedTest
1543 @MethodSource("factories")
1544 void testShutdownOnFailure3(ThreadFactory factory) throws Throwable {
1545 try (var scope = new ShutdownOnFailure(null, factory)) {
1546
1547 // one task completes successfully, the other fails
1548 scope.fork(() -> "foo");
1549 scope.fork(() -> { throw new ArithmeticException(); });
1550 scope.join();
1551
1552 Throwable ex = scope.exception().orElse(null);
1553 assertTrue(ex instanceof ArithmeticException);
1554
1555 ex = assertThrows(ExecutionException.class, () -> scope.throwIfFailed());
1556 assertTrue(ex.getCause() instanceof ArithmeticException);
1557
1558 ex = assertThrows(FooException.class,
1559 () -> scope.throwIfFailed(e -> new FooException(e)));
1560 assertTrue(ex.getCause() instanceof ArithmeticException);
1561 }
1562 }
1563
1564 /**
1565 * Test ShutdownOnFailure methods are confined to the owner.
1566 */
1567 @ParameterizedTest
1568 @MethodSource("factories")
1569 void testShutdownOnFailureConfined(ThreadFactory factory) throws Exception {
1570 // owner before join
1571 try (var scope = new ShutdownOnFailure(null, factory)) {
1572 scope.fork(() -> "foo");
1573 assertThrows(IllegalStateException.class, scope::exception);
1574 assertThrows(IllegalStateException.class, scope::throwIfFailed);
1575 assertThrows(IllegalStateException.class, () -> {
1576 scope.throwIfFailed(e -> new RuntimeException(e));
1577 });
1578 scope.join();
1579 }
1580
1581 // non-owner
1582 try (var scope = new ShutdownOnFailure(null, factory)) {
1583 Subtask<Boolean> subtask = scope.fork(() -> {
1584 assertThrows(WrongThreadException.class, scope::exception);
1585 assertThrows(WrongThreadException.class, scope::throwIfFailed);
1586 assertThrows(WrongThreadException.class, () -> {
1587 scope.throwIfFailed(e -> new RuntimeException(e));
1588 });
1589 return true;
1590 });
1591 scope.join();
1592 assertTrue(subtask.get());
1593 }
1594 }
1595
1596 /**
1597 * Test for NullPointerException.
1598 */
1599 @Test
1600 void testNulls() throws Exception {
1601 assertThrows(NullPointerException.class, () -> new StructuredTaskScope("", null));
1602 try (var scope = new StructuredTaskScope<Object>()) {
1603 assertThrows(NullPointerException.class, () -> scope.fork(null));
1604 assertThrows(NullPointerException.class, () -> scope.joinUntil(null));
1605 }
1606
1607 assertThrows(NullPointerException.class, () -> new ShutdownOnSuccess<Object>("", null));
1608 try (var scope = new ShutdownOnSuccess<Object>()) {
1609 assertThrows(NullPointerException.class, () -> scope.fork(null));
1610 assertThrows(NullPointerException.class, () -> scope.joinUntil(null));
1611 assertThrows(NullPointerException.class, () -> scope.result(null));
1612 }
1613
1614 assertThrows(NullPointerException.class, () -> new ShutdownOnFailure("", null));
1615 try (var scope = new ShutdownOnFailure()) {
1616 assertThrows(NullPointerException.class, () -> scope.fork(null));
1617 assertThrows(NullPointerException.class, () -> scope.joinUntil(null));
1618 assertThrows(NullPointerException.class, () -> scope.throwIfFailed(null));
1619 }
1620 }
1621
1622 /**
1623 * A runtime exception for tests.
1624 */
1625 private static class FooException extends RuntimeException {
1626 FooException() { }
1627 FooException(Throwable cause) { super(cause); }
1628 }
1629
1630 /**
1631 * Returns the current time in milliseconds.
1632 */
1633 private long millisTime() {
1634 long now = System.nanoTime();
1635 return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
1636 }
1637
1638 /**
|
1 /*
2 * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=platform
26 * @bug 8284199 8296779 8306647
27 * @summary Basic tests for StructuredTaskScope
28 * @enablePreview
29 * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest
30 */
31
32 /*
33 * @test id=virtual
34 * @enablePreview
35 * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest
36 */
37
38 import java.time.Duration;
39 import java.util.Arrays;
40 import java.util.ArrayList;
41 import java.util.List;
42 import java.util.Set;
43 import java.util.NoSuchElementException;
44 import java.util.concurrent.Callable;
45 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.Executors;
48 import java.util.concurrent.Future;
49 import java.util.concurrent.LinkedTransferQueue;
50 import java.util.concurrent.ThreadFactory;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.RejectedExecutionException;
53 import java.util.concurrent.ScheduledExecutorService;
54 import java.util.concurrent.StructuredTaskScope;
55 import java.util.concurrent.StructuredTaskScope.TimeoutException;
56 import java.util.concurrent.StructuredTaskScope.Config;
57 import java.util.concurrent.StructuredTaskScope.FailedException;
58 import java.util.concurrent.StructuredTaskScope.Joiner;
59 import java.util.concurrent.StructuredTaskScope.Subtask;
60 import java.util.concurrent.StructureViolationException;
61 import java.util.concurrent.atomic.AtomicBoolean;
62 import java.util.concurrent.atomic.AtomicInteger;
63 import java.util.concurrent.atomic.AtomicReference;
64 import java.util.function.Function;
65 import java.util.function.Predicate;
66 import java.util.stream.Stream;
67 import static java.lang.Thread.State.*;
68
69 import org.junit.jupiter.api.Test;
70 import org.junit.jupiter.api.BeforeAll;
71 import org.junit.jupiter.api.AfterAll;
72 import org.junit.jupiter.params.ParameterizedTest;
73 import org.junit.jupiter.params.provider.MethodSource;
74 import static org.junit.jupiter.api.Assertions.*;
75
76 class StructuredTaskScopeTest {
77 private static ScheduledExecutorService scheduler;
78 private static List<ThreadFactory> threadFactories;
79
80 @BeforeAll
81 static void setup() throws Exception {
82 scheduler = Executors.newSingleThreadScheduledExecutor();
83
84 // thread factories
85 String value = System.getProperty("threadFactory");
86 List<ThreadFactory> list = new ArrayList<>();
87 if (value == null || value.equals("platform"))
88 list.add(Thread.ofPlatform().factory());
89 if (value == null || value.equals("virtual"))
90 list.add(Thread.ofVirtual().factory());
91 assertTrue(list.size() > 0, "No thread factories for tests");
92 threadFactories = list;
93 }
94
95 @AfterAll
96 static void shutdown() {
97 scheduler.shutdown();
98 }
99
100 private static Stream<ThreadFactory> factories() {
101 return threadFactories.stream();
102 }
103
104 /**
105 * Test that fork creates virtual threads when no ThreadFactory is configured.
106 */
107 @Test
108 void testForkCreatesVirtualThread() throws Exception {
109 Set<Thread> threads = ConcurrentHashMap.newKeySet();
110 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
111 for (int i = 0; i < 50; i++) {
112 // runnable
113 scope.fork(() -> {
114 threads.add(Thread.currentThread());
115 });
116
117 // callable
118 scope.fork(() -> {
119 threads.add(Thread.currentThread());
120 return null;
121 });
122 }
123 scope.join();
124 }
125 assertEquals(100, threads.size());
126 threads.forEach(t -> assertTrue(t.isVirtual()));
127 }
128
129 /**
130 * Test that fork create threads with the configured ThreadFactory.
131 */
132 @ParameterizedTest
133 @MethodSource("factories")
134 void testForkUsesThreadFactory(ThreadFactory factory) throws Exception {
135 // TheadFactory that keeps reference to all threads it creates
136 class RecordingThreadFactory implements ThreadFactory {
137 final ThreadFactory delegate;
138 final Set<Thread> threads = ConcurrentHashMap.newKeySet();
139 RecordingThreadFactory(ThreadFactory delegate) {
140 this.delegate = delegate;
141 }
142 @Override
143 public Thread newThread(Runnable task) {
144 Thread thread = delegate.newThread(task);
145 threads.add(thread);
146 return thread;
147 }
148 Set<Thread> threads() {
149 return threads;
150 }
151 }
152 var recordingThreadFactory = new RecordingThreadFactory(factory);
153 Set<Thread> threads = ConcurrentHashMap.newKeySet();
154 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
155 cf -> cf.withThreadFactory(recordingThreadFactory))) {
156
157 for (int i = 0; i < 50; i++) {
158 // runnable
159 scope.fork(() -> {
160 threads.add(Thread.currentThread());
161 });
162
163 // callable
164 scope.fork(() -> {
165 threads.add(Thread.currentThread());
166 return null;
167 });
168 }
169 scope.join();
170 }
171 assertEquals(100, threads.size());
172 assertEquals(recordingThreadFactory.threads(), threads);
173 }
174
175 /**
176 * Test fork method is owner confined.
177 */
178 @ParameterizedTest
179 @MethodSource("factories")
180 void testForkConfined(ThreadFactory factory) throws Exception {
181 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
182 cf -> cf.withThreadFactory(factory))) {
183
184 // random thread cannot fork
185 try (var pool = Executors.newSingleThreadExecutor()) {
186 Future<Void> future = pool.submit(() -> {
187 assertThrows(WrongThreadException.class, () -> {
188 scope.fork(() -> null);
189 });
190 return null;
191 });
192 future.get();
193 }
194
195 // subtask cannot fork
196 Subtask<Boolean> subtask = scope.fork(() -> {
197 assertThrows(WrongThreadException.class, () -> {
198 scope.fork(() -> null);
199 });
200 return true;
201 });
202 scope.join();
203 assertTrue(subtask.get());
204 }
205 }
206
207 /**
208 * Test fork after join, no subtasks forked before join.
209 */
210 @ParameterizedTest
211 @MethodSource("factories")
212 void testForkAfterJoin1(ThreadFactory factory) throws Exception {
213 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
214 cf -> cf.withThreadFactory(factory))) {
215 scope.join();
216 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
217 }
218 }
219
220 /**
221 * Test fork after join, subtasks forked before join.
222 */
223 @ParameterizedTest
224 @MethodSource("factories")
225 void testForkAfterJoin2(ThreadFactory factory) throws Exception {
226 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
227 cf -> cf.withThreadFactory(factory))) {
228 scope.fork(() -> "foo");
229 scope.join();
230 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
231 }
232 }
233
234 /**
235 * Test fork after join throws.
236 */
237 @ParameterizedTest
238 @MethodSource("factories")
239 void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
240 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
241 cf -> cf.withThreadFactory(factory))) {
242 var latch = new CountDownLatch(1);
243 var subtask1 = scope.fork(() -> {
244 latch.await();
245 return "foo";
246 });
247
248 // join throws
249 Thread.currentThread().interrupt();
250 assertThrows(InterruptedException.class, scope::join);
251
252 // fork should throw
253 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
254 }
255 }
256
257 /**
258 * Test fork after task scope is cancelled. This test uses a custom Joiner to
259 * cancel execution.
260 */
261 @ParameterizedTest
262 @MethodSource("factories")
263 void testForkAfterCancel2(ThreadFactory factory) throws Exception {
264 var countingThreadFactory = new CountingThreadFactory(factory);
265 var testJoiner = new CancelAfterOneJoiner<String>();
266
267 try (var scope = StructuredTaskScope.open(testJoiner,
268 cf -> cf.withThreadFactory(countingThreadFactory))) {
269
270 // fork subtask, the scope should be cancelled when the subtask completes
271 var subtask1 = scope.fork(() -> "foo");
272 while (!scope.isCancelled()) {
273 Thread.sleep(20);
274 }
275
276 assertEquals(1, countingThreadFactory.threadCount());
277 assertEquals(1, testJoiner.onForkCount());
278 assertEquals(1, testJoiner.onCompleteCount());
279
280 // fork second subtask, it should not run
281 var subtask2 = scope.fork(() -> "bar");
282
283 // onFork should be invoked, newThread and onComplete should not be invoked
284 assertEquals(1, countingThreadFactory.threadCount());
285 assertEquals(2, testJoiner.onForkCount());
286 assertEquals(1, testJoiner.onCompleteCount());
287
288 scope.join();
289
290 assertEquals(1, countingThreadFactory.threadCount());
291 assertEquals(2, testJoiner.onForkCount());
292 assertEquals(1, testJoiner.onCompleteCount());
293 assertEquals("foo", subtask1.get());
294 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
295 }
296 }
297
298 /**
299 * Test fork after task scope is closed.
300 */
301 @Test
302 void testForkAfterClose() {
303 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
304 scope.close();
305 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
306 }
307 }
308
309 /**
310 * Test fork with a ThreadFactory that rejects creating a thread.
311 */
312 @Test
313 void testForkRejectedExecutionException() {
314 ThreadFactory factory = task -> null;
315 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
316 cf -> cf.withThreadFactory(factory))) {
317 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
318 }
319 }
320
321 /**
322 * Test join with no subtasks.
323 */
324 @Test
325 void testJoinWithNoSubtasks() throws Exception {
326 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
327 scope.join();
328 }
329 }
330
331 /**
332 * Test join with a remaining subtask.
333 */
334 @ParameterizedTest
335 @MethodSource("factories")
336 void testJoinWithRemainingSubtasks(ThreadFactory factory) throws Exception {
337 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
338 cf -> cf.withThreadFactory(factory))) {
339 Subtask<String> subtask = scope.fork(() -> {
340 Thread.sleep(Duration.ofMillis(100));
341 return "foo";
342 });
343 scope.join();
344 assertEquals("foo", subtask.get());
345 }
346 }
347
348 /**
349 * Test join after join completed with a result.
350 */
351 @Test
352 void testJoinAfterJoin1() throws Exception {
353 var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz"));
354 Joiner<Object, String> joiner = results::take;
355 try (var scope = StructuredTaskScope.open(joiner)) {
356 scope.fork(() -> "foo");
357 assertEquals("foo", scope.join());
358
359 // join already called
360 for (int i = 0 ; i < 3; i++) {
361 assertThrows(IllegalStateException.class, scope::join);
362 }
363 }
364 }
365
366 /**
367 * Test join after join completed with an exception.
368 */
369 @Test
370 void testJoinAfterJoin2() throws Exception {
371 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
372 scope.fork(() -> { throw new FooException(); });
373 Throwable ex = assertThrows(FailedException.class, scope::join);
374 assertTrue(ex.getCause() instanceof FooException);
375
376 // join already called
377 for (int i = 0 ; i < 3; i++) {
378 assertThrows(IllegalStateException.class, scope::join);
379 }
380 }
381 }
382
383 /**
384 * Test join after join completed with a timeout.
385 */
386 @Test
387 void testJoinAfterJoin3() throws Exception {
388 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
389 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
390 // wait for scope to be cancelled by timeout
391 while (!scope.isCancelled()) {
392 Thread.sleep(20);
393 }
394 assertThrows(TimeoutException.class, scope::join);
395
396 // join already called
397 for (int i = 0 ; i < 3; i++) {
398 assertThrows(IllegalStateException.class, scope::join);
399 }
400 }
401 }
402
403 /**
404 * Test join method is owner confined.
405 */
406 @ParameterizedTest
407 @MethodSource("factories")
408 void testJoinConfined(ThreadFactory factory) throws Exception {
409 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
410 cf -> cf.withThreadFactory(factory))) {
411
412 // random thread cannot join
413 try (var pool = Executors.newSingleThreadExecutor()) {
414 Future<Void> future = pool.submit(() -> {
415 assertThrows(WrongThreadException.class, scope::join);
416 return null;
417 });
418 future.get();
419 }
420
421 // subtask cannot join
422 Subtask<Boolean> subtask = scope.fork(() -> {
423 assertThrows(WrongThreadException.class, () -> { scope.join(); });
424 return true;
425 });
426 scope.join();
427 assertTrue(subtask.get());
428 }
429 }
430
431 /**
432 * Test join with interrupt status set.
433 */
434 @ParameterizedTest
435 @MethodSource("factories")
436 void testInterruptJoin1(ThreadFactory factory) throws Exception {
437 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
438 cf -> cf.withThreadFactory(factory))) {
439
440 Subtask<String> subtask = scope.fork(() -> {
441 Thread.sleep(60_000);
442 return "foo";
443 });
444
445 // join should throw
446 Thread.currentThread().interrupt();
447 try {
448 scope.join();
449 fail("join did not throw");
450 } catch (InterruptedException expected) {
451 assertFalse(Thread.interrupted()); // interrupt status should be cleared
452 }
453 }
454 }
455
456 /**
457 * Test interrupt of thread blocked in join.
458 */
459 @ParameterizedTest
460 @MethodSource("factories")
461 void testInterruptJoin2(ThreadFactory factory) throws Exception {
462 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
463 cf -> cf.withThreadFactory(factory))) {
464
465 var latch = new CountDownLatch(1);
466 Subtask<String> subtask = scope.fork(() -> {
467 Thread.sleep(60_000);
468 return "foo";
469 });
470
471 // interrupt main thread when it blocks in join
472 scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
473 try {
474 scope.join();
475 fail("join did not throw");
476 } catch (InterruptedException expected) {
477 assertFalse(Thread.interrupted()); // interrupt status should be clear
478 }
479 }
480 }
481
482 /**
483 * Test join when scope is cancelled.
484 */
485 @ParameterizedTest
486 @MethodSource("factories")
487 void testJoinWhenCancelled(ThreadFactory factory) throws Exception {
488 var countingThreadFactory = new CountingThreadFactory(factory);
489 var testJoiner = new CancelAfterOneJoiner<String>();
490
491 try (var scope = StructuredTaskScope.open(testJoiner,
492 cf -> cf.withThreadFactory(countingThreadFactory))) {
493
494 // fork subtask, the scope should be cancelled when the subtask completes
495 var subtask1 = scope.fork(() -> "foo");
496 while (!scope.isCancelled()) {
497 Thread.sleep(20);
498 }
499
500 // fork second subtask, it should not run
501 var subtask2 = scope.fork(() -> {
502 Thread.sleep(Duration.ofDays(1));
503 return "bar";
504 });
505
506 scope.join();
507
508 assertEquals("foo", subtask1.get());
509 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
510 }
511 }
512
513 /**
514 * Test join after scope is closed.
515 */
516 @Test
517 void testJoinAfterClose() throws Exception {
518 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
519 scope.close();
520 assertThrows(IllegalStateException.class, () -> scope.join());
521 }
522 }
523
524 /**
525 * Test join with timeout, subtasks finish before timeout expires.
526 */
527 @ParameterizedTest
528 @MethodSource("factories")
529 void testJoinWithTimeout1(ThreadFactory factory) throws Exception {
530 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
531 cf -> cf.withThreadFactory(factory)
532 .withTimeout(Duration.ofDays(1)))) {
533
534 Subtask<String> subtask = scope.fork(() -> {
535 Thread.sleep(Duration.ofSeconds(1));
536 return "foo";
537 });
538
539 scope.join();
540
541 assertFalse(scope.isCancelled());
542 assertEquals("foo", subtask.get());
543 }
544 }
545
546 /**
547 * Test join with timeout, timeout expires before subtasks finish.
548 */
549 @ParameterizedTest
550 @MethodSource("factories")
551 void testJoinWithTimeout2(ThreadFactory factory) throws Exception {
552 long startMillis = millisTime();
553 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
554 cf -> cf.withThreadFactory(factory)
555 .withTimeout(Duration.ofSeconds(2)))) {
556
557 Subtask<Void> subtask = scope.fork(() -> {
558 Thread.sleep(Duration.ofDays(1));
559 return null;
560 });
561
562 assertThrows(TimeoutException.class, scope::join);
563 expectDuration(startMillis, /*min*/1900, /*max*/20_000);
564
565 assertTrue(scope.isCancelled());
566 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
567 }
568 }
569
570 /**
571 * Test join with timeout that has already expired.
572 */
573 @ParameterizedTest
574 @MethodSource("factories")
575 void testJoinWithTimeout3(ThreadFactory factory) throws Exception {
576 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
577 cf -> cf.withThreadFactory(factory)
578 .withTimeout(Duration.ofSeconds(-1)))) {
579
580 Subtask<Void> subtask = scope.fork(() -> {
581 Thread.sleep(Duration.ofDays(1));
582 return null;
583 });
584
585 assertThrows(TimeoutException.class, scope::join);
586
587 assertTrue(scope.isCancelled());
588 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
589 }
590 }
591
592 /**
593 * Test that cancelling execution interrupts unfinished threads. This test uses
594 * a custom Joiner to cancel execution.
595 */
596 @ParameterizedTest
597 @MethodSource("factories")
598 void testCancelInterruptsThreads2(ThreadFactory factory) throws Exception {
599 var testJoiner = new CancelAfterOneJoiner<String>();
600
601 try (var scope = StructuredTaskScope.open(testJoiner,
602 cf -> cf.withThreadFactory(factory))) {
603
604 // fork subtask1 that runs for a long time
605 var started = new CountDownLatch(1);
606 var interrupted = new CountDownLatch(1);
607 var subtask1 = scope.fork(() -> {
608 started.countDown();
609 try {
610 Thread.sleep(Duration.ofDays(1));
611 } catch (InterruptedException e) {
612 interrupted.countDown();
613 }
614 });
615 started.await();
616
617 // fork subtask2, the scope should be cancelled when the subtask completes
618 var subtask2 = scope.fork(() -> "bar");
619 while (!scope.isCancelled()) {
620 Thread.sleep(20);
621 }
622
623 // subtask1 should be interrupted
624 interrupted.await();
625
626 scope.join();
627 assertEquals(Subtask.State.UNAVAILABLE, subtask1.state());
628 assertEquals("bar", subtask2.get());
629 }
630 }
631
632 /**
633 * Test that timeout interrupts unfinished threads.
634 */
635 @ParameterizedTest
636 @MethodSource("factories")
637 void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception {
638 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
639 cf -> cf.withThreadFactory(factory)
640 .withTimeout(Duration.ofSeconds(2)))) {
641
642 var started = new AtomicBoolean();
643 var interrupted = new CountDownLatch(1);
644 Subtask<Void> subtask = scope.fork(() -> {
645 started.set(true);
646 try {
647 Thread.sleep(Duration.ofDays(1));
648 } catch (InterruptedException e) {
649 interrupted.countDown();
650 }
651 return null;
652 });
653
654 while (!scope.isCancelled()) {
655 Thread.sleep(50);
656 }
657
658 // if subtask started then it should be interrupted
659 if (started.get()) {
660 interrupted.await();
661 }
662
663 assertThrows(TimeoutException.class, scope::join);
664
665 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
666 }
667 }
668
669 /**
670 * Test close without join, no subtasks forked.
671 */
672 @Test
673 void testCloseWithoutJoin1() {
674 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
675 // do nothing
676 }
677 }
678
679 /**
680 * Test close without join, subtasks forked.
681 */
682 @ParameterizedTest
683 @MethodSource("factories")
684 void testCloseWithoutJoin2(ThreadFactory factory) {
685 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
686 cf -> cf.withThreadFactory(factory))) {
687 Subtask<String> subtask = scope.fork(() -> {
688 Thread.sleep(Duration.ofDays(1));
689 return null;
690 });
691
692 // first call to close should throw
693 assertThrows(IllegalStateException.class, scope::close);
694
695 // subsequent calls to close should not throw
696 for (int i = 0; i < 3; i++) {
697 scope.close();
698 }
699
700 // subtask result/exception not available
701 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
702 assertThrows(IllegalStateException.class, subtask::get);
703 assertThrows(IllegalStateException.class, subtask::exception);
704 }
705 }
706
707 /**
708 * Test close after join throws. Close should not throw as join attempted.
709 */
710 @ParameterizedTest
711 @MethodSource("factories")
712 void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception {
713 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
714 cf -> cf.withThreadFactory(factory))) {
715 var subtask = scope.fork(() -> {
716 Thread.sleep(Duration.ofDays(1));
717 return null;
718 });
719
720 // join throws
721 Thread.currentThread().interrupt();
722 assertThrows(InterruptedException.class, scope::join);
723 assertThrows(IllegalStateException.class, subtask::get);
724
725 } // close should not throw
726 }
727
728 /**
729 * Test close method is owner confined.
730 */
731 @ParameterizedTest
732 @MethodSource("factories")
733 void testCloseConfined(ThreadFactory factory) throws Exception {
734 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
735 cf -> cf.withThreadFactory(factory))) {
736
737 // random thread cannot close scope
738 try (var pool = Executors.newCachedThreadPool(factory)) {
739 Future<Boolean> future = pool.submit(() -> {
740 assertThrows(WrongThreadException.class, scope::close);
741 return null;
742 });
743 future.get();
744 }
745
746 // subtask cannot close
747 Subtask<Boolean> subtask = scope.fork(() -> {
748 assertThrows(WrongThreadException.class, scope::close);
749 return true;
750 });
751 scope.join();
752 assertTrue(subtask.get());
753 }
754 }
755
756 /**
757 * Test close with interrupt status set.
758 */
759 @ParameterizedTest
760 @MethodSource("factories")
761 void testInterruptClose1(ThreadFactory factory) throws Exception {
762 var testJoiner = new CancelAfterOneJoiner<String>();
763 try (var scope = StructuredTaskScope.open(testJoiner,
764 cf -> cf.withThreadFactory(factory))) {
765
766 // fork first subtask, a straggler as it continues after being interrupted
767 var started = new CountDownLatch(1);
768 var done = new AtomicBoolean();
769 scope.fork(() -> {
770 started.countDown();
771 try {
772 Thread.sleep(Duration.ofDays(1));
773 } catch (InterruptedException e) {
774 // interrupted by cancel, expected
775 }
776 Thread.sleep(Duration.ofMillis(100)); // force close to wait
777 done.set(true);
778 return null;
779 });
780 started.await();
781
782 // fork second subtask, the scope should be cancelled when this subtask completes
783 scope.fork(() -> "bar");
784 while (!scope.isCancelled()) {
785 Thread.sleep(20);
786 }
787
788 scope.join();
789
790 // invoke close with interrupt status set
791 Thread.currentThread().interrupt();
792 try {
793 scope.close();
794 } finally {
795 assertTrue(Thread.interrupted()); // clear interrupt status
796 assertTrue(done.get());
797 }
798 }
799 }
800
801 /**
802 * Test interrupting thread waiting in close.
803 */
804 @ParameterizedTest
805 @MethodSource("factories")
806 void testInterruptClose2(ThreadFactory factory) throws Exception {
807 var testJoiner = new CancelAfterOneJoiner<String>();
808 try (var scope = StructuredTaskScope.open(testJoiner,
809 cf -> cf.withThreadFactory(factory))) {
810
811 Thread mainThread = Thread.currentThread();
812
813 // fork first subtask, a straggler as it continues after being interrupted
814 var started = new CountDownLatch(1);
815 var done = new AtomicBoolean();
816 scope.fork(() -> {
817 started.countDown();
818 try {
819 Thread.sleep(Duration.ofDays(1));
820 } catch (InterruptedException e) {
821 // interrupted by cancel, expected
822 }
823
824 // interrupt main thread when it blocks in close
825 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScopeImpl.close");
826
827 Thread.sleep(Duration.ofMillis(100)); // force close to wait
828 done.set(true);
829 return null;
830 });
831 started.await();
832
833 // fork second subtask, the scope should be cancelled when this subtask completes
834 scope.fork(() -> "bar");
835 while (!scope.isCancelled()) {
836 Thread.sleep(20);
837 }
838
839 scope.join();
840
841 // main thread will be interrupted while blocked in close
842 try {
843 scope.close();
844 } finally {
845 assertTrue(Thread.interrupted()); // clear interrupt status
846 assertTrue(done.get());
847 }
848 }
849 }
850
851 /**
852 * Test that closing an enclosing scope closes the thread flock of a nested scope.
853 */
854 @Test
855 void testCloseThrowsStructureViolation() throws Exception {
856 try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll())) {
857 try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll())) {
858
859 // close enclosing scope
860 try {
861 scope1.close();
862 fail("close did not throw");
863 } catch (StructureViolationException expected) { }
864
865 // underlying flock should be closed
866 var executed = new AtomicBoolean();
867 Subtask<?> subtask = scope2.fork(() -> executed.set(true));
868 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
869 scope2.join();
870 assertFalse(executed.get());
871 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
872 }
873 }
874 }
875
876 /**
877 * Test that isCancelled returns true after close.
878 */
879 @Test
880 void testIsCancelledAfterClose() throws Exception {
881 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
882 assertFalse(scope.isCancelled());
883 scope.close();
884 assertTrue(scope.isCancelled());
885 }
886 }
887
888 /**
889 * Test Joiner.onFork throwing exception.
890 */
891 @Test
892 void testOnForkThrows() throws Exception {
893 var joiner = new Joiner<String, Void>() {
894 @Override
895 public boolean onFork(Subtask<? extends String> subtask) {
896 throw new FooException();
897 }
898 @Override
899 public Void result() {
900 return null;
901 }
902 };
903 try (var scope = StructuredTaskScope.open(joiner)) {
904 assertThrows(FooException.class, () -> scope.fork(() -> "foo"));
905 }
906 }
907
908 /**
909 * Test Joiner.onFork returning true to cancel execution.
910 */
911 @Test
912 void testOnForkCancelsExecution() throws Exception {
913 var joiner = new Joiner<String, Void>() {
914 @Override
915 public boolean onFork(Subtask<? extends String> subtask) {
916 return true;
917 }
918 @Override
919 public Void result() {
920 return null;
921 }
922 };
923 try (var scope = StructuredTaskScope.open(joiner)) {
924 assertFalse(scope.isCancelled());
925 scope.fork(() -> "foo");
926 assertTrue(scope.isCancelled());
927 scope.join();
928 }
929 }
930
931 /**
932 * Test Joiner.onComplete throwing exception causes UHE to be invoked.
933 */
934 @Test
935 void testOnCompleteThrows() throws Exception {
936 var joiner = new Joiner<String, Void>() {
937 @Override
938 public boolean onComplete(Subtask<? extends String> subtask) {
939 throw new FooException();
940 }
941 @Override
942 public Void result() {
943 return null;
944 }
945 };
946 var excRef = new AtomicReference<Throwable>();
947 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
948 ThreadFactory factory = Thread.ofVirtual()
949 .uncaughtExceptionHandler(uhe)
950 .factory();
951 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
952 scope.fork(() -> "foo");
953 scope.join();
954 assertInstanceOf(FooException.class, excRef.get());
955 }
956 }
957
958 /**
959 * Test Joiner.onComplete returning true to cancel execution.
960 */
961 @Test
962 void testOnCompleteCancelsExecution() throws Exception {
963 var joiner = new Joiner<String, Void>() {
964 @Override
965 public boolean onComplete(Subtask<? extends String> subtask) {
966 return true;
967 }
968 @Override
969 public Void result() {
970 return null;
971 }
972 };
973 try (var scope = StructuredTaskScope.open(joiner)) {
974 assertFalse(scope.isCancelled());
975 scope.fork(() -> "foo");
976 while (!scope.isCancelled()) {
977 Thread.sleep(10);
978 }
979 scope.join();
980 }
981 }
982
983 /**
984 * Test toString.
985 */
986 @Test
987 void testToString() throws Exception {
988 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
989 cf -> cf.withName("duke"))) {
990
991 // open
992 assertTrue(scope.toString().contains("duke"));
993
994 // closed
995 scope.close();
996 assertTrue(scope.toString().contains("duke"));
997 }
998 }
999
1000 /**
1001 * Test Subtask with task that completes successfully.
1002 */
1003 @ParameterizedTest
1004 @MethodSource("factories")
1005 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
1006 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1007 cf -> cf.withThreadFactory(factory))) {
1008
1009 Subtask<String> subtask = scope.fork(() -> "foo");
1010
1011 // before join
1012 assertThrows(IllegalStateException.class, subtask::get);
1013 assertThrows(IllegalStateException.class, subtask::exception);
1014
1015 scope.join();
1016
1017 // after join
1018 assertEquals(Subtask.State.SUCCESS, subtask.state());
1019 assertEquals("foo", subtask.get());
1020 assertThrows(IllegalStateException.class, subtask::exception);
1021 }
1022 }
1023
1024 /**
1025 * Test Subtask with task that fails.
1026 */
1027 @ParameterizedTest
1028 @MethodSource("factories")
1029 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
1030 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1031 cf -> cf.withThreadFactory(factory))) {
1032
1033 Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
1034
1035 // before join
1036 assertThrows(IllegalStateException.class, subtask::get);
1037 assertThrows(IllegalStateException.class, subtask::exception);
1038
1039 scope.join();
1040
1041 // after join
1042 assertEquals(Subtask.State.FAILED, subtask.state());
1043 assertThrows(IllegalStateException.class, subtask::get);
1044 assertTrue(subtask.exception() instanceof FooException);
1045 }
1046 }
1047
1048 /**
1049 * Test Subtask with a task that has not completed.
1050 */
1051 @ParameterizedTest
1052 @MethodSource("factories")
1053 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
1054 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1055 cf -> cf.withThreadFactory(factory))) {
1056 Subtask<Void> subtask = scope.fork(() -> {
1057 Thread.sleep(Duration.ofDays(1));
1058 return null;
1059 });
1060
1061 // before join
1062 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1063 assertThrows(IllegalStateException.class, subtask::get);
1064 assertThrows(IllegalStateException.class, subtask::exception);
1065
1066 // attempt join, join throws
1067 Thread.currentThread().interrupt();
1068 assertThrows(InterruptedException.class, scope::join);
1069
1070 // after join
1071 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1072 assertThrows(IllegalStateException.class, subtask::get);
1073 assertThrows(IllegalStateException.class, subtask::exception);
1074 }
1075 }
1076
1077 /**
1078 * Test Subtask forked after execution cancelled.
1079 */
1080 @ParameterizedTest
1081 @MethodSource("factories")
1082 void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception {
1083 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1084 scope.fork(() -> "foo");
1085 while (!scope.isCancelled()) {
1086 Thread.sleep(20);
1087 }
1088
1089 var subtask = scope.fork(() -> "foo");
1090
1091 // before join
1092 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1093 assertThrows(IllegalStateException.class, subtask::get);
1094 assertThrows(IllegalStateException.class, subtask::exception);
1095
1096 scope.join();
1097
1098 // after join
1099 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1100 assertThrows(IllegalStateException.class, subtask::get);
1101 assertThrows(IllegalStateException.class, subtask::exception);
1102 }
1103 }
1104
1105 /**
1106 * Test Subtask::toString.
1107 */
1108 @Test
1109 void testSubtaskToString() throws Exception {
1110 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1111 var latch = new CountDownLatch(1);
1112 var subtask1 = scope.fork(() -> {
1113 latch.await();
1114 return "foo";
1115 });
1116 var subtask2 = scope.fork(() -> { throw new FooException(); });
1117
1118 // subtask1 result is unavailable
1119 assertTrue(subtask1.toString().contains("Unavailable"));
1120 latch.countDown();
1121
1122 scope.join();
1123
1124 assertTrue(subtask1.toString().contains("Completed successfully"));
1125 assertTrue(subtask2.toString().contains("Failed"));
1126 }
1127 }
1128
1129 /**
1130 * Test Joiner.allSuccessfulOrThrow() with no subtasks.
1131 */
1132 @Test
1133 void testAllSuccessfulOrThrow1() throws Throwable {
1134 try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
1135 var subtasks = scope.join().toList();
1136 assertTrue(subtasks.isEmpty());
1137 }
1138 }
1139
1140 /**
1141 * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully.
1142 */
1143 @ParameterizedTest
1144 @MethodSource("factories")
1145 void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1146 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1147 cf -> cf.withThreadFactory(factory))) {
1148 var subtask1 = scope.fork(() -> "foo");
1149 var subtask2 = scope.fork(() -> "bar");
1150 var subtasks = scope.join().toList();
1151 assertEquals(List.of(subtask1, subtask2), subtasks);
1152 assertEquals("foo", subtask1.get());
1153 assertEquals("bar", subtask2.get());
1154 }
1155 }
1156
1157 /**
1158 * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
1159 * a subtask that fails.
1160 */
1161 @ParameterizedTest
1162 @MethodSource("factories")
1163 void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1164 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1165 cf -> cf.withThreadFactory(factory))) {
1166 scope.fork(() -> "foo");
1167 scope.fork(() -> { throw new FooException(); });
1168 try {
1169 scope.join();
1170 } catch (FailedException e) {
1171 assertTrue(e.getCause() instanceof FooException);
1172 }
1173 }
1174 }
1175
1176 /**
1177 * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
1178 */
1179 @Test
1180 void testAnySuccessfulResultOrThrow1() throws Exception {
1181 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
1182 try {
1183 scope.join();
1184 } catch (FailedException e) {
1185 assertTrue(e.getCause() instanceof NoSuchElementException);
1186 }
1187 }
1188 }
1189
1190 /**
1191 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully.
1192 */
1193 @ParameterizedTest
1194 @MethodSource("factories")
1195 void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {
1196 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1197 cf -> cf.withThreadFactory(factory))) {
1198 scope.fork(() -> "foo");
1199 String result = scope.join();
1200 assertEquals("foo", result);
1201 }
1202 }
1203
1204 /**
1205 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully
1206 * with a null result.
1207 */
1208 @ParameterizedTest
1209 @MethodSource("factories")
1210 void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception {
1211 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1212 cf -> cf.withThreadFactory(factory))) {
1213 scope.fork(() -> null);
1214 String result = scope.join();
1215 assertNull(result);
1216 }
1217 }
1218
1219 /**
1220 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully
1221 * and a subtask that fails.
1222 */
1223 @ParameterizedTest
1224 @MethodSource("factories")
1225 void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception {
1226 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1227 cf -> cf.withThreadFactory(factory))) {
1228 scope.fork(() -> "foo");
1229 scope.fork(() -> { throw new FooException(); });
1230 String first = scope.join();
1231 assertEquals("foo", first);
1232 }
1233 }
1234
1235 /**
1236 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails.
1237 */
1238 @ParameterizedTest
1239 @MethodSource("factories")
1240 void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
1241 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
1242 cf -> cf.withThreadFactory(factory))) {
1243 scope.fork(() -> { throw new FooException(); });
1244 Throwable ex = assertThrows(FailedException.class, scope::join);
1245 assertTrue(ex.getCause() instanceof FooException);
1246 }
1247 }
1248
1249 /**
1250 * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
1251 */
1252 @Test
1253 void testAwaitSuccessfulOrThrow1() throws Throwable {
1254 try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
1255 var result = scope.join();
1256 assertNull(result);
1257 }
1258 }
1259
1260 /**
1261 * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
1262 */
1263 @ParameterizedTest
1264 @MethodSource("factories")
1265 void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1266 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1267 cf -> cf.withThreadFactory(factory))) {
1268 var subtask1 = scope.fork(() -> "foo");
1269 var subtask2 = scope.fork(() -> "bar");
1270 var result = scope.join();
1271 assertNull(result);
1272 assertEquals("foo", subtask1.get());
1273 assertEquals("bar", subtask2.get());
1274 }
1275 }
1276
1277 /**
1278 * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
1279 * a subtask that fails.
1280 */
1281 @ParameterizedTest
1282 @MethodSource("factories")
1283 void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1284 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1285 cf -> cf.withThreadFactory(factory))) {
1286 scope.fork(() -> "foo");
1287 scope.fork(() -> { throw new FooException(); });
1288 try {
1289 scope.join();
1290 } catch (FailedException e) {
1291 assertTrue(e.getCause() instanceof FooException);
1292 }
1293 }
1294 }
1295
1296 /**
1297 * Test Joiner.awaitAll() with no subtasks.
1298 */
1299 @Test
1300 void testAwaitAll1() throws Throwable {
1301 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1302 var result = scope.join();
1303 assertNull(result);
1304 }
1305 }
1306
1307 /**
1308 * Test Joiner.awaitAll() with subtasks that complete successfully.
1309 */
1310 @ParameterizedTest
1311 @MethodSource("factories")
1312 void testAwaitAll2(ThreadFactory factory) throws Throwable {
1313 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1314 cf -> cf.withThreadFactory(factory))) {
1315 var subtask1 = scope.fork(() -> "foo");
1316 var subtask2 = scope.fork(() -> "bar");
1317 var result = scope.join();
1318 assertNull(result);
1319 assertEquals("foo", subtask1.get());
1320 assertEquals("bar", subtask2.get());
1321 }
1322 }
1323
1324 /**
1325 * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
1326 * that fails.
1327 */
1328 @ParameterizedTest
1329 @MethodSource("factories")
1330 void testAwaitAll3(ThreadFactory factory) throws Throwable {
1331 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1332 cf -> cf.withThreadFactory(factory))) {
1333 var subtask1 = scope.fork(() -> "foo");
1334 var subtask2 = scope.fork(() -> { throw new FooException(); });
1335 var result = scope.join();
1336 assertNull(result);
1337 assertEquals("foo", subtask1.get());
1338 assertTrue(subtask2.exception() instanceof FooException);
1339 }
1340 }
1341
1342 /**
1343 * Test Joiner.allUntil(Predicate) with no subtasks.
1344 */
1345 @Test
1346 void testAllUntil1() throws Throwable {
1347 try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
1348 var subtasks = scope.join();
1349 assertEquals(0, subtasks.count());
1350 }
1351 }
1352
1353 /**
1354 * Test Joiner.allUntil(Predicate) with no cancellation.
1355 */
1356 @ParameterizedTest
1357 @MethodSource("factories")
1358 void testAllUntil2(ThreadFactory factory) throws Exception {
1359 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1360 cf -> cf.withThreadFactory(factory))) {
1361
1362 var subtask1 = scope.fork(() -> "foo");
1363 var subtask2 = scope.fork(() -> { throw new FooException(); });
1364
1365 var subtasks = scope.join().toList();
1366 assertEquals(2, subtasks.size());
1367
1368 assertTrue(subtasks.get(0) == subtask1);
1369 assertTrue(subtasks.get(1) == subtask2);
1370 assertEquals("foo", subtask1.get());
1371 assertTrue(subtask2.exception() instanceof FooException);
1372 }
1373 }
1374
1375 /**
1376 * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes.
1377 */
1378 @ParameterizedTest
1379 @MethodSource("factories")
1380 void testAllUntil3(ThreadFactory factory) throws Exception {
1381 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true),
1382 cf -> cf.withThreadFactory(factory))) {
1383
1384 var subtask1 = scope.fork(() -> "foo");
1385 var subtask2 = scope.fork(() -> {
1386 Thread.sleep(Duration.ofDays(1));
1387 return "bar";
1388 });
1389
1390 var subtasks = scope.join().toList();
1391
1392 assertEquals(2, subtasks.size());
1393 assertTrue(subtasks.get(0) == subtask1);
1394 assertTrue(subtasks.get(1) == subtask2);
1395 assertEquals("foo", subtask1.get());
1396 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1397 }
1398 }
1399
1400 /**
1401 * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete.
1402 */
1403 @ParameterizedTest
1404 @MethodSource("factories")
1405 void testAllUntil4(ThreadFactory factory) throws Exception {
1406
1407 // cancel execution after two or more failures
1408 class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
1409 final AtomicInteger failedCount = new AtomicInteger();
1410 @Override
1411 public boolean test(Subtask<? extends T> subtask) {
1412 return subtask.state() == Subtask.State.FAILED
1413 && failedCount.incrementAndGet() >= 2;
1414 }
1415 }
1416 var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
1417
1418 try (var scope = StructuredTaskScope.open(joiner)) {
1419 int forkCount = 0;
1420
1421 // fork subtasks until execution cancelled
1422 while (!scope.isCancelled()) {
1423 scope.fork(() -> "foo");
1424 scope.fork(() -> { throw new FooException(); });
1425 forkCount += 2;
1426 Thread.sleep(Duration.ofMillis(10));
1427 }
1428
1429 var subtasks = scope.join().toList();
1430 assertEquals(forkCount, subtasks.size());
1431
1432 long failedCount = subtasks.stream()
1433 .filter(s -> s.state() == Subtask.State.FAILED)
1434 .count();
1435 assertTrue(failedCount >= 2);
1436 }
1437 }
1438
1439 /**
1440 * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws.
1441 */
1442 @Test
1443 void testAllUntil5() throws Exception {
1444 var joiner = Joiner.allUntil(_ -> { throw new FooException(); });
1445 var excRef = new AtomicReference<Throwable>();
1446 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1447 ThreadFactory factory = Thread.ofVirtual()
1448 .uncaughtExceptionHandler(uhe)
1449 .factory();
1450 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1451 scope.fork(() -> "foo");
1452 scope.join();
1453 assertInstanceOf(FooException.class, excRef.get());
1454 }
1455 }
1456
1457 /**
1458 * Test Joiner default methods.
1459 */
1460 @Test
1461 void testJoinerDefaultMethods() throws Exception {
1462 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1463
1464 // need subtasks to test default methods
1465 var subtask1 = scope.fork(() -> "foo");
1466 while (!scope.isCancelled()) {
1467 Thread.sleep(20);
1468 }
1469 var subtask2 = scope.fork(() -> "bar");
1470 scope.join();
1471
1472 assertEquals(Subtask.State.SUCCESS, subtask1.state());
1473 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1474
1475 // Joiner that does not override default methods
1476 Joiner<Object, Void> joiner = () -> null;
1477 assertThrows(NullPointerException.class, () -> joiner.onFork(null));
1478 assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
1479 assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
1480 assertFalse(joiner.onFork(subtask2));
1481 assertFalse(joiner.onComplete(subtask1));
1482 assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
1483 }
1484 }
1485
1486 /**
1487 * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
1488 */
1489 @Test
1490 void testJoinersWithUnavailableResukt() throws Exception {
1491 try (var scope = StructuredTaskScope.open()) {
1492 var done = new CountDownLatch(1);
1493 var subtask = scope.fork(() -> {
1494 done.await();
1495 return null;
1496 });
1497
1498 // onComplete with uncompleted task should throw IAE
1499 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1500 assertThrows(IllegalArgumentException.class,
1501 () -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
1502 assertThrows(IllegalArgumentException.class,
1503 () -> Joiner.anySuccessfulResultOrThrow().onComplete(subtask));
1504 assertThrows(IllegalArgumentException.class,
1505 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask));
1506 assertThrows(IllegalArgumentException.class,
1507 () -> Joiner.awaitAll().onComplete(subtask));
1508 assertThrows(IllegalArgumentException.class,
1509 () -> Joiner.allUntil(_ -> false).onComplete(subtask));
1510
1511 done.countDown();
1512 scope.join();
1513
1514 // onFork with completed task should throw IAE
1515 assertEquals(Subtask.State.SUCCESS, subtask.state());
1516 assertThrows(IllegalArgumentException.class,
1517 () -> Joiner.allSuccessfulOrThrow().onFork(subtask));
1518 assertThrows(IllegalArgumentException.class,
1519 () -> Joiner.anySuccessfulResultOrThrow().onFork(subtask));
1520 assertThrows(IllegalArgumentException.class,
1521 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask));
1522 assertThrows(IllegalArgumentException.class,
1523 () -> Joiner.awaitAll().onFork(subtask));
1524 assertThrows(IllegalArgumentException.class,
1525 () -> Joiner.allUntil(_ -> false).onFork(subtask));
1526 }
1527
1528 }
1529
1530 /**
1531 * Test the Config function apply method throwing an exception.
1532 */
1533 @Test
1534 void testConfigFunctionThrows() throws Exception {
1535 assertThrows(FooException.class,
1536 () -> StructuredTaskScope.open(Joiner.awaitAll(),
1537 cf -> { throw new FooException(); }));
1538 }
1539
1540 /**
1541 * Test Config equals/hashCode/toString
1542 */
1543 @Test
1544 void testConfigMethods() throws Exception {
1545 Function<Config, Config> testConfig = cf -> {
1546 var name = "duke";
1547 var threadFactory = Thread.ofPlatform().factory();
1548 var timeout = Duration.ofSeconds(10);
1549
1550 assertEquals(cf, cf);
1551 assertEquals(cf.withName(name), cf.withName(name));
1552 assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1553 assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1554
1555 assertNotEquals(cf, cf.withName(name));
1556 assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1557 assertNotEquals(cf, cf.withTimeout(timeout));
1558
1559 assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1560 assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1561 cf.withThreadFactory(threadFactory).hashCode());
1562 assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1563
1564 assertTrue(cf.withName(name).toString().contains(name));
1565 assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1566 assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1567
1568 return cf;
1569 };
1570 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
1571 // do nothing
1572 }
1573 }
1574
1575 /**
1576 * Test for NullPointerException.
1577 */
1578 @Test
1579 void testNulls() throws Exception {
1580 assertThrows(NullPointerException.class,
1581 () -> StructuredTaskScope.open(null));
1582 assertThrows(NullPointerException.class,
1583 () -> StructuredTaskScope.open(null, cf -> cf));
1584 assertThrows(NullPointerException.class,
1585 () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
1586
1587 assertThrows(NullPointerException.class, () -> Joiner.allUntil(null));
1588
1589 // fork
1590 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1591 assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null));
1592 assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null));
1593 }
1594
1595 // Config and withXXX methods
1596 assertThrows(NullPointerException.class,
1597 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> null));
1598 assertThrows(NullPointerException.class,
1599 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null)));
1600 assertThrows(NullPointerException.class,
1601 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null)));
1602 assertThrows(NullPointerException.class,
1603 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null)));
1604
1605 // Joiner.onFork/onComplete
1606 assertThrows(NullPointerException.class,
1607 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null));
1608 assertThrows(NullPointerException.class,
1609 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null));
1610 assertThrows(NullPointerException.class,
1611 () -> Joiner.awaitAll().onFork(null));
1612 assertThrows(NullPointerException.class,
1613 () -> Joiner.awaitAll().onComplete(null));
1614 assertThrows(NullPointerException.class,
1615 () -> Joiner.allSuccessfulOrThrow().onFork(null));
1616 assertThrows(NullPointerException.class,
1617 () -> Joiner.allSuccessfulOrThrow().onComplete(null));
1618 assertThrows(NullPointerException.class,
1619 () -> Joiner.anySuccessfulResultOrThrow().onFork(null));
1620 assertThrows(NullPointerException.class,
1621 () -> Joiner.anySuccessfulResultOrThrow().onComplete(null));
1622 }
1623
1624 /**
1625 * ThreadFactory that counts usage.
1626 */
1627 private static class CountingThreadFactory implements ThreadFactory {
1628 final ThreadFactory delegate;
1629 final AtomicInteger threadCount = new AtomicInteger();
1630 CountingThreadFactory(ThreadFactory delegate) {
1631 this.delegate = delegate;
1632 }
1633 @Override
1634 public Thread newThread(Runnable task) {
1635 threadCount.incrementAndGet();
1636 return delegate.newThread(task);
1637 }
1638 int threadCount() {
1639 return threadCount.get();
1640 }
1641 }
1642
1643 /**
1644 * A joiner that counts that counts the number of subtasks that are forked and the
1645 * number of subtasks that complete.
1646 */
1647 private static class CountingJoiner<T> implements Joiner<T, Void> {
1648 final AtomicInteger onForkCount = new AtomicInteger();
1649 final AtomicInteger onCompleteCount = new AtomicInteger();
1650 @Override
1651 public boolean onFork(Subtask<? extends T> subtask) {
1652 onForkCount.incrementAndGet();
1653 return false;
1654 }
1655 @Override
1656 public boolean onComplete(Subtask<? extends T> subtask) {
1657 onCompleteCount.incrementAndGet();
1658 return false;
1659 }
1660 @Override
1661 public Void result() {
1662 return null;
1663 }
1664 int onForkCount() {
1665 return onForkCount.get();
1666 }
1667 int onCompleteCount() {
1668 return onCompleteCount.get();
1669 }
1670 }
1671
1672 /**
1673 * A joiner that cancels execution when a subtask completes. It also keeps a count
1674 * of the number of subtasks that are forked and the number of subtasks that complete.
1675 */
1676 private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> {
1677 final AtomicInteger onForkCount = new AtomicInteger();
1678 final AtomicInteger onCompleteCount = new AtomicInteger();
1679 @Override
1680 public boolean onFork(Subtask<? extends T> subtask) {
1681 onForkCount.incrementAndGet();
1682 return false;
1683 }
1684 @Override
1685 public boolean onComplete(Subtask<? extends T> subtask) {
1686 onCompleteCount.incrementAndGet();
1687 return true;
1688 }
1689 @Override
1690 public Void result() {
1691 return null;
1692 }
1693 int onForkCount() {
1694 return onForkCount.get();
1695 }
1696 int onCompleteCount() {
1697 return onCompleteCount.get();
1698 }
1699 }
1700
1701 /**
1702 * A runtime exception for tests.
1703 */
1704 private static class FooException extends RuntimeException {
1705 FooException() { }
1706 FooException(Throwable cause) { super(cause); }
1707 }
1708
1709 /**
1710 * Returns the current time in milliseconds.
1711 */
1712 private long millisTime() {
1713 long now = System.nanoTime();
1714 return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
1715 }
1716
1717 /**
|