44 import java.util.concurrent.Callable;
45 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.Executors;
48 import java.util.concurrent.Future;
49 import java.util.concurrent.LinkedTransferQueue;
50 import java.util.concurrent.ThreadFactory;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.RejectedExecutionException;
53 import java.util.concurrent.ScheduledExecutorService;
54 import java.util.concurrent.StructuredTaskScope;
55 import java.util.concurrent.StructuredTaskScope.TimeoutException;
56 import java.util.concurrent.StructuredTaskScope.Configuration;
57 import java.util.concurrent.StructuredTaskScope.FailedException;
58 import java.util.concurrent.StructuredTaskScope.Joiner;
59 import java.util.concurrent.StructuredTaskScope.Subtask;
60 import java.util.concurrent.StructureViolationException;
61 import java.util.concurrent.atomic.AtomicBoolean;
62 import java.util.concurrent.atomic.AtomicInteger;
63 import java.util.concurrent.atomic.AtomicReference;
64 import java.util.function.Function;
65 import java.util.function.Predicate;
66 import java.util.stream.Stream;
67 import static java.lang.Thread.State.*;
68
69 import org.junit.jupiter.api.Test;
70 import org.junit.jupiter.api.BeforeAll;
71 import org.junit.jupiter.api.AfterAll;
72 import org.junit.jupiter.params.ParameterizedTest;
73 import org.junit.jupiter.params.provider.MethodSource;
74 import static org.junit.jupiter.api.Assertions.*;
75
76 class StructuredTaskScopeTest {
77 private static ScheduledExecutorService scheduler;
78 private static List<ThreadFactory> threadFactories;
79
80 @BeforeAll
81 static void setup() throws Exception {
82 scheduler = Executors.newSingleThreadScheduledExecutor();
83
84 // thread factories
85 String value = System.getProperty("threadFactory");
192 future.get();
193 }
194
195 // subtask cannot fork
196 Subtask<Boolean> subtask = scope.fork(() -> {
197 assertThrows(WrongThreadException.class, () -> {
198 scope.fork(() -> null);
199 });
200 return true;
201 });
202 scope.join();
203 assertTrue(subtask.get());
204 }
205 }
206
207 /**
208 * Test fork after join, no subtasks forked before join.
209 */
210 @ParameterizedTest
211 @MethodSource("factories")
212 void testForkAfterJoin1(ThreadFactory factory) throws Exception {
213 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
214 cf -> cf.withThreadFactory(factory))) {
215 scope.join();
216 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
217 }
218 }
219
220 /**
221 * Test fork after join, subtasks forked before join.
222 */
223 @ParameterizedTest
224 @MethodSource("factories")
225 void testForkAfterJoin2(ThreadFactory factory) throws Exception {
226 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
227 cf -> cf.withThreadFactory(factory))) {
228 scope.fork(() -> "foo");
229 scope.join();
230 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
231 }
232 }
233
234 /**
235 * Test fork after join throws.
236 */
237 @ParameterizedTest
238 @MethodSource("factories")
239 void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
240 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
241 cf -> cf.withThreadFactory(factory))) {
242 var latch = new CountDownLatch(1);
243 var subtask1 = scope.fork(() -> {
244 latch.await();
245 return "foo";
246 });
247
248 // join throws
249 Thread.currentThread().interrupt();
250 assertThrows(InterruptedException.class, scope::join);
251
252 // fork should throw
253 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
254 }
255 }
256
257 /**
258 * Test fork after task scope is cancelled. This test uses a custom Joiner to
259 * cancel execution.
260 */
261 @ParameterizedTest
262 @MethodSource("factories")
263 void testForkAfterCancel2(ThreadFactory factory) throws Exception {
264 var countingThreadFactory = new CountingThreadFactory(factory);
265 var testJoiner = new CancelAfterOneJoiner<String>();
266
267 try (var scope = StructuredTaskScope.open(testJoiner,
268 cf -> cf.withThreadFactory(countingThreadFactory))) {
269
270 // fork subtask, the scope should be cancelled when the subtask completes
271 var subtask1 = scope.fork(() -> "foo");
272 awaitCancelled(scope);
273
274 assertEquals(1, countingThreadFactory.threadCount());
275 assertEquals(1, testJoiner.onForkCount());
276 assertEquals(1, testJoiner.onCompleteCount());
279 var subtask2 = scope.fork(() -> "bar");
280
281 // onFork should be invoked, newThread and onComplete should not be invoked
282 assertEquals(1, countingThreadFactory.threadCount());
283 assertEquals(2, testJoiner.onForkCount());
284 assertEquals(1, testJoiner.onCompleteCount());
285
286 scope.join();
287
288 assertEquals(1, countingThreadFactory.threadCount());
289 assertEquals(2, testJoiner.onForkCount());
290 assertEquals(1, testJoiner.onCompleteCount());
291 assertEquals("foo", subtask1.get());
292 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
293 }
294 }
295
296 /**
297 * Test fork after task scope is closed.
298 */
299 @Test
300 void testForkAfterClose() {
301 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
302 scope.close();
303 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
304 }
305 }
306
307 /**
308 * Test fork with a ThreadFactory that rejects creating a thread.
309 */
310 @Test
311 void testForkRejectedExecutionException() {
312 ThreadFactory factory = task -> null;
313 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
314 cf -> cf.withThreadFactory(factory))) {
315 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
316 }
317 }
318
319 /**
320 * Test join with no subtasks.
321 */
349 @Test
350 void testJoinAfterJoin1() throws Exception {
351 var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz"));
352 Joiner<Object, String> joiner = results::take;
353 try (var scope = StructuredTaskScope.open(joiner)) {
354 scope.fork(() -> "foo");
355 assertEquals("foo", scope.join());
356
357 // join already called
358 for (int i = 0 ; i < 3; i++) {
359 assertThrows(IllegalStateException.class, scope::join);
360 }
361 }
362 }
363
364 /**
365 * Test join after join completed with an exception.
366 */
367 @Test
368 void testJoinAfterJoin2() throws Exception {
369 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
370 scope.fork(() -> { throw new FooException(); });
371 Throwable ex = assertThrows(FailedException.class, scope::join);
372 assertTrue(ex.getCause() instanceof FooException);
373
374 // join already called
375 for (int i = 0 ; i < 3; i++) {
376 assertThrows(IllegalStateException.class, scope::join);
377 }
378 }
379 }
380
381 /**
382 * Test join after join completed with a timeout.
383 */
384 @Test
385 void testJoinAfterJoin3() throws Exception {
386 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
387 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
388 // wait for scope to be cancelled by timeout
389 awaitCancelled(scope);
390 assertThrows(TimeoutException.class, scope::join);
391
392 // join already called
393 for (int i = 0 ; i < 3; i++) {
394 assertThrows(IllegalStateException.class, scope::join);
395 }
396 }
397 }
398
399 /**
400 * Test join method is owner confined.
401 */
402 @ParameterizedTest
403 @MethodSource("factories")
404 void testJoinConfined(ThreadFactory factory) throws Exception {
405 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
406 cf -> cf.withThreadFactory(factory))) {
407
408 // random thread cannot join
409 try (var pool = Executors.newSingleThreadExecutor()) {
410 Future<Void> future = pool.submit(() -> {
411 assertThrows(WrongThreadException.class, scope::join);
412 return null;
413 });
414 future.get();
415 }
416
417 // subtask cannot join
418 Subtask<Boolean> subtask = scope.fork(() -> {
419 assertThrows(WrongThreadException.class, () -> { scope.join(); });
420 return true;
421 });
422 scope.join();
423 assertTrue(subtask.get());
424 }
425 }
426
427 /**
428 * Test join with interrupt status set.
429 */
430 @ParameterizedTest
431 @MethodSource("factories")
432 void testInterruptJoin1(ThreadFactory factory) throws Exception {
433 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
434 cf -> cf.withThreadFactory(factory))) {
435
436 Subtask<String> subtask = scope.fork(() -> {
437 Thread.sleep(60_000);
438 return "foo";
439 });
440
441 // join should throw
442 Thread.currentThread().interrupt();
443 try {
444 scope.join();
445 fail("join did not throw");
446 } catch (InterruptedException expected) {
447 assertFalse(Thread.interrupted()); // interrupt status should be cleared
448 }
449 }
450 }
451
452 /**
453 * Test interrupt of thread blocked in join.
454 */
455 @ParameterizedTest
456 @MethodSource("factories")
457 void testInterruptJoin2(ThreadFactory factory) throws Exception {
458 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
459 cf -> cf.withThreadFactory(factory))) {
460
461 var latch = new CountDownLatch(1);
462 Subtask<String> subtask = scope.fork(() -> {
463 Thread.sleep(60_000);
464 return "foo";
465 });
466
467 // interrupt main thread when it blocks in join
468 scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
469 try {
470 scope.join();
471 fail("join did not throw");
472 } catch (InterruptedException expected) {
473 assertFalse(Thread.interrupted()); // interrupt status should be clear
474 }
475 }
476 }
477
478 /**
479 * Test join when scope is cancelled.
480 */
481 @ParameterizedTest
482 @MethodSource("factories")
483 void testJoinWhenCancelled(ThreadFactory factory) throws Exception {
862
863 /**
864 * Test that isCancelled returns true after close.
865 */
866 @Test
867 void testIsCancelledAfterClose() throws Exception {
868 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
869 assertFalse(scope.isCancelled());
870 scope.close();
871 assertTrue(scope.isCancelled());
872 }
873 }
874
875 /**
876 * Test Joiner.onFork throwing exception.
877 */
878 @Test
879 void testOnForkThrows() throws Exception {
880 var joiner = new Joiner<String, Void>() {
881 @Override
882 public boolean onFork(Subtask<? extends String> subtask) {
883 throw new FooException();
884 }
885 @Override
886 public Void result() {
887 return null;
888 }
889 };
890 try (var scope = StructuredTaskScope.open(joiner)) {
891 assertThrows(FooException.class, () -> scope.fork(() -> "foo"));
892 }
893 }
894
895 /**
896 * Test Joiner.onFork returning true to cancel execution.
897 */
898 @Test
899 void testOnForkCancelsExecution() throws Exception {
900 var joiner = new Joiner<String, Void>() {
901 @Override
902 public boolean onFork(Subtask<? extends String> subtask) {
903 return true;
904 }
905 @Override
906 public Void result() {
907 return null;
908 }
909 };
910 try (var scope = StructuredTaskScope.open(joiner)) {
911 assertFalse(scope.isCancelled());
912 scope.fork(() -> "foo");
913 assertTrue(scope.isCancelled());
914 scope.join();
915 }
916 }
917
918 /**
919 * Test Joiner.onComplete throwing exception causes UHE to be invoked.
920 */
921 @Test
922 void testOnCompleteThrows() throws Exception {
923 var joiner = new Joiner<String, Void>() {
924 @Override
925 public boolean onComplete(Subtask<? extends String> subtask) {
926 throw new FooException();
927 }
928 @Override
929 public Void result() {
930 return null;
931 }
932 };
933 var excRef = new AtomicReference<Throwable>();
934 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
935 ThreadFactory factory = Thread.ofVirtual()
936 .uncaughtExceptionHandler(uhe)
937 .factory();
938 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
939 scope.fork(() -> "foo");
940 scope.join();
941 assertInstanceOf(FooException.class, excRef.get());
942 }
943 }
944
945 /**
946 * Test Joiner.onComplete returning true to cancel execution.
947 */
948 @Test
949 void testOnCompleteCancelsExecution() throws Exception {
950 var joiner = new Joiner<String, Void>() {
951 @Override
952 public boolean onComplete(Subtask<? extends String> subtask) {
953 return true;
954 }
955 @Override
956 public Void result() {
957 return null;
958 }
959 };
960 try (var scope = StructuredTaskScope.open(joiner)) {
961 assertFalse(scope.isCancelled());
962 scope.fork(() -> "foo");
963 awaitCancelled(scope);
964 scope.join();
965 }
966 }
967
968 /**
969 * Test toString.
970 */
971 @Test
972 void testToString() throws Exception {
973 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
974 cf -> cf.withName("duke"))) {
975
976 // open
977 assertTrue(scope.toString().contains("duke"));
978
979 // closed
980 scope.close();
981 assertTrue(scope.toString().contains("duke"));
982 }
983 }
984
985 /**
986 * Test Subtask with task that completes successfully.
987 */
988 @ParameterizedTest
989 @MethodSource("factories")
990 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
991 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
992 cf -> cf.withThreadFactory(factory))) {
993
994 Subtask<String> subtask = scope.fork(() -> "foo");
995
996 // before join
997 assertThrows(IllegalStateException.class, subtask::get);
998 assertThrows(IllegalStateException.class, subtask::exception);
999
1000 scope.join();
1001
1002 // after join
1003 assertEquals(Subtask.State.SUCCESS, subtask.state());
1004 assertEquals("foo", subtask.get());
1005 assertThrows(IllegalStateException.class, subtask::exception);
1006 }
1007 }
1008
1009 /**
1010 * Test Subtask with task that fails.
1011 */
1012 @ParameterizedTest
1013 @MethodSource("factories")
1014 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
1015 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1016 cf -> cf.withThreadFactory(factory))) {
1017
1018 Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
1019
1020 // before join
1021 assertThrows(IllegalStateException.class, subtask::get);
1022 assertThrows(IllegalStateException.class, subtask::exception);
1023
1024 scope.join();
1025
1026 // after join
1027 assertEquals(Subtask.State.FAILED, subtask.state());
1028 assertThrows(IllegalStateException.class, subtask::get);
1029 assertTrue(subtask.exception() instanceof FooException);
1030 }
1031 }
1032
1033 /**
1034 * Test Subtask with a task that has not completed.
1035 */
1036 @ParameterizedTest
1037 @MethodSource("factories")
1038 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
1039 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1040 cf -> cf.withThreadFactory(factory))) {
1041 Subtask<Void> subtask = scope.fork(() -> {
1042 Thread.sleep(Duration.ofDays(1));
1043 return null;
1044 });
1045
1046 // before join
1047 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1048 assertThrows(IllegalStateException.class, subtask::get);
1049 assertThrows(IllegalStateException.class, subtask::exception);
1050
1051 // attempt join, join throws
1052 Thread.currentThread().interrupt();
1053 assertThrows(InterruptedException.class, scope::join);
1054
1055 // after join
1056 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1057 assertThrows(IllegalStateException.class, subtask::get);
1058 assertThrows(IllegalStateException.class, subtask::exception);
1059 }
1060 }
1061
1062 /**
1063 * Test Subtask forked after execution cancelled.
1064 */
1065 @ParameterizedTest
1066 @MethodSource("factories")
1067 void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception {
1068 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1069 scope.fork(() -> "foo");
1070 awaitCancelled(scope);
1071
1072 var subtask = scope.fork(() -> "foo");
1073
1074 // before join
1075 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1076 assertThrows(IllegalStateException.class, subtask::get);
1077 assertThrows(IllegalStateException.class, subtask::exception);
1078
1079 scope.join();
1080
1081 // after join
1082 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1083 assertThrows(IllegalStateException.class, subtask::get);
1084 assertThrows(IllegalStateException.class, subtask::exception);
1085 }
1086 }
1087
1088 /**
1089 * Test Subtask::toString.
1090 */
1091 @Test
1092 void testSubtaskToString() throws Exception {
1093 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1094 var latch = new CountDownLatch(1);
1095 var subtask1 = scope.fork(() -> {
1096 latch.await();
1097 return "foo";
1098 });
1099 var subtask2 = scope.fork(() -> { throw new FooException(); });
1100
1101 // subtask1 result is unavailable
1102 assertTrue(subtask1.toString().contains("Unavailable"));
1103 latch.countDown();
1104
1105 scope.join();
1106
1107 assertTrue(subtask1.toString().contains("Completed successfully"));
1108 assertTrue(subtask2.toString().contains("Failed"));
1109 }
1110 }
1111
1112 /**
1113 * Test Joiner.allSuccessfulOrThrow() with no subtasks.
1114 */
1115 @Test
1116 void testAllSuccessfulOrThrow1() throws Throwable {
1117 try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
1118 var subtasks = scope.join().toList();
1119 assertTrue(subtasks.isEmpty());
1120 }
1121 }
1122
1123 /**
1124 * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully.
1125 */
1126 @ParameterizedTest
1127 @MethodSource("factories")
1128 void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1129 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1130 cf -> cf.withThreadFactory(factory))) {
1131 var subtask1 = scope.fork(() -> "foo");
1132 var subtask2 = scope.fork(() -> "bar");
1133 var subtasks = scope.join().toList();
1134 assertEquals(List.of(subtask1, subtask2), subtasks);
1135 assertEquals("foo", subtask1.get());
1136 assertEquals("bar", subtask2.get());
1137 }
1138 }
1139
1140 /**
1141 * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
1142 * a subtask that fails.
1143 */
1144 @ParameterizedTest
1145 @MethodSource("factories")
1146 void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1147 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1148 cf -> cf.withThreadFactory(factory))) {
1149 scope.fork(() -> "foo");
1150 scope.fork(() -> { throw new FooException(); });
1151 try {
1152 scope.join();
1153 } catch (FailedException e) {
1154 assertTrue(e.getCause() instanceof FooException);
1155 }
1156 }
1157 }
1158
1159 /**
1160 * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
1161 */
1162 @Test
1163 void testAnySuccessfulResultOrThrow1() throws Exception {
1164 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
1165 try {
1166 scope.join();
1167 } catch (FailedException e) {
1168 assertTrue(e.getCause() instanceof NoSuchElementException);
1169 }
1170 }
1171 }
1172
1173 /**
1174 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully.
1175 */
1176 @ParameterizedTest
1177 @MethodSource("factories")
1178 void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {
1179 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1180 cf -> cf.withThreadFactory(factory))) {
1181 scope.fork(() -> "foo");
1182 String result = scope.join();
1183 assertEquals("foo", result);
1184 }
1185 }
1186
1187 /**
1188 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully
1189 * with a null result.
1190 */
1191 @ParameterizedTest
1192 @MethodSource("factories")
1193 void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception {
1194 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1195 cf -> cf.withThreadFactory(factory))) {
1196 scope.fork(() -> null);
1197 String result = scope.join();
1198 assertNull(result);
1199 }
1200 }
1201
1202 /**
1203 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully
1204 * and a subtask that fails.
1205 */
1206 @ParameterizedTest
1207 @MethodSource("factories")
1208 void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception {
1209 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1210 cf -> cf.withThreadFactory(factory))) {
1211 scope.fork(() -> "foo");
1212 scope.fork(() -> { throw new FooException(); });
1213 String first = scope.join();
1214 assertEquals("foo", first);
1215 }
1216 }
1217
1218 /**
1219 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails.
1220 */
1221 @ParameterizedTest
1222 @MethodSource("factories")
1223 void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
1224 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
1225 cf -> cf.withThreadFactory(factory))) {
1226 scope.fork(() -> { throw new FooException(); });
1227 Throwable ex = assertThrows(FailedException.class, scope::join);
1228 assertTrue(ex.getCause() instanceof FooException);
1229 }
1230 }
1231
1232 /**
1233 * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
1234 */
1235 @Test
1236 void testAwaitSuccessfulOrThrow1() throws Throwable {
1237 try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
1238 var result = scope.join();
1239 assertNull(result);
1240 }
1241 }
1242
1243 /**
1244 * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
1245 */
1246 @ParameterizedTest
1247 @MethodSource("factories")
1248 void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1249 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1250 cf -> cf.withThreadFactory(factory))) {
1251 var subtask1 = scope.fork(() -> "foo");
1259
1260 /**
1261 * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
1262 * a subtask that fails.
1263 */
1264 @ParameterizedTest
1265 @MethodSource("factories")
1266 void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1267 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1268 cf -> cf.withThreadFactory(factory))) {
1269 scope.fork(() -> "foo");
1270 scope.fork(() -> { throw new FooException(); });
1271 try {
1272 scope.join();
1273 } catch (FailedException e) {
1274 assertTrue(e.getCause() instanceof FooException);
1275 }
1276 }
1277 }
1278
1279 /**
1280 * Test Joiner.awaitAll() with no subtasks.
1281 */
1282 @Test
1283 void testAwaitAll1() throws Throwable {
1284 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1285 var result = scope.join();
1286 assertNull(result);
1287 }
1288 }
1289
1290 /**
1291 * Test Joiner.awaitAll() with subtasks that complete successfully.
1292 */
1293 @ParameterizedTest
1294 @MethodSource("factories")
1295 void testAwaitAll2(ThreadFactory factory) throws Throwable {
1296 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1297 cf -> cf.withThreadFactory(factory))) {
1298 var subtask1 = scope.fork(() -> "foo");
1305 }
1306
1307 /**
1308 * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
1309 * that fails.
1310 */
1311 @ParameterizedTest
1312 @MethodSource("factories")
1313 void testAwaitAll3(ThreadFactory factory) throws Throwable {
1314 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1315 cf -> cf.withThreadFactory(factory))) {
1316 var subtask1 = scope.fork(() -> "foo");
1317 var subtask2 = scope.fork(() -> { throw new FooException(); });
1318 var result = scope.join();
1319 assertNull(result);
1320 assertEquals("foo", subtask1.get());
1321 assertTrue(subtask2.exception() instanceof FooException);
1322 }
1323 }
1324
1325 /**
1326 * Test Joiner.allUntil(Predicate) with no subtasks.
1327 */
1328 @Test
1329 void testAllUntil1() throws Throwable {
1330 try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
1331 var subtasks = scope.join();
1332 assertEquals(0, subtasks.count());
1333 }
1334 }
1335
1336 /**
1337 * Test Joiner.allUntil(Predicate) with no cancellation.
1338 */
1339 @ParameterizedTest
1340 @MethodSource("factories")
1341 void testAllUntil2(ThreadFactory factory) throws Exception {
1342 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1343 cf -> cf.withThreadFactory(factory))) {
1344
1345 var subtask1 = scope.fork(() -> "foo");
1346 var subtask2 = scope.fork(() -> { throw new FooException(); });
1347
1348 var subtasks = scope.join().toList();
1349 assertEquals(2, subtasks.size());
1350
1351 assertSame(subtask1, subtasks.get(0));
1352 assertSame(subtask2, subtasks.get(1));
1353 assertEquals("foo", subtask1.get());
1354 assertTrue(subtask2.exception() instanceof FooException);
1355 }
1356 }
1357
1358 /**
1359 * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes.
1360 */
1361 @ParameterizedTest
1362 @MethodSource("factories")
1363 void testAllUntil3(ThreadFactory factory) throws Exception {
1364 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true),
1365 cf -> cf.withThreadFactory(factory))) {
1366
1367 var subtask1 = scope.fork(() -> "foo");
1368 var subtask2 = scope.fork(() -> {
1369 Thread.sleep(Duration.ofDays(1));
1370 return "bar";
1371 });
1372
1373 var subtasks = scope.join().toList();
1374
1375 assertEquals(2, subtasks.size());
1376 assertSame(subtask1, subtasks.get(0));
1377 assertSame(subtask2, subtasks.get(1));
1378 assertEquals("foo", subtask1.get());
1379 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1380 }
1381 }
1382
1383 /**
1384 * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete.
1385 */
1386 @ParameterizedTest
1387 @MethodSource("factories")
1388 void testAllUntil4(ThreadFactory factory) throws Exception {
1389
1390 // cancel execution after two or more failures
1391 class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
1392 final AtomicInteger failedCount = new AtomicInteger();
1393 @Override
1394 public boolean test(Subtask<? extends T> subtask) {
1395 return subtask.state() == Subtask.State.FAILED
1396 && failedCount.incrementAndGet() >= 2;
1397 }
1398 }
1399 var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
1400
1401 try (var scope = StructuredTaskScope.open(joiner)) {
1402 int forkCount = 0;
1403
1404 // fork subtasks until execution cancelled
1405 while (!scope.isCancelled()) {
1406 scope.fork(() -> "foo");
1407 scope.fork(() -> { throw new FooException(); });
1408 forkCount += 2;
1409 Thread.sleep(Duration.ofMillis(20));
1410 }
1411
1412 var subtasks = scope.join().toList();
1413 assertEquals(forkCount, subtasks.size());
1414
1415 long failedCount = subtasks.stream()
1416 .filter(s -> s.state() == Subtask.State.FAILED)
1417 .count();
1418 assertTrue(failedCount >= 2);
1419 }
1420 }
1421
1422 /**
1423 * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws.
1424 */
1425 @Test
1426 void testAllUntil5() throws Exception {
1427 var joiner = Joiner.allUntil(_ -> { throw new FooException(); });
1428 var excRef = new AtomicReference<Throwable>();
1429 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1430 ThreadFactory factory = Thread.ofVirtual()
1431 .uncaughtExceptionHandler(uhe)
1432 .factory();
1433 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1434 scope.fork(() -> "foo");
1435 scope.join();
1436 assertInstanceOf(FooException.class, excRef.get());
1437 }
1438 }
1439
1440 /**
1441 * Test Joiner default methods.
1442 */
1443 @Test
1444 void testJoinerDefaultMethods() throws Exception {
1445 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1446
1447 // need subtasks to test default methods
1448 var subtask1 = scope.fork(() -> "foo");
1449 awaitCancelled(scope);
1450 var subtask2 = scope.fork(() -> "bar");
1451 scope.join();
1452
1453 assertEquals(Subtask.State.SUCCESS, subtask1.state());
1454 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1455
1456 // Joiner that does not override default methods
1457 Joiner<Object, Void> joiner = () -> null;
1458 assertThrows(NullPointerException.class, () -> joiner.onFork(null));
1459 assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
1460 assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
1461 assertFalse(joiner.onFork(subtask2));
1462 assertFalse(joiner.onComplete(subtask1));
1463 assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
1464 }
1465 }
1466
1467 /**
1468 * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
1469 */
1470 @Test
1471 void testJoinersWithUnavailableResult() throws Exception {
1472 try (var scope = StructuredTaskScope.open()) {
1473 var done = new CountDownLatch(1);
1474 var subtask = scope.fork(() -> {
1475 done.await();
1476 return null;
1477 });
1478
1479 // onComplete with uncompleted task should throw IAE
1480 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1481 assertThrows(IllegalArgumentException.class,
1482 () -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
1483 assertThrows(IllegalArgumentException.class,
1484 () -> Joiner.anySuccessfulResultOrThrow().onComplete(subtask));
1485 assertThrows(IllegalArgumentException.class,
1486 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask));
1487 assertThrows(IllegalArgumentException.class,
1488 () -> Joiner.awaitAll().onComplete(subtask));
1489 assertThrows(IllegalArgumentException.class,
1490 () -> Joiner.allUntil(_ -> false).onComplete(subtask));
1491
1492 done.countDown();
1493 scope.join();
1494
1495 // onFork with completed task should throw IAE
1496 assertEquals(Subtask.State.SUCCESS, subtask.state());
1497 assertThrows(IllegalArgumentException.class,
1498 () -> Joiner.allSuccessfulOrThrow().onFork(subtask));
1499 assertThrows(IllegalArgumentException.class,
1500 () -> Joiner.anySuccessfulResultOrThrow().onFork(subtask));
1501 assertThrows(IllegalArgumentException.class,
1502 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask));
1503 assertThrows(IllegalArgumentException.class,
1504 () -> Joiner.awaitAll().onFork(subtask));
1505 assertThrows(IllegalArgumentException.class,
1506 () -> Joiner.allUntil(_ -> false).onFork(subtask));
1507 }
1508
1509 }
1510
1511 /**
1512 * Test the Configuration function apply method throwing an exception.
1513 */
1514 @Test
1515 void testConfigFunctionThrows() throws Exception {
1516 assertThrows(FooException.class,
1517 () -> StructuredTaskScope.open(Joiner.awaitAll(),
1518 cf -> { throw new FooException(); }));
1519 }
1520
1521 /**
1522 * Test Configuration equals/hashCode/toString
1523 */
1524 @Test
1525 void testConfigMethods() throws Exception {
1526 Function<Configuration, Configuration> testConfig = cf -> {
1527 var name = "duke";
1528 var threadFactory = Thread.ofPlatform().factory();
1529 var timeout = Duration.ofSeconds(10);
1530
1531 assertEquals(cf, cf);
1532 assertEquals(cf.withName(name), cf.withName(name));
1533 assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1534 assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1535
1536 assertNotEquals(cf, cf.withName(name));
1537 assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1538 assertNotEquals(cf, cf.withTimeout(timeout));
1539
1540 assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1541 assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1542 cf.withThreadFactory(threadFactory).hashCode());
1543 assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1544
1545 assertTrue(cf.withName(name).toString().contains(name));
1546 assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1547 assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1548
1549 return cf;
1550 };
1551 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
1552 // do nothing
1553 }
1554 }
1555
1556 /**
1557 * Test for NullPointerException.
1558 */
1559 @Test
1560 void testNulls() throws Exception {
1561 assertThrows(NullPointerException.class,
1562 () -> StructuredTaskScope.open(null));
1563 assertThrows(NullPointerException.class,
1564 () -> StructuredTaskScope.open(null, cf -> cf));
1565 assertThrows(NullPointerException.class,
1566 () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
1567
1568 assertThrows(NullPointerException.class, () -> Joiner.allUntil(null));
1569
1570 // fork
1571 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1580 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null)));
1581 assertThrows(NullPointerException.class,
1582 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null)));
1583 assertThrows(NullPointerException.class,
1584 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null)));
1585
1586 // Joiner.onFork/onComplete
1587 assertThrows(NullPointerException.class,
1588 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null));
1589 assertThrows(NullPointerException.class,
1590 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null));
1591 assertThrows(NullPointerException.class,
1592 () -> Joiner.awaitAll().onFork(null));
1593 assertThrows(NullPointerException.class,
1594 () -> Joiner.awaitAll().onComplete(null));
1595 assertThrows(NullPointerException.class,
1596 () -> Joiner.allSuccessfulOrThrow().onFork(null));
1597 assertThrows(NullPointerException.class,
1598 () -> Joiner.allSuccessfulOrThrow().onComplete(null));
1599 assertThrows(NullPointerException.class,
1600 () -> Joiner.anySuccessfulResultOrThrow().onFork(null));
1601 assertThrows(NullPointerException.class,
1602 () -> Joiner.anySuccessfulResultOrThrow().onComplete(null));
1603 }
1604
1605 /**
1606 * ThreadFactory that counts usage.
1607 */
1608 private static class CountingThreadFactory implements ThreadFactory {
1609 final ThreadFactory delegate;
1610 final AtomicInteger threadCount = new AtomicInteger();
1611 CountingThreadFactory(ThreadFactory delegate) {
1612 this.delegate = delegate;
1613 }
1614 @Override
1615 public Thread newThread(Runnable task) {
1616 threadCount.incrementAndGet();
1617 return delegate.newThread(task);
1618 }
1619 int threadCount() {
1620 return threadCount.get();
1621 }
1622 }
1623
1624 /**
1625 * A joiner that counts that counts the number of subtasks that are forked and the
1626 * number of subtasks that complete.
1627 */
1628 private static class CountingJoiner<T> implements Joiner<T, Void> {
1629 final AtomicInteger onForkCount = new AtomicInteger();
1630 final AtomicInteger onCompleteCount = new AtomicInteger();
1631 @Override
1632 public boolean onFork(Subtask<? extends T> subtask) {
1633 onForkCount.incrementAndGet();
1634 return false;
1635 }
1636 @Override
1637 public boolean onComplete(Subtask<? extends T> subtask) {
1638 onCompleteCount.incrementAndGet();
1639 return false;
1640 }
1641 @Override
1642 public Void result() {
1643 return null;
1644 }
1645 int onForkCount() {
1646 return onForkCount.get();
1647 }
1648 int onCompleteCount() {
1649 return onCompleteCount.get();
1650 }
1651 }
1652
1653 /**
1654 * A joiner that cancels execution when a subtask completes. It also keeps a count
1655 * of the number of subtasks that are forked and the number of subtasks that complete.
1656 */
1657 private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> {
1658 final AtomicInteger onForkCount = new AtomicInteger();
1659 final AtomicInteger onCompleteCount = new AtomicInteger();
1660 @Override
1661 public boolean onFork(Subtask<? extends T> subtask) {
1662 onForkCount.incrementAndGet();
1663 return false;
1664 }
1665 @Override
1666 public boolean onComplete(Subtask<? extends T> subtask) {
1667 onCompleteCount.incrementAndGet();
1668 return true;
1669 }
1670 @Override
1671 public Void result() {
1672 return null;
1673 }
1674 int onForkCount() {
1675 return onForkCount.get();
1676 }
1677 int onCompleteCount() {
1678 return onCompleteCount.get();
1679 }
1680 }
1681
1682 /**
1683 * A runtime exception for tests.
1684 */
1685 private static class FooException extends RuntimeException {
1686 FooException() { }
1738 found = true;
1739 } else {
1740 Thread.sleep(20);
1741 }
1742 }
1743 target.interrupt();
1744 }
1745
1746 /**
1747 * Schedules the current thread to be interrupted when it waits (timed or untimed)
1748 * at the given location.
1749 */
1750 private void scheduleInterruptAt(String location) {
1751 Thread target = Thread.currentThread();
1752 scheduler.submit(() -> {
1753 interruptThreadAt(target, location);
1754 return null;
1755 });
1756 }
1757
1758 /**
1759 * Returns true if the given stack trace contains an element for the given class
1760 * and method name.
1761 */
1762 private boolean contains(StackTraceElement[] stack, String className, String methodName) {
1763 return Arrays.stream(stack)
1764 .anyMatch(e -> className.equals(e.getClassName())
1765 && methodName.equals(e.getMethodName()));
1766 }
1767 }
|
44 import java.util.concurrent.Callable;
45 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.Executors;
48 import java.util.concurrent.Future;
49 import java.util.concurrent.LinkedTransferQueue;
50 import java.util.concurrent.ThreadFactory;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.RejectedExecutionException;
53 import java.util.concurrent.ScheduledExecutorService;
54 import java.util.concurrent.StructuredTaskScope;
55 import java.util.concurrent.StructuredTaskScope.TimeoutException;
56 import java.util.concurrent.StructuredTaskScope.Configuration;
57 import java.util.concurrent.StructuredTaskScope.FailedException;
58 import java.util.concurrent.StructuredTaskScope.Joiner;
59 import java.util.concurrent.StructuredTaskScope.Subtask;
60 import java.util.concurrent.StructureViolationException;
61 import java.util.concurrent.atomic.AtomicBoolean;
62 import java.util.concurrent.atomic.AtomicInteger;
63 import java.util.concurrent.atomic.AtomicReference;
64 import java.util.function.Predicate;
65 import java.util.function.UnaryOperator;
66 import java.util.stream.Stream;
67 import static java.lang.Thread.State.*;
68
69 import org.junit.jupiter.api.Test;
70 import org.junit.jupiter.api.BeforeAll;
71 import org.junit.jupiter.api.AfterAll;
72 import org.junit.jupiter.params.ParameterizedTest;
73 import org.junit.jupiter.params.provider.MethodSource;
74 import static org.junit.jupiter.api.Assertions.*;
75
76 class StructuredTaskScopeTest {
77 private static ScheduledExecutorService scheduler;
78 private static List<ThreadFactory> threadFactories;
79
80 @BeforeAll
81 static void setup() throws Exception {
82 scheduler = Executors.newSingleThreadScheduledExecutor();
83
84 // thread factories
85 String value = System.getProperty("threadFactory");
192 future.get();
193 }
194
195 // subtask cannot fork
196 Subtask<Boolean> subtask = scope.fork(() -> {
197 assertThrows(WrongThreadException.class, () -> {
198 scope.fork(() -> null);
199 });
200 return true;
201 });
202 scope.join();
203 assertTrue(subtask.get());
204 }
205 }
206
207 /**
208 * Test fork after join, no subtasks forked before join.
209 */
210 @ParameterizedTest
211 @MethodSource("factories")
212 void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception {
213 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
214 cf -> cf.withThreadFactory(factory))) {
215 scope.join();
216 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
217 }
218 }
219
220 /**
221 * Test fork after join, subtasks forked before join.
222 */
223 @ParameterizedTest
224 @MethodSource("factories")
225 void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception {
226 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
227 cf -> cf.withThreadFactory(factory))) {
228 scope.fork(() -> "foo");
229 scope.join();
230 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
231 }
232 }
233
234 /**
235 * Test fork after join interrupted.
236 */
237 @ParameterizedTest
238 @MethodSource("factories")
239 void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception {
240 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
241 cf -> cf.withThreadFactory(factory))) {
242 var subtask1 = scope.fork(() -> {
243 Thread.sleep(Duration.ofDays(1));
244 return "foo";
245 });
246
247 // join throws
248 Thread.currentThread().interrupt();
249 assertThrows(InterruptedException.class, scope::join);
250
251 // fork should throw
252 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
253 }
254 }
255
256 /**
257 * Test fork after join timeout.
258 */
259 @ParameterizedTest
260 @MethodSource("factories")
261 void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception {
262 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
263 cf -> cf.withThreadFactory(factory)
264 .withTimeout(Duration.ofMillis(100)))) {
265 awaitCancelled(scope);
266
267 // join throws
268 assertThrows(TimeoutException.class, scope::join);
269
270 // fork should throw
271 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
272 }
273 }
274
275 /**
276 * Test fork after task scope is cancelled. This test uses a custom Joiner to
277 * cancel execution.
278 */
279 @ParameterizedTest
280 @MethodSource("factories")
281 void testForkAfterCancel2(ThreadFactory factory) throws Exception {
282 var countingThreadFactory = new CountingThreadFactory(factory);
283 var testJoiner = new CancelAfterOneJoiner<String>();
284
285 try (var scope = StructuredTaskScope.open(testJoiner,
286 cf -> cf.withThreadFactory(countingThreadFactory))) {
287
288 // fork subtask, the scope should be cancelled when the subtask completes
289 var subtask1 = scope.fork(() -> "foo");
290 awaitCancelled(scope);
291
292 assertEquals(1, countingThreadFactory.threadCount());
293 assertEquals(1, testJoiner.onForkCount());
294 assertEquals(1, testJoiner.onCompleteCount());
297 var subtask2 = scope.fork(() -> "bar");
298
299 // onFork should be invoked, newThread and onComplete should not be invoked
300 assertEquals(1, countingThreadFactory.threadCount());
301 assertEquals(2, testJoiner.onForkCount());
302 assertEquals(1, testJoiner.onCompleteCount());
303
304 scope.join();
305
306 assertEquals(1, countingThreadFactory.threadCount());
307 assertEquals(2, testJoiner.onForkCount());
308 assertEquals(1, testJoiner.onCompleteCount());
309 assertEquals("foo", subtask1.get());
310 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
311 }
312 }
313
314 /**
315 * Test fork after task scope is closed.
316 */
317 @ParameterizedTest
318 @MethodSource("factories")
319 void testForkAfterClose(ThreadFactory factory) {
320 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
321 cf -> cf.withThreadFactory(factory))) {
322 scope.close();
323 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
324 }
325 }
326
327 /**
328 * Test fork with a ThreadFactory that rejects creating a thread.
329 */
330 @Test
331 void testForkRejectedExecutionException() {
332 ThreadFactory factory = task -> null;
333 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
334 cf -> cf.withThreadFactory(factory))) {
335 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
336 }
337 }
338
339 /**
340 * Test join with no subtasks.
341 */
369 @Test
370 void testJoinAfterJoin1() throws Exception {
371 var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz"));
372 Joiner<Object, String> joiner = results::take;
373 try (var scope = StructuredTaskScope.open(joiner)) {
374 scope.fork(() -> "foo");
375 assertEquals("foo", scope.join());
376
377 // join already called
378 for (int i = 0 ; i < 3; i++) {
379 assertThrows(IllegalStateException.class, scope::join);
380 }
381 }
382 }
383
384 /**
385 * Test join after join completed with an exception.
386 */
387 @Test
388 void testJoinAfterJoin2() throws Exception {
389 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow())) {
390 scope.fork(() -> { throw new FooException(); });
391 Throwable ex = assertThrows(FailedException.class, scope::join);
392 assertTrue(ex.getCause() instanceof FooException);
393
394 // join already called
395 for (int i = 0 ; i < 3; i++) {
396 assertThrows(IllegalStateException.class, scope::join);
397 }
398 }
399 }
400
401 /**
402 * Test join after join interrupted.
403 */
404 @Test
405 void testJoinAfterJoinInterrupted() throws Exception {
406 try (var scope = StructuredTaskScope.open()) {
407 var latch = new CountDownLatch(1);
408 var subtask = scope.fork(() -> {
409 latch.await();
410 return "foo";
411 });
412
413 // join throws InterruptedException
414 Thread.currentThread().interrupt();
415 assertThrows(InterruptedException.class, scope::join);
416
417 latch.countDown();
418
419 // retry join to get result
420 scope.join();
421 assertEquals("foo", subtask.get());
422
423 // retry after otbaining result
424 assertThrows(IllegalStateException.class, scope::join);
425 }
426 }
427
428 /**
429 * Test join after join completed with a timeout.
430 */
431 @Test
432 void testJoinAfterJoinTimeout() throws Exception {
433 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(),
434 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
435 // wait for scope to be cancelled by timeout
436 awaitCancelled(scope);
437 assertThrows(TimeoutException.class, scope::join);
438
439 // join already called
440 for (int i = 0 ; i < 3; i++) {
441 assertThrows(IllegalStateException.class, scope::join);
442 }
443 }
444 }
445
446 /**
447 * Test join invoked from Joiner.onTimeout.
448 */
449 @Test
450 void testJoinInOnTimeout() throws Exception {
451 Thread owner = Thread.currentThread();
452 var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
453
454 var joiner = new Joiner<String, Void>() {
455 @Override
456 public void onTimeout() {
457 assertTrue(Thread.currentThread() == owner);
458 var scope = scopeRef.get();
459 assertThrows(IllegalStateException.class, scope::join);
460 }
461 @Override
462 public Void result() {
463 return null;
464 }
465 };
466
467 try (var scope = StructuredTaskScope.open(joiner,
468 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
469 awaitCancelled(scope);
470 scopeRef.set(scope);
471 scope.join(); // invokes onTimeout
472 }
473 }
474
475 /**
476 * Test join method is owner confined.
477 */
478 @ParameterizedTest
479 @MethodSource("factories")
480 void testJoinConfined(ThreadFactory factory) throws Exception {
481 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
482 cf -> cf.withThreadFactory(factory))) {
483
484 // random thread cannot join
485 try (var pool = Executors.newSingleThreadExecutor()) {
486 Future<Void> future = pool.submit(() -> {
487 assertThrows(WrongThreadException.class, scope::join);
488 return null;
489 });
490 future.get();
491 }
492
493 // subtask cannot join
494 Subtask<Boolean> subtask = scope.fork(() -> {
495 assertThrows(WrongThreadException.class, () -> { scope.join(); });
496 return true;
497 });
498 scope.join();
499 assertTrue(subtask.get());
500 }
501 }
502
503 /**
504 * Test join with interrupt status set.
505 */
506 @ParameterizedTest
507 @MethodSource("factories")
508 void testInterruptJoin1(ThreadFactory factory) throws Exception {
509 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
510 cf -> cf.withThreadFactory(factory))) {
511
512 Subtask<String> subtask = scope.fork(() -> {
513 Thread.sleep(Duration.ofDays(1));
514 return "foo";
515 });
516
517 // join should throw
518 Thread.currentThread().interrupt();
519 try {
520 scope.join();
521 fail("join did not throw");
522 } catch (InterruptedException expected) {
523 assertFalse(Thread.interrupted()); // interrupt status should be cleared
524 }
525 }
526 }
527
528 /**
529 * Test interrupt of thread blocked in join.
530 */
531 @ParameterizedTest
532 @MethodSource("factories")
533 void testInterruptJoin2(ThreadFactory factory) throws Exception {
534 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
535 cf -> cf.withThreadFactory(factory))) {
536 Subtask<String> subtask = scope.fork(() -> {
537 Thread.sleep(Duration.ofDays(1));
538 return "foo";
539 });
540
541 // interrupt main thread when it blocks in join
542 scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
543 try {
544 scope.join();
545 fail("join did not throw");
546 } catch (InterruptedException expected) {
547 assertFalse(Thread.interrupted()); // interrupt status should be clear
548 }
549 }
550 }
551
552 /**
553 * Test join when scope is cancelled.
554 */
555 @ParameterizedTest
556 @MethodSource("factories")
557 void testJoinWhenCancelled(ThreadFactory factory) throws Exception {
936
937 /**
938 * Test that isCancelled returns true after close.
939 */
940 @Test
941 void testIsCancelledAfterClose() throws Exception {
942 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
943 assertFalse(scope.isCancelled());
944 scope.close();
945 assertTrue(scope.isCancelled());
946 }
947 }
948
949 /**
950 * Test Joiner.onFork throwing exception.
951 */
952 @Test
953 void testOnForkThrows() throws Exception {
954 var joiner = new Joiner<String, Void>() {
955 @Override
956 public boolean onFork(Subtask<String> subtask) {
957 throw new FooException();
958 }
959 @Override
960 public Void result() {
961 return null;
962 }
963 };
964 try (var scope = StructuredTaskScope.open(joiner)) {
965 assertThrows(FooException.class, () -> scope.fork(() -> "foo"));
966 }
967 }
968
969 /**
970 * Test Joiner.onFork returning true to cancel execution.
971 */
972 @Test
973 void testOnForkCancelsExecution() throws Exception {
974 var joiner = new Joiner<String, Void>() {
975 @Override
976 public boolean onFork(Subtask<String> subtask) {
977 return true;
978 }
979 @Override
980 public Void result() {
981 return null;
982 }
983 };
984 try (var scope = StructuredTaskScope.open(joiner)) {
985 assertFalse(scope.isCancelled());
986 scope.fork(() -> "foo");
987 assertTrue(scope.isCancelled());
988 scope.join();
989 }
990 }
991
992 /**
993 * Test Joiner.onComplete throwing exception causes UHE to be invoked.
994 */
995 @Test
996 void testOnCompleteThrows() throws Exception {
997 var joiner = new Joiner<String, Void>() {
998 @Override
999 public boolean onComplete(Subtask<String> subtask) {
1000 throw new FooException();
1001 }
1002 @Override
1003 public Void result() {
1004 return null;
1005 }
1006 };
1007 var excRef = new AtomicReference<Throwable>();
1008 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1009 ThreadFactory factory = Thread.ofVirtual()
1010 .uncaughtExceptionHandler(uhe)
1011 .factory();
1012 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1013 scope.fork(() -> "foo");
1014 scope.join();
1015 assertInstanceOf(FooException.class, excRef.get());
1016 }
1017 }
1018
1019 /**
1020 * Test Joiner.onComplete returning true to cancel execution.
1021 */
1022 @Test
1023 void testOnCompleteCancelsExecution() throws Exception {
1024 var joiner = new Joiner<String, Void>() {
1025 @Override
1026 public boolean onComplete(Subtask<String> subtask) {
1027 return true;
1028 }
1029 @Override
1030 public Void result() {
1031 return null;
1032 }
1033 };
1034 try (var scope = StructuredTaskScope.open(joiner)) {
1035 assertFalse(scope.isCancelled());
1036 scope.fork(() -> "foo");
1037 awaitCancelled(scope);
1038 scope.join();
1039 }
1040 }
1041
1042 /**
1043 * Test Joiner.onTimeout invoked by owner thread when timeout expires.
1044 */
1045 @Test
1046 void testOnTimeoutInvoked() throws Exception {
1047 var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
1048 Thread owner = Thread.currentThread();
1049 var invokeCount = new AtomicInteger();
1050 var joiner = new Joiner<String, Void>() {
1051 @Override
1052 public void onTimeout() {
1053 assertTrue(Thread.currentThread() == owner);
1054 assertTrue(scopeRef.get().isCancelled());
1055 invokeCount.incrementAndGet();
1056 }
1057 @Override
1058 public Void result() {
1059 return null;
1060 }
1061 };
1062 try (var scope = StructuredTaskScope.open(joiner,
1063 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1064 scopeRef.set(scope);
1065 scope.fork(() -> {
1066 Thread.sleep(Duration.ofDays(1));
1067 return null;
1068 });
1069 scope.join();
1070 assertEquals(1, invokeCount.get());
1071 }
1072 }
1073
1074 /**
1075 * Test Joiner.onTimeout throwing an excepiton.
1076 */
1077 @Test
1078 void testOnTimeoutThrows() throws Exception {
1079 var joiner = new Joiner<String, Void>() {
1080 @Override
1081 public void onTimeout() {
1082 throw new FooException();
1083 }
1084 @Override
1085 public Void result() {
1086 return null;
1087 }
1088 };
1089 try (var scope = StructuredTaskScope.open(joiner,
1090 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1091 // wait for scope to be cancelled by timeout
1092 awaitCancelled(scope);
1093
1094 // join should throw FooException on first usage
1095 assertThrows(FooException.class, scope::join);
1096
1097 // retry after onTimeout fails
1098 assertThrows(IllegalStateException.class, scope::join);
1099 }
1100 }
1101
1102 /**
1103 * Test toString.
1104 */
1105 @Test
1106 void testToString() throws Exception {
1107 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1108 cf -> cf.withName("duke"))) {
1109
1110 // open
1111 assertTrue(scope.toString().contains("duke"));
1112
1113 // closed
1114 scope.close();
1115 assertTrue(scope.toString().contains("duke"));
1116 }
1117 }
1118
1119 /**
1120 * Test Subtask with task that completes successfully.
1121 */
1122 @ParameterizedTest
1123 @MethodSource("factories")
1124 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
1125 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1126 cf -> cf.withThreadFactory(factory))) {
1127 Subtask<String> subtask = scope.fork(() -> "foo");
1128
1129 // before join, owner thread
1130 assertThrows(IllegalStateException.class, subtask::get);
1131 assertThrows(IllegalStateException.class, subtask::exception);
1132
1133 // before join, another thread
1134 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1135 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1136
1137 scope.join();
1138
1139 assertEquals(Subtask.State.SUCCESS, subtask.state());
1140
1141 // after join, owner thread
1142 assertEquals("foo", subtask.get());
1143 assertThrows(IllegalStateException.class, subtask::exception);
1144
1145 // after join, another thread
1146 assertEquals("foo", callInOtherThread(subtask::get));
1147 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1148 }
1149 }
1150
1151 /**
1152 * Test Subtask with task that fails.
1153 */
1154 @ParameterizedTest
1155 @MethodSource("factories")
1156 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
1157 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1158 cf -> cf.withThreadFactory(factory))) {
1159
1160 Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
1161
1162 // before join, owner thread
1163 assertThrows(IllegalStateException.class, subtask::get);
1164 assertThrows(IllegalStateException.class, subtask::exception);
1165
1166 // before join, another thread
1167 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1168 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1169
1170 scope.join();
1171
1172 assertEquals(Subtask.State.FAILED, subtask.state());
1173
1174 // after join, owner thread
1175 assertThrows(IllegalStateException.class, subtask::get);
1176 assertTrue(subtask.exception() instanceof FooException);
1177
1178 // after join, another thread
1179 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1180 assertTrue(callInOtherThread(subtask::exception) instanceof FooException);
1181 }
1182 }
1183
1184 /**
1185 * Test Subtask with a task that has not completed.
1186 */
1187 @ParameterizedTest
1188 @MethodSource("factories")
1189 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
1190 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1191 cf -> cf.withThreadFactory(factory))) {
1192 Subtask<Void> subtask = scope.fork(() -> {
1193 Thread.sleep(Duration.ofDays(1));
1194 return null;
1195 });
1196 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1197
1198 // before join, owner thread
1199 assertThrows(IllegalStateException.class, subtask::get);
1200 assertThrows(IllegalStateException.class, subtask::exception);
1201
1202 // before join, another thread
1203 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1204 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1205
1206 // attempt join, join throws
1207 Thread.currentThread().interrupt();
1208 assertThrows(InterruptedException.class, scope::join);
1209
1210 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1211
1212 // after join, owner thread
1213 assertThrows(IllegalStateException.class, subtask::get);
1214 assertThrows(IllegalStateException.class, subtask::exception);
1215
1216 // before join, another thread
1217 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1218 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1219 }
1220 }
1221
1222 /**
1223 * Test Subtask forked after execution cancelled.
1224 */
1225 @ParameterizedTest
1226 @MethodSource("factories")
1227 void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception {
1228 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1229 scope.fork(() -> "foo");
1230 awaitCancelled(scope);
1231
1232 var subtask = scope.fork(() -> "foo");
1233
1234 // before join, owner thread
1235 assertThrows(IllegalStateException.class, subtask::get);
1236 assertThrows(IllegalStateException.class, subtask::exception);
1237
1238 // before join, another thread
1239 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1240 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1241
1242 scope.join();
1243
1244 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1245
1246 // after join, owner thread
1247 assertThrows(IllegalStateException.class, subtask::get);
1248 assertThrows(IllegalStateException.class, subtask::exception);
1249
1250 // before join, another thread
1251 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1252 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1253 }
1254 }
1255
1256 /**
1257 * Test Subtask::toString.
1258 */
1259 @Test
1260 void testSubtaskToString() throws Exception {
1261 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1262 var latch = new CountDownLatch(1);
1263 var subtask1 = scope.fork(() -> {
1264 latch.await();
1265 return "foo";
1266 });
1267 var subtask2 = scope.fork(() -> { throw new FooException(); });
1268
1269 // subtask1 result is unavailable
1270 assertTrue(subtask1.toString().contains("Unavailable"));
1271 latch.countDown();
1272
1273 scope.join();
1274
1275 assertTrue(subtask1.toString().contains("Completed successfully"));
1276 assertTrue(subtask2.toString().contains("Failed"));
1277 }
1278 }
1279
1280 /**
1281 * Test Joiner.allSuccessfulOrThrow() with no subtasks.
1282 */
1283 @Test
1284 void testAllSuccessfulOrThrow1() throws Throwable {
1285 try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
1286 var results = scope.join();
1287 assertTrue(results.isEmpty());
1288 }
1289 }
1290
1291 /**
1292 * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully.
1293 */
1294 @ParameterizedTest
1295 @MethodSource("factories")
1296 void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1297 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1298 cf -> cf.withThreadFactory(factory))) {
1299 scope.fork(() -> "foo");
1300 scope.fork(() -> "bar");
1301 var results = scope.join();
1302 assertEquals(List.of("foo", "bar"), results);
1303 }
1304 }
1305
1306 /**
1307 * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
1308 * a subtask that fails.
1309 */
1310 @ParameterizedTest
1311 @MethodSource("factories")
1312 void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1313 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1314 cf -> cf.withThreadFactory(factory))) {
1315 scope.fork(() -> "foo");
1316 scope.fork(() -> { throw new FooException(); });
1317 try {
1318 scope.join();
1319 } catch (FailedException e) {
1320 assertTrue(e.getCause() instanceof FooException);
1321 }
1322 }
1323 }
1324
1325 /**
1326 * Test Joiner.allSuccessfulOrThrow() with a timeout.
1327 */
1328 @Test
1329 void testAllSuccessfulOrThrow4() throws Exception {
1330 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1331 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1332 scope.fork(() -> "foo");
1333 scope.fork(() -> {
1334 Thread.sleep(Duration.ofDays(1));
1335 return "bar";
1336 });
1337 assertThrows(TimeoutException.class, scope::join);
1338
1339 // retry after join throws TimeoutException
1340 assertThrows(IllegalStateException.class, scope::join);
1341 }
1342 }
1343
1344 /**
1345 * Test Joiner.allSuccessfulOrThrow() yields an unmodifiable list.
1346 */
1347 @Test
1348 void testAllSuccessfulOrThrow5() throws Exception {
1349 // empty list
1350 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow())) {
1351 var results = scope.join();
1352 assertEquals(0, results.size());
1353 assertThrows(UnsupportedOperationException.class, () -> results.add("foo"));
1354 }
1355
1356 // non-empty list
1357 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow())) {
1358 scope.fork(() -> "foo");
1359 var results = scope.join();
1360 assertEquals(1, results.size());
1361 assertThrows(UnsupportedOperationException.class, () -> results.add("foo"));
1362 assertThrows(UnsupportedOperationException.class, () -> results.add("bar"));
1363 }
1364 }
1365
1366 /**
1367 * Test Joiner.anySuccessfulOrThrow() with no subtasks.
1368 */
1369 @Test
1370 void testAnySuccessfulOrThrow1() throws Exception {
1371 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow())) {
1372 try {
1373 scope.join();
1374 } catch (FailedException e) {
1375 assertTrue(e.getCause() instanceof NoSuchElementException);
1376 }
1377 }
1378 }
1379
1380 /**
1381 * Test Joiner.anySuccessfulOrThrow() with a subtask that completes successfully.
1382 */
1383 @ParameterizedTest
1384 @MethodSource("factories")
1385 void testAnySuccessfulOrThrow2(ThreadFactory factory) throws Exception {
1386 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
1387 cf -> cf.withThreadFactory(factory))) {
1388 scope.fork(() -> "foo");
1389 String result = scope.join();
1390 assertEquals("foo", result);
1391 }
1392 }
1393
1394 /**
1395 * Test Joiner.anySuccessfulOrThrow() with a subtask that completes successfully
1396 * with a null result.
1397 */
1398 @ParameterizedTest
1399 @MethodSource("factories")
1400 void testAnySuccessfulOrThrow3(ThreadFactory factory) throws Exception {
1401 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
1402 cf -> cf.withThreadFactory(factory))) {
1403 scope.fork(() -> null);
1404 String result = scope.join();
1405 assertNull(result);
1406 }
1407 }
1408
1409 /**
1410 * Test Joiner.anySuccessfulOrThrow() with a subtask that complete succcessfully
1411 * and a subtask that fails.
1412 */
1413 @ParameterizedTest
1414 @MethodSource("factories")
1415 void testAnySuccessfulOrThrow4(ThreadFactory factory) throws Exception {
1416 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
1417 cf -> cf.withThreadFactory(factory))) {
1418 scope.fork(() -> "foo");
1419 scope.fork(() -> { throw new FooException(); });
1420 String first = scope.join();
1421 assertEquals("foo", first);
1422 }
1423 }
1424
1425 /**
1426 * Test Joiner.anySuccessfulOrThrow() with a subtask that fails.
1427 */
1428 @ParameterizedTest
1429 @MethodSource("factories")
1430 void testAnySuccessfulOrThrow5(ThreadFactory factory) throws Exception {
1431 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(),
1432 cf -> cf.withThreadFactory(factory))) {
1433 scope.fork(() -> { throw new FooException(); });
1434 Throwable ex = assertThrows(FailedException.class, scope::join);
1435 assertTrue(ex.getCause() instanceof FooException);
1436 }
1437 }
1438
1439 /**
1440 * Test Joiner.anySuccessfulOrThrow() with a timeout.
1441 */
1442 @Test
1443 void anySuccessfulOrThrow6() throws Exception {
1444 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
1445 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1446 scope.fork(() -> { throw new FooException(); });
1447 scope.fork(() -> {
1448 Thread.sleep(Duration.ofDays(1));
1449 return "bar";
1450 });
1451 assertThrows(TimeoutException.class, scope::join);
1452
1453 // retry after join throws TimeoutException
1454 assertThrows(IllegalStateException.class, scope::join);
1455 }
1456 }
1457
1458 /**
1459 * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
1460 */
1461 @Test
1462 void testAwaitSuccessfulOrThrow1() throws Throwable {
1463 try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
1464 var result = scope.join();
1465 assertNull(result);
1466 }
1467 }
1468
1469 /**
1470 * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
1471 */
1472 @ParameterizedTest
1473 @MethodSource("factories")
1474 void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1475 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1476 cf -> cf.withThreadFactory(factory))) {
1477 var subtask1 = scope.fork(() -> "foo");
1485
1486 /**
1487 * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
1488 * a subtask that fails.
1489 */
1490 @ParameterizedTest
1491 @MethodSource("factories")
1492 void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1493 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1494 cf -> cf.withThreadFactory(factory))) {
1495 scope.fork(() -> "foo");
1496 scope.fork(() -> { throw new FooException(); });
1497 try {
1498 scope.join();
1499 } catch (FailedException e) {
1500 assertTrue(e.getCause() instanceof FooException);
1501 }
1502 }
1503 }
1504
1505 /**
1506 * Test Joiner.awaitAllSuccessfulOrThrow() with a timeout.
1507 */
1508 @Test
1509 void testAwaitSuccessfulOrThrow4() throws Exception {
1510 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1511 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1512 scope.fork(() -> "foo");
1513 scope.fork(() -> {
1514 Thread.sleep(Duration.ofDays(1));
1515 return "bar";
1516 });
1517 assertThrows(TimeoutException.class, scope::join);
1518
1519 // retry after join throws TimeoutException
1520 assertThrows(IllegalStateException.class, scope::join);
1521 }
1522 }
1523
1524 /**
1525 * Test Joiner.awaitAll() with no subtasks.
1526 */
1527 @Test
1528 void testAwaitAll1() throws Throwable {
1529 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1530 var result = scope.join();
1531 assertNull(result);
1532 }
1533 }
1534
1535 /**
1536 * Test Joiner.awaitAll() with subtasks that complete successfully.
1537 */
1538 @ParameterizedTest
1539 @MethodSource("factories")
1540 void testAwaitAll2(ThreadFactory factory) throws Throwable {
1541 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1542 cf -> cf.withThreadFactory(factory))) {
1543 var subtask1 = scope.fork(() -> "foo");
1550 }
1551
1552 /**
1553 * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
1554 * that fails.
1555 */
1556 @ParameterizedTest
1557 @MethodSource("factories")
1558 void testAwaitAll3(ThreadFactory factory) throws Throwable {
1559 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1560 cf -> cf.withThreadFactory(factory))) {
1561 var subtask1 = scope.fork(() -> "foo");
1562 var subtask2 = scope.fork(() -> { throw new FooException(); });
1563 var result = scope.join();
1564 assertNull(result);
1565 assertEquals("foo", subtask1.get());
1566 assertTrue(subtask2.exception() instanceof FooException);
1567 }
1568 }
1569
1570 /**
1571 * Test Joiner.awaitAll() with a timeout.
1572 */
1573 @Test
1574 void testAwaitAll4() throws Exception {
1575 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1576 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1577 scope.fork(() -> "foo");
1578 scope.fork(() -> {
1579 Thread.sleep(Duration.ofDays(1));
1580 return "bar";
1581 });
1582 assertThrows(TimeoutException.class, scope::join);
1583
1584 // retry after join throws TimeoutException
1585 assertThrows(IllegalStateException.class, scope::join);
1586 }
1587 }
1588
1589 /**
1590 * Test Joiner.allUntil(Predicate) with no subtasks.
1591 */
1592 @Test
1593 void testAllUntil1() throws Throwable {
1594 try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
1595 var subtasks = scope.join();
1596 assertEquals(0, subtasks.size());
1597 }
1598 }
1599
1600 /**
1601 * Test Joiner.allUntil(Predicate) with no cancellation.
1602 */
1603 @ParameterizedTest
1604 @MethodSource("factories")
1605 void testAllUntil2(ThreadFactory factory) throws Exception {
1606 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1607 cf -> cf.withThreadFactory(factory))) {
1608
1609 var subtask1 = scope.fork(() -> "foo");
1610 var subtask2 = scope.fork(() -> { throw new FooException(); });
1611
1612 var subtasks = scope.join();
1613 assertEquals(List.of(subtask1, subtask2), subtasks);
1614
1615 assertEquals("foo", subtask1.get());
1616 assertTrue(subtask2.exception() instanceof FooException);
1617 }
1618 }
1619
1620 /**
1621 * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes.
1622 */
1623 @ParameterizedTest
1624 @MethodSource("factories")
1625 void testAllUntil3(ThreadFactory factory) throws Exception {
1626 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true),
1627 cf -> cf.withThreadFactory(factory))) {
1628
1629 var subtask1 = scope.fork(() -> "foo");
1630 var subtask2 = scope.fork(() -> {
1631 Thread.sleep(Duration.ofDays(1));
1632 return "bar";
1633 });
1634
1635 var subtasks = scope.join();
1636 assertEquals(List.of(subtask1, subtask2), subtasks);
1637
1638 assertEquals("foo", subtask1.get());
1639 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1640 }
1641 }
1642
1643 /**
1644 * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete.
1645 */
1646 @ParameterizedTest
1647 @MethodSource("factories")
1648 void testAllUntil4(ThreadFactory factory) throws Exception {
1649
1650 // cancel execution after two or more failures
1651 class CancelAfterTwoFailures<T> implements Predicate<Subtask<T>> {
1652 final AtomicInteger failedCount = new AtomicInteger();
1653 @Override
1654 public boolean test(Subtask<T> subtask) {
1655 return subtask.state() == Subtask.State.FAILED
1656 && failedCount.incrementAndGet() >= 2;
1657 }
1658 }
1659 var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
1660
1661 try (var scope = StructuredTaskScope.open(joiner)) {
1662 int forkCount = 0;
1663
1664 // fork subtasks until execution cancelled
1665 while (!scope.isCancelled()) {
1666 scope.fork(() -> "foo");
1667 scope.fork(() -> { throw new FooException(); });
1668 forkCount += 2;
1669 Thread.sleep(Duration.ofMillis(20));
1670 }
1671
1672 var subtasks = scope.join();
1673 assertEquals(forkCount, subtasks.size());
1674
1675 long failedCount = subtasks.stream()
1676 .filter(s -> s.state() == Subtask.State.FAILED)
1677 .count();
1678 assertTrue(failedCount >= 2);
1679 }
1680 }
1681
1682 /**
1683 * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws.
1684 */
1685 @Test
1686 void testAllUntil5() throws Exception {
1687 var joiner = Joiner.allUntil(_ -> { throw new FooException(); });
1688 var excRef = new AtomicReference<Throwable>();
1689 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1690 ThreadFactory factory = Thread.ofVirtual()
1691 .uncaughtExceptionHandler(uhe)
1692 .factory();
1693 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1694 scope.fork(() -> "foo");
1695 scope.join();
1696 assertInstanceOf(FooException.class, excRef.get());
1697 }
1698 }
1699
1700 /**
1701 * Test Joiner.allUntil(Predicate) with a timeout.
1702 */
1703 @Test
1704 void testAllUntil6() throws Exception {
1705 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1706 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1707 var subtask1 = scope.fork(() -> "foo");
1708 var subtask2 = scope.fork(() -> {
1709 Thread.sleep(Duration.ofDays(1));
1710 return "bar";
1711 });
1712
1713 // TimeoutException should not be thrown
1714 var subtasks = scope.join();
1715
1716 // stream should have two elements, subtask1 may or may not have completed
1717 assertEquals(List.of(subtask1, subtask2), subtasks);
1718 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1719
1720 // retry after join throws TimeoutException
1721 assertThrows(IllegalStateException.class, scope::join);
1722 }
1723 }
1724
1725 /**
1726 * Test Joiner.allUntil(Predicate) yields an unmodifiable list.
1727 */
1728 @Test
1729 void testAllUntil7() throws Exception {
1730 Subtask<String> subtask1;
1731 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
1732 subtask1 = scope.fork(() -> "?");
1733 scope.join();
1734 }
1735
1736 // empty list
1737 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
1738 var subtasks = scope.join();
1739 assertEquals(0, subtasks.size());
1740 assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask1));
1741 }
1742
1743 // non-empty list
1744 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
1745 var subtask2 = scope.fork(() -> "foo");
1746 var subtasks = scope.join();
1747 assertEquals(1, subtasks.size());
1748 assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask1));
1749 assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask2));
1750 }
1751 }
1752
1753 /**
1754 * Test Joiner default methods.
1755 */
1756 @Test
1757 void testJoinerDefaultMethods() throws Exception {
1758 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1759
1760 // need subtasks to test default methods
1761 var subtask1 = scope.fork(() -> "foo");
1762 awaitCancelled(scope);
1763 var subtask2 = scope.fork(() -> "bar");
1764 scope.join();
1765
1766 assertEquals(Subtask.State.SUCCESS, subtask1.state());
1767 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1768
1769 // Joiner that does not override default methods
1770 Joiner<String, Void> joiner = () -> null;
1771 assertThrows(NullPointerException.class, () -> joiner.onFork(null));
1772 assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
1773 assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
1774 assertFalse(joiner.onFork(subtask2));
1775 assertFalse(joiner.onComplete(subtask1));
1776 assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
1777 assertThrows(TimeoutException.class, joiner::onTimeout);
1778 }
1779 }
1780
1781 /**
1782 * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
1783 */
1784 @Test
1785 void testJoinersWithUnavailableResult() throws Exception {
1786 try (var scope = StructuredTaskScope.open()) {
1787 var done = new CountDownLatch(1);
1788 var subtask = scope.fork(() -> {
1789 done.await();
1790 return null;
1791 });
1792
1793 // onComplete with uncompleted task should throw IAE
1794 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1795 assertThrows(IllegalArgumentException.class,
1796 () -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
1797 assertThrows(IllegalArgumentException.class,
1798 () -> Joiner.anySuccessfulOrThrow().onComplete(subtask));
1799 assertThrows(IllegalArgumentException.class,
1800 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask));
1801 assertThrows(IllegalArgumentException.class,
1802 () -> Joiner.awaitAll().onComplete(subtask));
1803 assertThrows(IllegalArgumentException.class,
1804 () -> Joiner.allUntil(_ -> false).onComplete(subtask));
1805
1806 done.countDown();
1807 scope.join();
1808
1809 // onFork with completed task should throw IAE
1810 assertEquals(Subtask.State.SUCCESS, subtask.state());
1811 assertThrows(IllegalArgumentException.class,
1812 () -> Joiner.allSuccessfulOrThrow().onFork(subtask));
1813 assertThrows(IllegalArgumentException.class,
1814 () -> Joiner.anySuccessfulOrThrow().onFork(subtask));
1815 assertThrows(IllegalArgumentException.class,
1816 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask));
1817 assertThrows(IllegalArgumentException.class,
1818 () -> Joiner.awaitAll().onFork(subtask));
1819 assertThrows(IllegalArgumentException.class,
1820 () -> Joiner.allUntil(_ -> false).onFork(subtask));
1821 }
1822
1823 }
1824
1825 /**
1826 * Test the Configuration function apply method throwing an exception.
1827 */
1828 @Test
1829 void testConfigFunctionThrows() throws Exception {
1830 assertThrows(FooException.class,
1831 () -> StructuredTaskScope.open(Joiner.awaitAll(),
1832 cf -> { throw new FooException(); }));
1833 }
1834
1835 /**
1836 * Test Configuration equals/hashCode/toString
1837 */
1838 @Test
1839 void testConfigMethods() throws Exception {
1840 UnaryOperator<Configuration> configOperator = cf -> {
1841 var name = "duke";
1842 var threadFactory = Thread.ofPlatform().factory();
1843 var timeout = Duration.ofSeconds(10);
1844
1845 assertEquals(cf, cf);
1846 assertEquals(cf.withName(name), cf.withName(name));
1847 assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1848 assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1849
1850 assertNotEquals(cf, cf.withName(name));
1851 assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1852 assertNotEquals(cf, cf.withTimeout(timeout));
1853
1854 assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1855 assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1856 cf.withThreadFactory(threadFactory).hashCode());
1857 assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1858
1859 assertTrue(cf.withName(name).toString().contains(name));
1860 assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1861 assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1862
1863 return cf;
1864 };
1865 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) {
1866 // do nothing
1867 }
1868 }
1869
1870 /**
1871 * Test for NullPointerException.
1872 */
1873 @Test
1874 void testNulls() throws Exception {
1875 assertThrows(NullPointerException.class,
1876 () -> StructuredTaskScope.open(null));
1877 assertThrows(NullPointerException.class,
1878 () -> StructuredTaskScope.open(null, cf -> cf));
1879 assertThrows(NullPointerException.class,
1880 () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
1881
1882 assertThrows(NullPointerException.class, () -> Joiner.allUntil(null));
1883
1884 // fork
1885 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1894 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null)));
1895 assertThrows(NullPointerException.class,
1896 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null)));
1897 assertThrows(NullPointerException.class,
1898 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null)));
1899
1900 // Joiner.onFork/onComplete
1901 assertThrows(NullPointerException.class,
1902 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null));
1903 assertThrows(NullPointerException.class,
1904 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null));
1905 assertThrows(NullPointerException.class,
1906 () -> Joiner.awaitAll().onFork(null));
1907 assertThrows(NullPointerException.class,
1908 () -> Joiner.awaitAll().onComplete(null));
1909 assertThrows(NullPointerException.class,
1910 () -> Joiner.allSuccessfulOrThrow().onFork(null));
1911 assertThrows(NullPointerException.class,
1912 () -> Joiner.allSuccessfulOrThrow().onComplete(null));
1913 assertThrows(NullPointerException.class,
1914 () -> Joiner.anySuccessfulOrThrow().onFork(null));
1915 assertThrows(NullPointerException.class,
1916 () -> Joiner.anySuccessfulOrThrow().onComplete(null));
1917 }
1918
1919 /**
1920 * ThreadFactory that counts usage.
1921 */
1922 private static class CountingThreadFactory implements ThreadFactory {
1923 final ThreadFactory delegate;
1924 final AtomicInteger threadCount = new AtomicInteger();
1925 CountingThreadFactory(ThreadFactory delegate) {
1926 this.delegate = delegate;
1927 }
1928 @Override
1929 public Thread newThread(Runnable task) {
1930 threadCount.incrementAndGet();
1931 return delegate.newThread(task);
1932 }
1933 int threadCount() {
1934 return threadCount.get();
1935 }
1936 }
1937
1938 /**
1939 * A joiner that counts that counts the number of subtasks that are forked and the
1940 * number of subtasks that complete.
1941 */
1942 private static class CountingJoiner<T> implements Joiner<T, Void> {
1943 final AtomicInteger onForkCount = new AtomicInteger();
1944 final AtomicInteger onCompleteCount = new AtomicInteger();
1945 @Override
1946 public boolean onFork(Subtask<T> subtask) {
1947 onForkCount.incrementAndGet();
1948 return false;
1949 }
1950 @Override
1951 public boolean onComplete(Subtask<T> subtask) {
1952 onCompleteCount.incrementAndGet();
1953 return false;
1954 }
1955 @Override
1956 public Void result() {
1957 return null;
1958 }
1959 int onForkCount() {
1960 return onForkCount.get();
1961 }
1962 int onCompleteCount() {
1963 return onCompleteCount.get();
1964 }
1965 }
1966
1967 /**
1968 * A joiner that cancels execution when a subtask completes. It also keeps a count
1969 * of the number of subtasks that are forked and the number of subtasks that complete.
1970 */
1971 private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> {
1972 final AtomicInteger onForkCount = new AtomicInteger();
1973 final AtomicInteger onCompleteCount = new AtomicInteger();
1974 @Override
1975 public boolean onFork(Subtask<T> subtask) {
1976 onForkCount.incrementAndGet();
1977 return false;
1978 }
1979 @Override
1980 public boolean onComplete(Subtask<T> subtask) {
1981 onCompleteCount.incrementAndGet();
1982 return true;
1983 }
1984 @Override
1985 public Void result() {
1986 return null;
1987 }
1988 int onForkCount() {
1989 return onForkCount.get();
1990 }
1991 int onCompleteCount() {
1992 return onCompleteCount.get();
1993 }
1994 }
1995
1996 /**
1997 * A runtime exception for tests.
1998 */
1999 private static class FooException extends RuntimeException {
2000 FooException() { }
2052 found = true;
2053 } else {
2054 Thread.sleep(20);
2055 }
2056 }
2057 target.interrupt();
2058 }
2059
2060 /**
2061 * Schedules the current thread to be interrupted when it waits (timed or untimed)
2062 * at the given location.
2063 */
2064 private void scheduleInterruptAt(String location) {
2065 Thread target = Thread.currentThread();
2066 scheduler.submit(() -> {
2067 interruptThreadAt(target, location);
2068 return null;
2069 });
2070 }
2071
2072 /**
2073 * Calls a result returning task from another thread.
2074 */
2075 private <V> V callInOtherThread(Callable<V> task) throws Exception {
2076 var result = new AtomicReference<V>();
2077 var exc = new AtomicReference<Exception>();
2078 Thread thread = Thread.ofVirtual().start(() -> {
2079 try {
2080 result.set(task.call());
2081 } catch (Exception e) {
2082 exc.set(e);
2083 }
2084 });
2085 boolean interrupted = false;
2086 boolean terminated = false;
2087 while (!terminated) {
2088 try {
2089 thread.join();
2090 terminated = true;
2091 } catch (InterruptedException e) {
2092 interrupted = true;
2093 }
2094 }
2095 if (interrupted) {
2096 Thread.currentThread().interrupt();
2097 }
2098 Exception e = exc.get();
2099 if (e != null) {
2100 throw e;
2101 } else {
2102 return result.get();
2103 }
2104 }
2105
2106 /**
2107 * Returns true if the given stack trace contains an element for the given class
2108 * and method name.
2109 */
2110 private boolean contains(StackTraceElement[] stack, String className, String methodName) {
2111 return Arrays.stream(stack)
2112 .anyMatch(e -> className.equals(e.getClassName())
2113 && methodName.equals(e.getMethodName()));
2114 }
2115 }
|