44 import java.util.concurrent.Callable;
45 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.Executors;
48 import java.util.concurrent.Future;
49 import java.util.concurrent.LinkedTransferQueue;
50 import java.util.concurrent.ThreadFactory;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.RejectedExecutionException;
53 import java.util.concurrent.ScheduledExecutorService;
54 import java.util.concurrent.StructuredTaskScope;
55 import java.util.concurrent.StructuredTaskScope.TimeoutException;
56 import java.util.concurrent.StructuredTaskScope.Configuration;
57 import java.util.concurrent.StructuredTaskScope.FailedException;
58 import java.util.concurrent.StructuredTaskScope.Joiner;
59 import java.util.concurrent.StructuredTaskScope.Subtask;
60 import java.util.concurrent.StructureViolationException;
61 import java.util.concurrent.atomic.AtomicBoolean;
62 import java.util.concurrent.atomic.AtomicInteger;
63 import java.util.concurrent.atomic.AtomicReference;
64 import java.util.function.Function;
65 import java.util.function.Predicate;
66 import java.util.stream.Stream;
67 import static java.lang.Thread.State.*;
68
69 import org.junit.jupiter.api.Test;
70 import org.junit.jupiter.api.BeforeAll;
71 import org.junit.jupiter.api.AfterAll;
72 import org.junit.jupiter.params.ParameterizedTest;
73 import org.junit.jupiter.params.provider.MethodSource;
74 import static org.junit.jupiter.api.Assertions.*;
75
76 class StructuredTaskScopeTest {
77 private static ScheduledExecutorService scheduler;
78 private static List<ThreadFactory> threadFactories;
79
80 @BeforeAll
81 static void setup() throws Exception {
82 scheduler = Executors.newSingleThreadScheduledExecutor();
83
84 // thread factories
85 String value = System.getProperty("threadFactory");
192 future.get();
193 }
194
195 // subtask cannot fork
196 Subtask<Boolean> subtask = scope.fork(() -> {
197 assertThrows(WrongThreadException.class, () -> {
198 scope.fork(() -> null);
199 });
200 return true;
201 });
202 scope.join();
203 assertTrue(subtask.get());
204 }
205 }
206
207 /**
208 * Test fork after join, no subtasks forked before join.
209 */
210 @ParameterizedTest
211 @MethodSource("factories")
212 void testForkAfterJoin1(ThreadFactory factory) throws Exception {
213 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
214 cf -> cf.withThreadFactory(factory))) {
215 scope.join();
216 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
217 }
218 }
219
220 /**
221 * Test fork after join, subtasks forked before join.
222 */
223 @ParameterizedTest
224 @MethodSource("factories")
225 void testForkAfterJoin2(ThreadFactory factory) throws Exception {
226 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
227 cf -> cf.withThreadFactory(factory))) {
228 scope.fork(() -> "foo");
229 scope.join();
230 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
231 }
232 }
233
234 /**
235 * Test fork after join throws.
236 */
237 @ParameterizedTest
238 @MethodSource("factories")
239 void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
240 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
241 cf -> cf.withThreadFactory(factory))) {
242 var latch = new CountDownLatch(1);
243 var subtask1 = scope.fork(() -> {
244 latch.await();
245 return "foo";
246 });
247
248 // join throws
249 Thread.currentThread().interrupt();
250 assertThrows(InterruptedException.class, scope::join);
251
252 // fork should throw
253 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
254 }
255 }
256
257 /**
258 * Test fork after task scope is cancelled. This test uses a custom Joiner to
259 * cancel execution.
260 */
261 @ParameterizedTest
262 @MethodSource("factories")
263 void testForkAfterCancel2(ThreadFactory factory) throws Exception {
264 var countingThreadFactory = new CountingThreadFactory(factory);
265 var testJoiner = new CancelAfterOneJoiner<String>();
266
267 try (var scope = StructuredTaskScope.open(testJoiner,
268 cf -> cf.withThreadFactory(countingThreadFactory))) {
269
270 // fork subtask, the scope should be cancelled when the subtask completes
271 var subtask1 = scope.fork(() -> "foo");
272 awaitCancelled(scope);
273
274 assertEquals(1, countingThreadFactory.threadCount());
275 assertEquals(1, testJoiner.onForkCount());
276 assertEquals(1, testJoiner.onCompleteCount());
279 var subtask2 = scope.fork(() -> "bar");
280
281 // onFork should be invoked, newThread and onComplete should not be invoked
282 assertEquals(1, countingThreadFactory.threadCount());
283 assertEquals(2, testJoiner.onForkCount());
284 assertEquals(1, testJoiner.onCompleteCount());
285
286 scope.join();
287
288 assertEquals(1, countingThreadFactory.threadCount());
289 assertEquals(2, testJoiner.onForkCount());
290 assertEquals(1, testJoiner.onCompleteCount());
291 assertEquals("foo", subtask1.get());
292 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
293 }
294 }
295
296 /**
297 * Test fork after task scope is closed.
298 */
299 @Test
300 void testForkAfterClose() {
301 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
302 scope.close();
303 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
304 }
305 }
306
307 /**
308 * Test fork with a ThreadFactory that rejects creating a thread.
309 */
310 @Test
311 void testForkRejectedExecutionException() {
312 ThreadFactory factory = task -> null;
313 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
314 cf -> cf.withThreadFactory(factory))) {
315 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
316 }
317 }
318
319 /**
320 * Test join with no subtasks.
321 */
365 * Test join after join completed with an exception.
366 */
367 @Test
368 void testJoinAfterJoin2() throws Exception {
369 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
370 scope.fork(() -> { throw new FooException(); });
371 Throwable ex = assertThrows(FailedException.class, scope::join);
372 assertTrue(ex.getCause() instanceof FooException);
373
374 // join already called
375 for (int i = 0 ; i < 3; i++) {
376 assertThrows(IllegalStateException.class, scope::join);
377 }
378 }
379 }
380
381 /**
382 * Test join after join completed with a timeout.
383 */
384 @Test
385 void testJoinAfterJoin3() throws Exception {
386 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
387 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
388 // wait for scope to be cancelled by timeout
389 awaitCancelled(scope);
390 assertThrows(TimeoutException.class, scope::join);
391
392 // join already called
393 for (int i = 0 ; i < 3; i++) {
394 assertThrows(IllegalStateException.class, scope::join);
395 }
396 }
397 }
398
399 /**
400 * Test join method is owner confined.
401 */
402 @ParameterizedTest
403 @MethodSource("factories")
404 void testJoinConfined(ThreadFactory factory) throws Exception {
405 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
406 cf -> cf.withThreadFactory(factory))) {
407
408 // random thread cannot join
409 try (var pool = Executors.newSingleThreadExecutor()) {
410 Future<Void> future = pool.submit(() -> {
411 assertThrows(WrongThreadException.class, scope::join);
412 return null;
413 });
414 future.get();
415 }
416
417 // subtask cannot join
418 Subtask<Boolean> subtask = scope.fork(() -> {
419 assertThrows(WrongThreadException.class, () -> { scope.join(); });
420 return true;
421 });
422 scope.join();
423 assertTrue(subtask.get());
424 }
425 }
426
427 /**
428 * Test join with interrupt status set.
429 */
430 @ParameterizedTest
431 @MethodSource("factories")
432 void testInterruptJoin1(ThreadFactory factory) throws Exception {
433 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
434 cf -> cf.withThreadFactory(factory))) {
435
436 Subtask<String> subtask = scope.fork(() -> {
437 Thread.sleep(60_000);
438 return "foo";
439 });
440
441 // join should throw
442 Thread.currentThread().interrupt();
443 try {
444 scope.join();
445 fail("join did not throw");
446 } catch (InterruptedException expected) {
447 assertFalse(Thread.interrupted()); // interrupt status should be cleared
448 }
449 }
450 }
451
452 /**
453 * Test interrupt of thread blocked in join.
454 */
455 @ParameterizedTest
456 @MethodSource("factories")
457 void testInterruptJoin2(ThreadFactory factory) throws Exception {
458 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
459 cf -> cf.withThreadFactory(factory))) {
460
461 var latch = new CountDownLatch(1);
462 Subtask<String> subtask = scope.fork(() -> {
463 Thread.sleep(60_000);
464 return "foo";
465 });
466
467 // interrupt main thread when it blocks in join
468 scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
469 try {
470 scope.join();
471 fail("join did not throw");
472 } catch (InterruptedException expected) {
473 assertFalse(Thread.interrupted()); // interrupt status should be clear
474 }
475 }
476 }
477
478 /**
479 * Test join when scope is cancelled.
480 */
481 @ParameterizedTest
482 @MethodSource("factories")
483 void testJoinWhenCancelled(ThreadFactory factory) throws Exception {
948 @Test
949 void testOnCompleteCancelsExecution() throws Exception {
950 var joiner = new Joiner<String, Void>() {
951 @Override
952 public boolean onComplete(Subtask<? extends String> subtask) {
953 return true;
954 }
955 @Override
956 public Void result() {
957 return null;
958 }
959 };
960 try (var scope = StructuredTaskScope.open(joiner)) {
961 assertFalse(scope.isCancelled());
962 scope.fork(() -> "foo");
963 awaitCancelled(scope);
964 scope.join();
965 }
966 }
967
968 /**
969 * Test toString.
970 */
971 @Test
972 void testToString() throws Exception {
973 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
974 cf -> cf.withName("duke"))) {
975
976 // open
977 assertTrue(scope.toString().contains("duke"));
978
979 // closed
980 scope.close();
981 assertTrue(scope.toString().contains("duke"));
982 }
983 }
984
985 /**
986 * Test Subtask with task that completes successfully.
987 */
1139
1140 /**
1141 * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
1142 * a subtask that fails.
1143 */
1144 @ParameterizedTest
1145 @MethodSource("factories")
1146 void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1147 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1148 cf -> cf.withThreadFactory(factory))) {
1149 scope.fork(() -> "foo");
1150 scope.fork(() -> { throw new FooException(); });
1151 try {
1152 scope.join();
1153 } catch (FailedException e) {
1154 assertTrue(e.getCause() instanceof FooException);
1155 }
1156 }
1157 }
1158
1159 /**
1160 * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
1161 */
1162 @Test
1163 void testAnySuccessfulResultOrThrow1() throws Exception {
1164 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
1165 try {
1166 scope.join();
1167 } catch (FailedException e) {
1168 assertTrue(e.getCause() instanceof NoSuchElementException);
1169 }
1170 }
1171 }
1172
1173 /**
1174 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully.
1175 */
1176 @ParameterizedTest
1177 @MethodSource("factories")
1178 void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {
1212 scope.fork(() -> { throw new FooException(); });
1213 String first = scope.join();
1214 assertEquals("foo", first);
1215 }
1216 }
1217
1218 /**
1219 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails.
1220 */
1221 @ParameterizedTest
1222 @MethodSource("factories")
1223 void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
1224 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
1225 cf -> cf.withThreadFactory(factory))) {
1226 scope.fork(() -> { throw new FooException(); });
1227 Throwable ex = assertThrows(FailedException.class, scope::join);
1228 assertTrue(ex.getCause() instanceof FooException);
1229 }
1230 }
1231
1232 /**
1233 * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
1234 */
1235 @Test
1236 void testAwaitSuccessfulOrThrow1() throws Throwable {
1237 try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
1238 var result = scope.join();
1239 assertNull(result);
1240 }
1241 }
1242
1243 /**
1244 * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
1245 */
1246 @ParameterizedTest
1247 @MethodSource("factories")
1248 void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1249 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1250 cf -> cf.withThreadFactory(factory))) {
1251 var subtask1 = scope.fork(() -> "foo");
1259
1260 /**
1261 * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
1262 * a subtask that fails.
1263 */
1264 @ParameterizedTest
1265 @MethodSource("factories")
1266 void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1267 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1268 cf -> cf.withThreadFactory(factory))) {
1269 scope.fork(() -> "foo");
1270 scope.fork(() -> { throw new FooException(); });
1271 try {
1272 scope.join();
1273 } catch (FailedException e) {
1274 assertTrue(e.getCause() instanceof FooException);
1275 }
1276 }
1277 }
1278
1279 /**
1280 * Test Joiner.awaitAll() with no subtasks.
1281 */
1282 @Test
1283 void testAwaitAll1() throws Throwable {
1284 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1285 var result = scope.join();
1286 assertNull(result);
1287 }
1288 }
1289
1290 /**
1291 * Test Joiner.awaitAll() with subtasks that complete successfully.
1292 */
1293 @ParameterizedTest
1294 @MethodSource("factories")
1295 void testAwaitAll2(ThreadFactory factory) throws Throwable {
1296 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1297 cf -> cf.withThreadFactory(factory))) {
1298 var subtask1 = scope.fork(() -> "foo");
1305 }
1306
1307 /**
1308 * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
1309 * that fails.
1310 */
1311 @ParameterizedTest
1312 @MethodSource("factories")
1313 void testAwaitAll3(ThreadFactory factory) throws Throwable {
1314 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1315 cf -> cf.withThreadFactory(factory))) {
1316 var subtask1 = scope.fork(() -> "foo");
1317 var subtask2 = scope.fork(() -> { throw new FooException(); });
1318 var result = scope.join();
1319 assertNull(result);
1320 assertEquals("foo", subtask1.get());
1321 assertTrue(subtask2.exception() instanceof FooException);
1322 }
1323 }
1324
1325 /**
1326 * Test Joiner.allUntil(Predicate) with no subtasks.
1327 */
1328 @Test
1329 void testAllUntil1() throws Throwable {
1330 try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
1331 var subtasks = scope.join();
1332 assertEquals(0, subtasks.count());
1333 }
1334 }
1335
1336 /**
1337 * Test Joiner.allUntil(Predicate) with no cancellation.
1338 */
1339 @ParameterizedTest
1340 @MethodSource("factories")
1341 void testAllUntil2(ThreadFactory factory) throws Exception {
1342 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1343 cf -> cf.withThreadFactory(factory))) {
1344
1420 }
1421
1422 /**
1423 * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws.
1424 */
1425 @Test
1426 void testAllUntil5() throws Exception {
1427 var joiner = Joiner.allUntil(_ -> { throw new FooException(); });
1428 var excRef = new AtomicReference<Throwable>();
1429 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1430 ThreadFactory factory = Thread.ofVirtual()
1431 .uncaughtExceptionHandler(uhe)
1432 .factory();
1433 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1434 scope.fork(() -> "foo");
1435 scope.join();
1436 assertInstanceOf(FooException.class, excRef.get());
1437 }
1438 }
1439
1440 /**
1441 * Test Joiner default methods.
1442 */
1443 @Test
1444 void testJoinerDefaultMethods() throws Exception {
1445 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1446
1447 // need subtasks to test default methods
1448 var subtask1 = scope.fork(() -> "foo");
1449 awaitCancelled(scope);
1450 var subtask2 = scope.fork(() -> "bar");
1451 scope.join();
1452
1453 assertEquals(Subtask.State.SUCCESS, subtask1.state());
1454 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1455
1456 // Joiner that does not override default methods
1457 Joiner<Object, Void> joiner = () -> null;
1458 assertThrows(NullPointerException.class, () -> joiner.onFork(null));
1459 assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
1460 assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
1461 assertFalse(joiner.onFork(subtask2));
1462 assertFalse(joiner.onComplete(subtask1));
1463 assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
1464 }
1465 }
1466
1467 /**
1468 * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
1469 */
1470 @Test
1471 void testJoinersWithUnavailableResult() throws Exception {
1472 try (var scope = StructuredTaskScope.open()) {
1473 var done = new CountDownLatch(1);
1474 var subtask = scope.fork(() -> {
1475 done.await();
1476 return null;
1477 });
1478
1479 // onComplete with uncompleted task should throw IAE
1480 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1481 assertThrows(IllegalArgumentException.class,
1482 () -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
1483 assertThrows(IllegalArgumentException.class,
1506 () -> Joiner.allUntil(_ -> false).onFork(subtask));
1507 }
1508
1509 }
1510
1511 /**
1512 * Test the Configuration function apply method throwing an exception.
1513 */
1514 @Test
1515 void testConfigFunctionThrows() throws Exception {
1516 assertThrows(FooException.class,
1517 () -> StructuredTaskScope.open(Joiner.awaitAll(),
1518 cf -> { throw new FooException(); }));
1519 }
1520
1521 /**
1522 * Test Configuration equals/hashCode/toString
1523 */
1524 @Test
1525 void testConfigMethods() throws Exception {
1526 Function<Configuration, Configuration> testConfig = cf -> {
1527 var name = "duke";
1528 var threadFactory = Thread.ofPlatform().factory();
1529 var timeout = Duration.ofSeconds(10);
1530
1531 assertEquals(cf, cf);
1532 assertEquals(cf.withName(name), cf.withName(name));
1533 assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1534 assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1535
1536 assertNotEquals(cf, cf.withName(name));
1537 assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1538 assertNotEquals(cf, cf.withTimeout(timeout));
1539
1540 assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1541 assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1542 cf.withThreadFactory(threadFactory).hashCode());
1543 assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1544
1545 assertTrue(cf.withName(name).toString().contains(name));
1546 assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1547 assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1548
1549 return cf;
1550 };
1551 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
1552 // do nothing
1553 }
1554 }
1555
1556 /**
1557 * Test for NullPointerException.
1558 */
1559 @Test
1560 void testNulls() throws Exception {
1561 assertThrows(NullPointerException.class,
1562 () -> StructuredTaskScope.open(null));
1563 assertThrows(NullPointerException.class,
1564 () -> StructuredTaskScope.open(null, cf -> cf));
1565 assertThrows(NullPointerException.class,
1566 () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
1567
1568 assertThrows(NullPointerException.class, () -> Joiner.allUntil(null));
1569
1570 // fork
1571 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
|
44 import java.util.concurrent.Callable;
45 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.Executors;
48 import java.util.concurrent.Future;
49 import java.util.concurrent.LinkedTransferQueue;
50 import java.util.concurrent.ThreadFactory;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.RejectedExecutionException;
53 import java.util.concurrent.ScheduledExecutorService;
54 import java.util.concurrent.StructuredTaskScope;
55 import java.util.concurrent.StructuredTaskScope.TimeoutException;
56 import java.util.concurrent.StructuredTaskScope.Configuration;
57 import java.util.concurrent.StructuredTaskScope.FailedException;
58 import java.util.concurrent.StructuredTaskScope.Joiner;
59 import java.util.concurrent.StructuredTaskScope.Subtask;
60 import java.util.concurrent.StructureViolationException;
61 import java.util.concurrent.atomic.AtomicBoolean;
62 import java.util.concurrent.atomic.AtomicInteger;
63 import java.util.concurrent.atomic.AtomicReference;
64 import java.util.function.Predicate;
65 import java.util.function.UnaryOperator;
66 import java.util.stream.Stream;
67 import static java.lang.Thread.State.*;
68
69 import org.junit.jupiter.api.Test;
70 import org.junit.jupiter.api.BeforeAll;
71 import org.junit.jupiter.api.AfterAll;
72 import org.junit.jupiter.params.ParameterizedTest;
73 import org.junit.jupiter.params.provider.MethodSource;
74 import static org.junit.jupiter.api.Assertions.*;
75
76 class StructuredTaskScopeTest {
77 private static ScheduledExecutorService scheduler;
78 private static List<ThreadFactory> threadFactories;
79
80 @BeforeAll
81 static void setup() throws Exception {
82 scheduler = Executors.newSingleThreadScheduledExecutor();
83
84 // thread factories
85 String value = System.getProperty("threadFactory");
192 future.get();
193 }
194
195 // subtask cannot fork
196 Subtask<Boolean> subtask = scope.fork(() -> {
197 assertThrows(WrongThreadException.class, () -> {
198 scope.fork(() -> null);
199 });
200 return true;
201 });
202 scope.join();
203 assertTrue(subtask.get());
204 }
205 }
206
207 /**
208 * Test fork after join, no subtasks forked before join.
209 */
210 @ParameterizedTest
211 @MethodSource("factories")
212 void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception {
213 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
214 cf -> cf.withThreadFactory(factory))) {
215 scope.join();
216 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
217 }
218 }
219
220 /**
221 * Test fork after join, subtasks forked before join.
222 */
223 @ParameterizedTest
224 @MethodSource("factories")
225 void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception {
226 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
227 cf -> cf.withThreadFactory(factory))) {
228 scope.fork(() -> "foo");
229 scope.join();
230 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
231 }
232 }
233
234 /**
235 * Test fork after join interrupted.
236 */
237 @ParameterizedTest
238 @MethodSource("factories")
239 void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception {
240 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
241 cf -> cf.withThreadFactory(factory))) {
242 var subtask1 = scope.fork(() -> {
243 Thread.sleep(Duration.ofDays(1));
244 return "foo";
245 });
246
247 // join throws
248 Thread.currentThread().interrupt();
249 assertThrows(InterruptedException.class, scope::join);
250
251 // fork should throw
252 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
253 }
254 }
255
256 /**
257 * Test fork after join timeout.
258 */
259 @ParameterizedTest
260 @MethodSource("factories")
261 void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception {
262 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
263 cf -> cf.withThreadFactory(factory)
264 .withTimeout(Duration.ofMillis(100)))) {
265 awaitCancelled(scope);
266
267 // join throws
268 assertThrows(TimeoutException.class, scope::join);
269
270 // fork should throw
271 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
272 }
273 }
274
275 /**
276 * Test fork after task scope is cancelled. This test uses a custom Joiner to
277 * cancel execution.
278 */
279 @ParameterizedTest
280 @MethodSource("factories")
281 void testForkAfterCancel2(ThreadFactory factory) throws Exception {
282 var countingThreadFactory = new CountingThreadFactory(factory);
283 var testJoiner = new CancelAfterOneJoiner<String>();
284
285 try (var scope = StructuredTaskScope.open(testJoiner,
286 cf -> cf.withThreadFactory(countingThreadFactory))) {
287
288 // fork subtask, the scope should be cancelled when the subtask completes
289 var subtask1 = scope.fork(() -> "foo");
290 awaitCancelled(scope);
291
292 assertEquals(1, countingThreadFactory.threadCount());
293 assertEquals(1, testJoiner.onForkCount());
294 assertEquals(1, testJoiner.onCompleteCount());
297 var subtask2 = scope.fork(() -> "bar");
298
299 // onFork should be invoked, newThread and onComplete should not be invoked
300 assertEquals(1, countingThreadFactory.threadCount());
301 assertEquals(2, testJoiner.onForkCount());
302 assertEquals(1, testJoiner.onCompleteCount());
303
304 scope.join();
305
306 assertEquals(1, countingThreadFactory.threadCount());
307 assertEquals(2, testJoiner.onForkCount());
308 assertEquals(1, testJoiner.onCompleteCount());
309 assertEquals("foo", subtask1.get());
310 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
311 }
312 }
313
314 /**
315 * Test fork after task scope is closed.
316 */
317 @ParameterizedTest
318 @MethodSource("factories")
319 void testForkAfterClose(ThreadFactory factory) {
320 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
321 cf -> cf.withThreadFactory(factory))) {
322 scope.close();
323 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
324 }
325 }
326
327 /**
328 * Test fork with a ThreadFactory that rejects creating a thread.
329 */
330 @Test
331 void testForkRejectedExecutionException() {
332 ThreadFactory factory = task -> null;
333 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
334 cf -> cf.withThreadFactory(factory))) {
335 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
336 }
337 }
338
339 /**
340 * Test join with no subtasks.
341 */
385 * Test join after join completed with an exception.
386 */
387 @Test
388 void testJoinAfterJoin2() throws Exception {
389 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
390 scope.fork(() -> { throw new FooException(); });
391 Throwable ex = assertThrows(FailedException.class, scope::join);
392 assertTrue(ex.getCause() instanceof FooException);
393
394 // join already called
395 for (int i = 0 ; i < 3; i++) {
396 assertThrows(IllegalStateException.class, scope::join);
397 }
398 }
399 }
400
401 /**
402 * Test join after join completed with a timeout.
403 */
404 @Test
405 void testJoinAfterJoinInterrupted() throws Exception {
406 try (var scope = StructuredTaskScope.open()) {
407 var latch = new CountDownLatch(1);
408 var subtask = scope.fork(() -> {
409 latch.await();
410 return "foo";
411 });
412
413 // join throws InterruptedException
414 Thread.currentThread().interrupt();
415 assertThrows(InterruptedException.class, scope::join);
416
417 latch.countDown();
418
419 // retry join to get result
420 scope.join();
421 assertEquals("foo", subtask.get());
422
423 // retry after otbaining result
424 assertThrows(IllegalStateException.class, scope::join);
425 }
426 }
427
428 /**
429 * Test join after join completed with a timeout.
430 */
431 @Test
432 void testJoinAfterJoinTimeout() throws Exception {
433 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
434 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
435 // wait for scope to be cancelled by timeout
436 awaitCancelled(scope);
437 assertThrows(TimeoutException.class, scope::join);
438
439 // join already called
440 for (int i = 0 ; i < 3; i++) {
441 assertThrows(IllegalStateException.class, scope::join);
442 }
443 }
444 }
445
446 /**
447 * Test join invoked from Joiner.onTimeout.
448 */
449 @Test
450 void testJoinInOnTimeout() throws Exception {
451 Thread owner = Thread.currentThread();
452 var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
453
454 var joiner = new Joiner<String, Void>() {
455 @Override
456 public void onTimeout() {
457 assertTrue(Thread.currentThread() == owner);
458 var scope = scopeRef.get();
459 assertThrows(IllegalStateException.class, scope::join);
460 }
461 @Override
462 public Void result() {
463 return null;
464 }
465 };
466
467 try (var scope = StructuredTaskScope.open(joiner,
468 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
469 awaitCancelled(scope);
470 scopeRef.set(scope);
471 scope.join(); // invokes onTimeout
472 }
473 }
474
475 /**
476 * Test join method is owner confined.
477 */
478 @ParameterizedTest
479 @MethodSource("factories")
480 void testJoinConfined(ThreadFactory factory) throws Exception {
481 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
482 cf -> cf.withThreadFactory(factory))) {
483
484 // random thread cannot join
485 try (var pool = Executors.newSingleThreadExecutor()) {
486 Future<Void> future = pool.submit(() -> {
487 assertThrows(WrongThreadException.class, scope::join);
488 return null;
489 });
490 future.get();
491 }
492
493 // subtask cannot join
494 Subtask<Boolean> subtask = scope.fork(() -> {
495 assertThrows(WrongThreadException.class, () -> { scope.join(); });
496 return true;
497 });
498 scope.join();
499 assertTrue(subtask.get());
500 }
501 }
502
503 /**
504 * Test join with interrupt status set.
505 */
506 @ParameterizedTest
507 @MethodSource("factories")
508 void testInterruptJoin1(ThreadFactory factory) throws Exception {
509 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
510 cf -> cf.withThreadFactory(factory))) {
511
512 Subtask<String> subtask = scope.fork(() -> {
513 Thread.sleep(Duration.ofDays(1));
514 return "foo";
515 });
516
517 // join should throw
518 Thread.currentThread().interrupt();
519 try {
520 scope.join();
521 fail("join did not throw");
522 } catch (InterruptedException expected) {
523 assertFalse(Thread.interrupted()); // interrupt status should be cleared
524 }
525 }
526 }
527
528 /**
529 * Test interrupt of thread blocked in join.
530 */
531 @ParameterizedTest
532 @MethodSource("factories")
533 void testInterruptJoin2(ThreadFactory factory) throws Exception {
534 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
535 cf -> cf.withThreadFactory(factory))) {
536 Subtask<String> subtask = scope.fork(() -> {
537 Thread.sleep(Duration.ofDays(1));
538 return "foo";
539 });
540
541 // interrupt main thread when it blocks in join
542 scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
543 try {
544 scope.join();
545 fail("join did not throw");
546 } catch (InterruptedException expected) {
547 assertFalse(Thread.interrupted()); // interrupt status should be clear
548 }
549 }
550 }
551
552 /**
553 * Test join when scope is cancelled.
554 */
555 @ParameterizedTest
556 @MethodSource("factories")
557 void testJoinWhenCancelled(ThreadFactory factory) throws Exception {
1022 @Test
1023 void testOnCompleteCancelsExecution() throws Exception {
1024 var joiner = new Joiner<String, Void>() {
1025 @Override
1026 public boolean onComplete(Subtask<? extends String> subtask) {
1027 return true;
1028 }
1029 @Override
1030 public Void result() {
1031 return null;
1032 }
1033 };
1034 try (var scope = StructuredTaskScope.open(joiner)) {
1035 assertFalse(scope.isCancelled());
1036 scope.fork(() -> "foo");
1037 awaitCancelled(scope);
1038 scope.join();
1039 }
1040 }
1041
1042 /**
1043 * Test Joiner.onTimeout invoked by owner thread when timeout expires.
1044 */
1045 @Test
1046 void testOnTimeoutInvoked() throws Exception {
1047 var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
1048 Thread owner = Thread.currentThread();
1049 var invokeCount = new AtomicInteger();
1050 var joiner = new Joiner<String, Void>() {
1051 @Override
1052 public void onTimeout() {
1053 assertTrue(Thread.currentThread() == owner);
1054 assertTrue(scopeRef.get().isCancelled());
1055 invokeCount.incrementAndGet();
1056 }
1057 @Override
1058 public Void result() {
1059 return null;
1060 }
1061 };
1062 try (var scope = StructuredTaskScope.open(joiner,
1063 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1064 scopeRef.set(scope);
1065 scope.fork(() -> {
1066 Thread.sleep(Duration.ofDays(1));
1067 return null;
1068 });
1069 scope.join();
1070 assertEquals(1, invokeCount.get());
1071 }
1072 }
1073
1074 /**
1075 * Test Joiner.onTimeout throwing an excepiton.
1076 */
1077 @Test
1078 void testOnTimeoutThrows() throws Exception {
1079 var joiner = new Joiner<String, Void>() {
1080 @Override
1081 public void onTimeout() {
1082 throw new FooException();
1083 }
1084 @Override
1085 public Void result() {
1086 return null;
1087 }
1088 };
1089 try (var scope = StructuredTaskScope.open(joiner,
1090 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1091 // wait for scope to be cancelled by timeout
1092 awaitCancelled(scope);
1093
1094 // join should throw FooException on first usage
1095 assertThrows(FooException.class, scope::join);
1096
1097 // retry after onTimeout fails
1098 assertThrows(IllegalStateException.class, scope::join);
1099 }
1100 }
1101
1102 /**
1103 * Test toString.
1104 */
1105 @Test
1106 void testToString() throws Exception {
1107 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1108 cf -> cf.withName("duke"))) {
1109
1110 // open
1111 assertTrue(scope.toString().contains("duke"));
1112
1113 // closed
1114 scope.close();
1115 assertTrue(scope.toString().contains("duke"));
1116 }
1117 }
1118
1119 /**
1120 * Test Subtask with task that completes successfully.
1121 */
1273
1274 /**
1275 * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
1276 * a subtask that fails.
1277 */
1278 @ParameterizedTest
1279 @MethodSource("factories")
1280 void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1281 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1282 cf -> cf.withThreadFactory(factory))) {
1283 scope.fork(() -> "foo");
1284 scope.fork(() -> { throw new FooException(); });
1285 try {
1286 scope.join();
1287 } catch (FailedException e) {
1288 assertTrue(e.getCause() instanceof FooException);
1289 }
1290 }
1291 }
1292
1293 /**
1294 * Test Joiner.allSuccessfulOrThrow() with a timeout.
1295 */
1296 @Test
1297 void testAllSuccessfulOrThrow4() throws Exception {
1298 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1299 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1300 scope.fork(() -> "foo");
1301 scope.fork(() -> {
1302 Thread.sleep(Duration.ofDays(1));
1303 return "bar";
1304 });
1305 assertThrows(TimeoutException.class, scope::join);
1306
1307 // retry after join throws TimeoutException
1308 assertThrows(IllegalStateException.class, scope::join);
1309 }
1310 }
1311
1312 /**
1313 * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
1314 */
1315 @Test
1316 void testAnySuccessfulResultOrThrow1() throws Exception {
1317 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
1318 try {
1319 scope.join();
1320 } catch (FailedException e) {
1321 assertTrue(e.getCause() instanceof NoSuchElementException);
1322 }
1323 }
1324 }
1325
1326 /**
1327 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully.
1328 */
1329 @ParameterizedTest
1330 @MethodSource("factories")
1331 void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {
1365 scope.fork(() -> { throw new FooException(); });
1366 String first = scope.join();
1367 assertEquals("foo", first);
1368 }
1369 }
1370
1371 /**
1372 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails.
1373 */
1374 @ParameterizedTest
1375 @MethodSource("factories")
1376 void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
1377 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
1378 cf -> cf.withThreadFactory(factory))) {
1379 scope.fork(() -> { throw new FooException(); });
1380 Throwable ex = assertThrows(FailedException.class, scope::join);
1381 assertTrue(ex.getCause() instanceof FooException);
1382 }
1383 }
1384
1385 /**
1386 * Test Joiner.allSuccessfulOrThrow() with a timeout.
1387 */
1388 @Test
1389 void anySuccessfulResultOrThrow6() throws Exception {
1390 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1391 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1392 scope.fork(() -> { throw new FooException(); });
1393 scope.fork(() -> {
1394 Thread.sleep(Duration.ofDays(1));
1395 return "bar";
1396 });
1397 assertThrows(TimeoutException.class, scope::join);
1398
1399 // retry after join throws TimeoutException
1400 assertThrows(IllegalStateException.class, scope::join);
1401 }
1402 }
1403
1404 /**
1405 * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
1406 */
1407 @Test
1408 void testAwaitSuccessfulOrThrow1() throws Throwable {
1409 try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
1410 var result = scope.join();
1411 assertNull(result);
1412 }
1413 }
1414
1415 /**
1416 * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
1417 */
1418 @ParameterizedTest
1419 @MethodSource("factories")
1420 void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1421 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1422 cf -> cf.withThreadFactory(factory))) {
1423 var subtask1 = scope.fork(() -> "foo");
1431
1432 /**
1433 * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
1434 * a subtask that fails.
1435 */
1436 @ParameterizedTest
1437 @MethodSource("factories")
1438 void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1439 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1440 cf -> cf.withThreadFactory(factory))) {
1441 scope.fork(() -> "foo");
1442 scope.fork(() -> { throw new FooException(); });
1443 try {
1444 scope.join();
1445 } catch (FailedException e) {
1446 assertTrue(e.getCause() instanceof FooException);
1447 }
1448 }
1449 }
1450
1451 /**
1452 * Test Joiner.awaitAllSuccessfulOrThrow() with a timeout.
1453 */
1454 @Test
1455 void testAwaitSuccessfulOrThrow4() throws Exception {
1456 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1457 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1458 scope.fork(() -> "foo");
1459 scope.fork(() -> {
1460 Thread.sleep(Duration.ofDays(1));
1461 return "bar";
1462 });
1463 assertThrows(TimeoutException.class, scope::join);
1464
1465 // retry after join throws TimeoutException
1466 assertThrows(IllegalStateException.class, scope::join);
1467 }
1468 }
1469
1470 /**
1471 * Test Joiner.awaitAll() with no subtasks.
1472 */
1473 @Test
1474 void testAwaitAll1() throws Throwable {
1475 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1476 var result = scope.join();
1477 assertNull(result);
1478 }
1479 }
1480
1481 /**
1482 * Test Joiner.awaitAll() with subtasks that complete successfully.
1483 */
1484 @ParameterizedTest
1485 @MethodSource("factories")
1486 void testAwaitAll2(ThreadFactory factory) throws Throwable {
1487 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1488 cf -> cf.withThreadFactory(factory))) {
1489 var subtask1 = scope.fork(() -> "foo");
1496 }
1497
1498 /**
1499 * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
1500 * that fails.
1501 */
1502 @ParameterizedTest
1503 @MethodSource("factories")
1504 void testAwaitAll3(ThreadFactory factory) throws Throwable {
1505 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1506 cf -> cf.withThreadFactory(factory))) {
1507 var subtask1 = scope.fork(() -> "foo");
1508 var subtask2 = scope.fork(() -> { throw new FooException(); });
1509 var result = scope.join();
1510 assertNull(result);
1511 assertEquals("foo", subtask1.get());
1512 assertTrue(subtask2.exception() instanceof FooException);
1513 }
1514 }
1515
1516 /**
1517 * Test Joiner.awaitAll() with a timeout.
1518 */
1519 @Test
1520 void testAwaitAll4() throws Exception {
1521 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1522 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1523 scope.fork(() -> "foo");
1524 scope.fork(() -> {
1525 Thread.sleep(Duration.ofDays(1));
1526 return "bar";
1527 });
1528 assertThrows(TimeoutException.class, scope::join);
1529
1530 // retry after join throws TimeoutException
1531 assertThrows(IllegalStateException.class, scope::join);
1532 }
1533 }
1534
1535 /**
1536 * Test Joiner.allUntil(Predicate) with no subtasks.
1537 */
1538 @Test
1539 void testAllUntil1() throws Throwable {
1540 try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
1541 var subtasks = scope.join();
1542 assertEquals(0, subtasks.count());
1543 }
1544 }
1545
1546 /**
1547 * Test Joiner.allUntil(Predicate) with no cancellation.
1548 */
1549 @ParameterizedTest
1550 @MethodSource("factories")
1551 void testAllUntil2(ThreadFactory factory) throws Exception {
1552 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1553 cf -> cf.withThreadFactory(factory))) {
1554
1630 }
1631
1632 /**
1633 * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws.
1634 */
1635 @Test
1636 void testAllUntil5() throws Exception {
1637 var joiner = Joiner.allUntil(_ -> { throw new FooException(); });
1638 var excRef = new AtomicReference<Throwable>();
1639 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1640 ThreadFactory factory = Thread.ofVirtual()
1641 .uncaughtExceptionHandler(uhe)
1642 .factory();
1643 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1644 scope.fork(() -> "foo");
1645 scope.join();
1646 assertInstanceOf(FooException.class, excRef.get());
1647 }
1648 }
1649
1650 /**
1651 * Test Joiner.allUntil(Predicate) with a timeout.
1652 */
1653 @Test
1654 void testAllUntil6() throws Exception {
1655 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1656 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1657 var subtask1 = scope.fork(() -> "foo");
1658 var subtask2 = scope.fork(() -> {
1659 Thread.sleep(Duration.ofDays(1));
1660 return "bar";
1661 });
1662
1663 // TimeoutException should not be thrown
1664 var subtasks = scope.join().toList();
1665
1666 // stream should have two elements, subtask1 may or may not have completed
1667 assertEquals(2, subtasks.size());
1668 assertSame(subtask1, subtasks.get(0));
1669 assertSame(subtask2, subtasks.get(1));
1670 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1671
1672 // retry after join throws TimeoutException
1673 assertThrows(IllegalStateException.class, scope::join);
1674 }
1675 }
1676
1677 /**
1678 * Test Joiner default methods.
1679 */
1680 @Test
1681 void testJoinerDefaultMethods() throws Exception {
1682 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1683
1684 // need subtasks to test default methods
1685 var subtask1 = scope.fork(() -> "foo");
1686 awaitCancelled(scope);
1687 var subtask2 = scope.fork(() -> "bar");
1688 scope.join();
1689
1690 assertEquals(Subtask.State.SUCCESS, subtask1.state());
1691 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1692
1693 // Joiner that does not override default methods
1694 Joiner<Object, Void> joiner = () -> null;
1695 assertThrows(NullPointerException.class, () -> joiner.onFork(null));
1696 assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
1697 assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
1698 assertFalse(joiner.onFork(subtask2));
1699 assertFalse(joiner.onComplete(subtask1));
1700 assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
1701 assertThrows(TimeoutException.class, joiner::onTimeout);
1702 }
1703 }
1704
1705 /**
1706 * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
1707 */
1708 @Test
1709 void testJoinersWithUnavailableResult() throws Exception {
1710 try (var scope = StructuredTaskScope.open()) {
1711 var done = new CountDownLatch(1);
1712 var subtask = scope.fork(() -> {
1713 done.await();
1714 return null;
1715 });
1716
1717 // onComplete with uncompleted task should throw IAE
1718 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1719 assertThrows(IllegalArgumentException.class,
1720 () -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
1721 assertThrows(IllegalArgumentException.class,
1744 () -> Joiner.allUntil(_ -> false).onFork(subtask));
1745 }
1746
1747 }
1748
1749 /**
1750 * Test the Configuration function apply method throwing an exception.
1751 */
1752 @Test
1753 void testConfigFunctionThrows() throws Exception {
1754 assertThrows(FooException.class,
1755 () -> StructuredTaskScope.open(Joiner.awaitAll(),
1756 cf -> { throw new FooException(); }));
1757 }
1758
1759 /**
1760 * Test Configuration equals/hashCode/toString
1761 */
1762 @Test
1763 void testConfigMethods() throws Exception {
1764 UnaryOperator<Configuration> configOperator = cf -> {
1765 var name = "duke";
1766 var threadFactory = Thread.ofPlatform().factory();
1767 var timeout = Duration.ofSeconds(10);
1768
1769 assertEquals(cf, cf);
1770 assertEquals(cf.withName(name), cf.withName(name));
1771 assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1772 assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1773
1774 assertNotEquals(cf, cf.withName(name));
1775 assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1776 assertNotEquals(cf, cf.withTimeout(timeout));
1777
1778 assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1779 assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1780 cf.withThreadFactory(threadFactory).hashCode());
1781 assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1782
1783 assertTrue(cf.withName(name).toString().contains(name));
1784 assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1785 assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1786
1787 return cf;
1788 };
1789 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) {
1790 // do nothing
1791 }
1792 }
1793
1794 /**
1795 * Test for NullPointerException.
1796 */
1797 @Test
1798 void testNulls() throws Exception {
1799 assertThrows(NullPointerException.class,
1800 () -> StructuredTaskScope.open(null));
1801 assertThrows(NullPointerException.class,
1802 () -> StructuredTaskScope.open(null, cf -> cf));
1803 assertThrows(NullPointerException.class,
1804 () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
1805
1806 assertThrows(NullPointerException.class, () -> Joiner.allUntil(null));
1807
1808 // fork
1809 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
|