< prev index next >

test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java

Print this page

  44 import java.util.concurrent.Callable;
  45 import java.util.concurrent.ConcurrentHashMap;
  46 import java.util.concurrent.CountDownLatch;
  47 import java.util.concurrent.Executors;
  48 import java.util.concurrent.Future;
  49 import java.util.concurrent.LinkedTransferQueue;
  50 import java.util.concurrent.ThreadFactory;
  51 import java.util.concurrent.TimeUnit;
  52 import java.util.concurrent.RejectedExecutionException;
  53 import java.util.concurrent.ScheduledExecutorService;
  54 import java.util.concurrent.StructuredTaskScope;
  55 import java.util.concurrent.StructuredTaskScope.TimeoutException;
  56 import java.util.concurrent.StructuredTaskScope.Configuration;
  57 import java.util.concurrent.StructuredTaskScope.FailedException;
  58 import java.util.concurrent.StructuredTaskScope.Joiner;
  59 import java.util.concurrent.StructuredTaskScope.Subtask;
  60 import java.util.concurrent.StructureViolationException;
  61 import java.util.concurrent.atomic.AtomicBoolean;
  62 import java.util.concurrent.atomic.AtomicInteger;
  63 import java.util.concurrent.atomic.AtomicReference;
  64 import java.util.function.Function;
  65 import java.util.function.Predicate;

  66 import java.util.stream.Stream;
  67 import static java.lang.Thread.State.*;
  68 
  69 import org.junit.jupiter.api.Test;
  70 import org.junit.jupiter.api.BeforeAll;
  71 import org.junit.jupiter.api.AfterAll;
  72 import org.junit.jupiter.params.ParameterizedTest;
  73 import org.junit.jupiter.params.provider.MethodSource;
  74 import static org.junit.jupiter.api.Assertions.*;
  75 
  76 class StructuredTaskScopeTest {
  77     private static ScheduledExecutorService scheduler;
  78     private static List<ThreadFactory> threadFactories;
  79 
  80     @BeforeAll
  81     static void setup() throws Exception {
  82         scheduler = Executors.newSingleThreadScheduledExecutor();
  83 
  84         // thread factories
  85         String value = System.getProperty("threadFactory");

 192                 future.get();
 193             }
 194 
 195             // subtask cannot fork
 196             Subtask<Boolean> subtask = scope.fork(() -> {
 197                 assertThrows(WrongThreadException.class, () -> {
 198                     scope.fork(() -> null);
 199                 });
 200                 return true;
 201             });
 202             scope.join();
 203             assertTrue(subtask.get());
 204         }
 205     }
 206 
 207     /**
 208      * Test fork after join, no subtasks forked before join.
 209      */
 210     @ParameterizedTest
 211     @MethodSource("factories")
 212     void testForkAfterJoin1(ThreadFactory factory) throws Exception {
 213         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 214                 cf -> cf.withThreadFactory(factory))) {
 215             scope.join();
 216             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 217         }
 218     }
 219 
 220     /**
 221      * Test fork after join, subtasks forked before join.
 222      */
 223     @ParameterizedTest
 224     @MethodSource("factories")
 225     void testForkAfterJoin2(ThreadFactory factory) throws Exception {
 226         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 227                 cf -> cf.withThreadFactory(factory))) {
 228             scope.fork(() -> "foo");
 229             scope.join();
 230             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 231         }
 232     }
 233 
 234     /**
 235      * Test fork after join throws.
 236      */
 237     @ParameterizedTest
 238     @MethodSource("factories")
 239     void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
 240         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 241                 cf -> cf.withThreadFactory(factory))) {
 242             var latch = new CountDownLatch(1);
 243             var subtask1 = scope.fork(() -> {
 244                 latch.await();
 245                 return "foo";
 246             });
 247 
 248             // join throws
 249             Thread.currentThread().interrupt();
 250             assertThrows(InterruptedException.class, scope::join);
 251 
 252             // fork should throw
 253             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 254         }
 255     }
 256 



















 257     /**
 258      * Test fork after task scope is cancelled. This test uses a custom Joiner to
 259      * cancel execution.
 260      */
 261     @ParameterizedTest
 262     @MethodSource("factories")
 263     void testForkAfterCancel2(ThreadFactory factory) throws Exception {
 264         var countingThreadFactory = new CountingThreadFactory(factory);
 265         var testJoiner = new CancelAfterOneJoiner<String>();
 266 
 267         try (var scope = StructuredTaskScope.open(testJoiner,
 268                 cf -> cf.withThreadFactory(countingThreadFactory))) {
 269 
 270             // fork subtask, the scope should be cancelled when the subtask completes
 271             var subtask1 = scope.fork(() -> "foo");
 272             awaitCancelled(scope);
 273 
 274             assertEquals(1, countingThreadFactory.threadCount());
 275             assertEquals(1, testJoiner.onForkCount());
 276             assertEquals(1, testJoiner.onCompleteCount());

 279             var subtask2 = scope.fork(() -> "bar");
 280 
 281             // onFork should be invoked, newThread and onComplete should not be invoked
 282             assertEquals(1, countingThreadFactory.threadCount());
 283             assertEquals(2, testJoiner.onForkCount());
 284             assertEquals(1, testJoiner.onCompleteCount());
 285 
 286             scope.join();
 287 
 288             assertEquals(1, countingThreadFactory.threadCount());
 289             assertEquals(2, testJoiner.onForkCount());
 290             assertEquals(1, testJoiner.onCompleteCount());
 291             assertEquals("foo", subtask1.get());
 292             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 293         }
 294     }
 295 
 296     /**
 297      * Test fork after task scope is closed.
 298      */
 299     @Test
 300     void testForkAfterClose() {
 301         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {


 302             scope.close();
 303             assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
 304         }
 305     }
 306 
 307     /**
 308      * Test fork with a ThreadFactory that rejects creating a thread.
 309      */
 310     @Test
 311     void testForkRejectedExecutionException() {
 312         ThreadFactory factory = task -> null;
 313         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 314                 cf -> cf.withThreadFactory(factory))) {
 315             assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
 316         }
 317     }
 318 
 319     /**
 320      * Test join with no subtasks.
 321      */

 365      * Test join after join completed with an exception.
 366      */
 367     @Test
 368     void testJoinAfterJoin2() throws Exception {
 369         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
 370             scope.fork(() -> { throw new FooException(); });
 371             Throwable ex = assertThrows(FailedException.class, scope::join);
 372             assertTrue(ex.getCause() instanceof FooException);
 373 
 374             // join already called
 375             for (int i = 0 ; i < 3; i++) {
 376                 assertThrows(IllegalStateException.class, scope::join);
 377             }
 378         }
 379     }
 380 
 381     /**
 382      * Test join after join completed with a timeout.
 383      */
 384     @Test
 385     void testJoinAfterJoin3() throws Exception {



























 386         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
 387                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
 388             // wait for scope to be cancelled by timeout
 389             awaitCancelled(scope);
 390             assertThrows(TimeoutException.class, scope::join);
 391 
 392             // join already called
 393             for (int i = 0 ; i < 3; i++) {
 394                 assertThrows(IllegalStateException.class, scope::join);
 395             }
 396         }
 397     }
 398 





























 399     /**
 400      * Test join method is owner confined.
 401      */
 402     @ParameterizedTest
 403     @MethodSource("factories")
 404     void testJoinConfined(ThreadFactory factory) throws Exception {
 405         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 406                 cf -> cf.withThreadFactory(factory))) {
 407 
 408             // random thread cannot join
 409             try (var pool = Executors.newSingleThreadExecutor()) {
 410                 Future<Void> future = pool.submit(() -> {
 411                     assertThrows(WrongThreadException.class, scope::join);
 412                     return null;
 413                 });
 414                 future.get();
 415             }
 416 
 417             // subtask cannot join
 418             Subtask<Boolean> subtask = scope.fork(() -> {
 419                 assertThrows(WrongThreadException.class, () -> { scope.join(); });
 420                 return true;
 421             });
 422             scope.join();
 423             assertTrue(subtask.get());
 424         }
 425     }
 426 
 427     /**
 428      * Test join with interrupt status set.
 429      */
 430     @ParameterizedTest
 431     @MethodSource("factories")
 432     void testInterruptJoin1(ThreadFactory factory) throws Exception {
 433         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 434                 cf -> cf.withThreadFactory(factory))) {
 435 
 436             Subtask<String> subtask = scope.fork(() -> {
 437                 Thread.sleep(60_000);
 438                 return "foo";
 439             });
 440 
 441             // join should throw
 442             Thread.currentThread().interrupt();
 443             try {
 444                 scope.join();
 445                 fail("join did not throw");
 446             } catch (InterruptedException expected) {
 447                 assertFalse(Thread.interrupted());   // interrupt status should be cleared
 448             }
 449         }
 450     }
 451 
 452     /**
 453      * Test interrupt of thread blocked in join.
 454      */
 455     @ParameterizedTest
 456     @MethodSource("factories")
 457     void testInterruptJoin2(ThreadFactory factory) throws Exception {
 458         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 459                 cf -> cf.withThreadFactory(factory))) {
 460 
 461             var latch = new CountDownLatch(1);
 462             Subtask<String> subtask = scope.fork(() -> {
 463                 Thread.sleep(60_000);
 464                 return "foo";
 465             });
 466 
 467             // interrupt main thread when it blocks in join
 468             scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
 469             try {
 470                 scope.join();
 471                 fail("join did not throw");
 472             } catch (InterruptedException expected) {
 473                 assertFalse(Thread.interrupted());   // interrupt status should be clear
 474             }
 475         }
 476     }
 477 
 478     /**
 479      * Test join when scope is cancelled.
 480      */
 481     @ParameterizedTest
 482     @MethodSource("factories")
 483     void testJoinWhenCancelled(ThreadFactory factory) throws Exception {

 948     @Test
 949     void testOnCompleteCancelsExecution() throws Exception {
 950         var joiner = new Joiner<String, Void>() {
 951             @Override
 952             public boolean onComplete(Subtask<? extends String> subtask) {
 953                 return true;
 954             }
 955             @Override
 956             public Void result() {
 957                 return null;
 958             }
 959         };
 960         try (var scope = StructuredTaskScope.open(joiner)) {
 961             assertFalse(scope.isCancelled());
 962             scope.fork(() -> "foo");
 963             awaitCancelled(scope);
 964             scope.join();
 965         }
 966     }
 967 




























































 968     /**
 969      * Test toString.
 970      */
 971     @Test
 972     void testToString() throws Exception {
 973         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 974                 cf -> cf.withName("duke"))) {
 975 
 976             // open
 977             assertTrue(scope.toString().contains("duke"));
 978 
 979             // closed
 980             scope.close();
 981             assertTrue(scope.toString().contains("duke"));
 982         }
 983     }
 984 
 985     /**
 986      * Test Subtask with task that completes successfully.
 987      */

1139 
1140     /**
1141      * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
1142      * a subtask that fails.
1143      */
1144     @ParameterizedTest
1145     @MethodSource("factories")
1146     void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1147         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1148                 cf -> cf.withThreadFactory(factory))) {
1149             scope.fork(() -> "foo");
1150             scope.fork(() -> { throw new FooException(); });
1151             try {
1152                 scope.join();
1153             } catch (FailedException e) {
1154                 assertTrue(e.getCause() instanceof FooException);
1155             }
1156         }
1157     }
1158 



















1159     /**
1160      * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
1161      */
1162     @Test
1163     void testAnySuccessfulResultOrThrow1() throws Exception {
1164         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
1165             try {
1166                 scope.join();
1167             } catch (FailedException e) {
1168                 assertTrue(e.getCause() instanceof NoSuchElementException);
1169             }
1170         }
1171     }
1172 
1173     /**
1174      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully.
1175      */
1176     @ParameterizedTest
1177     @MethodSource("factories")
1178     void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {

1212             scope.fork(() -> { throw new FooException(); });
1213             String first = scope.join();
1214             assertEquals("foo", first);
1215         }
1216     }
1217 
1218     /**
1219      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails.
1220      */
1221     @ParameterizedTest
1222     @MethodSource("factories")
1223     void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
1224         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
1225                 cf -> cf.withThreadFactory(factory))) {
1226             scope.fork(() -> { throw new FooException(); });
1227             Throwable ex = assertThrows(FailedException.class, scope::join);
1228             assertTrue(ex.getCause() instanceof FooException);
1229         }
1230     }
1231 



















1232     /**
1233      * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
1234      */
1235     @Test
1236     void testAwaitSuccessfulOrThrow1() throws Throwable {
1237         try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
1238             var result = scope.join();
1239             assertNull(result);
1240         }
1241     }
1242 
1243     /**
1244      * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
1245      */
1246     @ParameterizedTest
1247     @MethodSource("factories")
1248     void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1249         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1250                 cf -> cf.withThreadFactory(factory))) {
1251             var subtask1 = scope.fork(() -> "foo");

1259 
1260     /**
1261      * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
1262      * a subtask that fails.
1263      */
1264     @ParameterizedTest
1265     @MethodSource("factories")
1266     void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1267         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1268                 cf -> cf.withThreadFactory(factory))) {
1269             scope.fork(() -> "foo");
1270             scope.fork(() -> { throw new FooException(); });
1271             try {
1272                 scope.join();
1273             } catch (FailedException e) {
1274                 assertTrue(e.getCause() instanceof FooException);
1275             }
1276         }
1277     }
1278 



















1279     /**
1280      * Test Joiner.awaitAll() with no subtasks.
1281      */
1282     @Test
1283     void testAwaitAll1() throws Throwable {
1284         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1285             var result = scope.join();
1286             assertNull(result);
1287         }
1288     }
1289 
1290     /**
1291      * Test Joiner.awaitAll() with subtasks that complete successfully.
1292      */
1293     @ParameterizedTest
1294     @MethodSource("factories")
1295     void testAwaitAll2(ThreadFactory factory) throws Throwable {
1296         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1297                 cf -> cf.withThreadFactory(factory))) {
1298             var subtask1 = scope.fork(() -> "foo");

1305     }
1306 
1307     /**
1308      * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
1309      * that fails.
1310      */
1311     @ParameterizedTest
1312     @MethodSource("factories")
1313     void testAwaitAll3(ThreadFactory factory) throws Throwable {
1314         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1315                 cf -> cf.withThreadFactory(factory))) {
1316             var subtask1 = scope.fork(() -> "foo");
1317             var subtask2 = scope.fork(() -> { throw new FooException(); });
1318             var result = scope.join();
1319             assertNull(result);
1320             assertEquals("foo", subtask1.get());
1321             assertTrue(subtask2.exception() instanceof FooException);
1322         }
1323     }
1324 



















1325     /**
1326      * Test Joiner.allUntil(Predicate) with no subtasks.
1327      */
1328     @Test
1329     void testAllUntil1() throws Throwable {
1330         try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
1331             var subtasks = scope.join();
1332             assertEquals(0, subtasks.count());
1333         }
1334     }
1335 
1336     /**
1337      * Test Joiner.allUntil(Predicate) with no cancellation.
1338      */
1339     @ParameterizedTest
1340     @MethodSource("factories")
1341     void testAllUntil2(ThreadFactory factory) throws Exception {
1342         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1343                 cf -> cf.withThreadFactory(factory))) {
1344 

1420     }
1421 
1422     /**
1423      * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws.
1424      */
1425     @Test
1426     void testAllUntil5() throws Exception {
1427         var joiner = Joiner.allUntil(_ -> { throw new FooException(); });
1428         var excRef = new AtomicReference<Throwable>();
1429         Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1430         ThreadFactory factory = Thread.ofVirtual()
1431                 .uncaughtExceptionHandler(uhe)
1432                 .factory();
1433         try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1434             scope.fork(() -> "foo");
1435             scope.join();
1436             assertInstanceOf(FooException.class, excRef.get());
1437         }
1438     }
1439 



























1440     /**
1441      * Test Joiner default methods.
1442      */
1443     @Test
1444     void testJoinerDefaultMethods() throws Exception {
1445         try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1446 
1447             // need subtasks to test default methods
1448             var subtask1 = scope.fork(() -> "foo");
1449             awaitCancelled(scope);
1450             var subtask2 = scope.fork(() -> "bar");
1451             scope.join();
1452 
1453             assertEquals(Subtask.State.SUCCESS, subtask1.state());
1454             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1455 
1456             // Joiner that does not override default methods
1457             Joiner<Object, Void> joiner = () -> null;
1458             assertThrows(NullPointerException.class, () -> joiner.onFork(null));
1459             assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
1460             assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
1461             assertFalse(joiner.onFork(subtask2));
1462             assertFalse(joiner.onComplete(subtask1));
1463             assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));

1464         }
1465     }
1466 
1467     /**
1468      * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
1469      */
1470     @Test
1471     void testJoinersWithUnavailableResult() throws Exception {
1472         try (var scope = StructuredTaskScope.open()) {
1473             var done = new CountDownLatch(1);
1474             var subtask = scope.fork(() -> {
1475                 done.await();
1476                 return null;
1477             });
1478 
1479             // onComplete with uncompleted task should throw IAE
1480             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1481             assertThrows(IllegalArgumentException.class,
1482                     () -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
1483             assertThrows(IllegalArgumentException.class,

1506                     () -> Joiner.allUntil(_ -> false).onFork(subtask));
1507         }
1508 
1509     }
1510 
1511     /**
1512      * Test the Configuration function apply method throwing an exception.
1513      */
1514     @Test
1515     void testConfigFunctionThrows() throws Exception {
1516         assertThrows(FooException.class,
1517                 () -> StructuredTaskScope.open(Joiner.awaitAll(),
1518                                                cf -> { throw new FooException(); }));
1519     }
1520 
1521     /**
1522      * Test Configuration equals/hashCode/toString
1523      */
1524     @Test
1525     void testConfigMethods() throws Exception {
1526         Function<Configuration, Configuration> testConfig = cf -> {
1527             var name = "duke";
1528             var threadFactory = Thread.ofPlatform().factory();
1529             var timeout = Duration.ofSeconds(10);
1530 
1531             assertEquals(cf, cf);
1532             assertEquals(cf.withName(name), cf.withName(name));
1533             assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1534             assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1535 
1536             assertNotEquals(cf, cf.withName(name));
1537             assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1538             assertNotEquals(cf, cf.withTimeout(timeout));
1539 
1540             assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1541             assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1542                     cf.withThreadFactory(threadFactory).hashCode());
1543             assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1544 
1545             assertTrue(cf.withName(name).toString().contains(name));
1546             assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1547             assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1548 
1549             return cf;
1550         };
1551         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
1552             // do nothing
1553         }
1554     }
1555 
1556     /**
1557      * Test for NullPointerException.
1558      */
1559     @Test
1560     void testNulls() throws Exception {
1561         assertThrows(NullPointerException.class,
1562                 () -> StructuredTaskScope.open(null));
1563         assertThrows(NullPointerException.class,
1564                 () -> StructuredTaskScope.open(null, cf -> cf));
1565         assertThrows(NullPointerException.class,
1566                 () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
1567 
1568         assertThrows(NullPointerException.class, () -> Joiner.allUntil(null));
1569 
1570         // fork
1571         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {

  44 import java.util.concurrent.Callable;
  45 import java.util.concurrent.ConcurrentHashMap;
  46 import java.util.concurrent.CountDownLatch;
  47 import java.util.concurrent.Executors;
  48 import java.util.concurrent.Future;
  49 import java.util.concurrent.LinkedTransferQueue;
  50 import java.util.concurrent.ThreadFactory;
  51 import java.util.concurrent.TimeUnit;
  52 import java.util.concurrent.RejectedExecutionException;
  53 import java.util.concurrent.ScheduledExecutorService;
  54 import java.util.concurrent.StructuredTaskScope;
  55 import java.util.concurrent.StructuredTaskScope.TimeoutException;
  56 import java.util.concurrent.StructuredTaskScope.Configuration;
  57 import java.util.concurrent.StructuredTaskScope.FailedException;
  58 import java.util.concurrent.StructuredTaskScope.Joiner;
  59 import java.util.concurrent.StructuredTaskScope.Subtask;
  60 import java.util.concurrent.StructureViolationException;
  61 import java.util.concurrent.atomic.AtomicBoolean;
  62 import java.util.concurrent.atomic.AtomicInteger;
  63 import java.util.concurrent.atomic.AtomicReference;

  64 import java.util.function.Predicate;
  65 import java.util.function.UnaryOperator;
  66 import java.util.stream.Stream;
  67 import static java.lang.Thread.State.*;
  68 
  69 import org.junit.jupiter.api.Test;
  70 import org.junit.jupiter.api.BeforeAll;
  71 import org.junit.jupiter.api.AfterAll;
  72 import org.junit.jupiter.params.ParameterizedTest;
  73 import org.junit.jupiter.params.provider.MethodSource;
  74 import static org.junit.jupiter.api.Assertions.*;
  75 
  76 class StructuredTaskScopeTest {
  77     private static ScheduledExecutorService scheduler;
  78     private static List<ThreadFactory> threadFactories;
  79 
  80     @BeforeAll
  81     static void setup() throws Exception {
  82         scheduler = Executors.newSingleThreadScheduledExecutor();
  83 
  84         // thread factories
  85         String value = System.getProperty("threadFactory");

 192                 future.get();
 193             }
 194 
 195             // subtask cannot fork
 196             Subtask<Boolean> subtask = scope.fork(() -> {
 197                 assertThrows(WrongThreadException.class, () -> {
 198                     scope.fork(() -> null);
 199                 });
 200                 return true;
 201             });
 202             scope.join();
 203             assertTrue(subtask.get());
 204         }
 205     }
 206 
 207     /**
 208      * Test fork after join, no subtasks forked before join.
 209      */
 210     @ParameterizedTest
 211     @MethodSource("factories")
 212     void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception {
 213         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 214                 cf -> cf.withThreadFactory(factory))) {
 215             scope.join();
 216             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 217         }
 218     }
 219 
 220     /**
 221      * Test fork after join, subtasks forked before join.
 222      */
 223     @ParameterizedTest
 224     @MethodSource("factories")
 225     void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception {
 226         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 227                 cf -> cf.withThreadFactory(factory))) {
 228             scope.fork(() -> "foo");
 229             scope.join();
 230             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 231         }
 232     }
 233 
 234     /**
 235      * Test fork after join interrupted.
 236      */
 237     @ParameterizedTest
 238     @MethodSource("factories")
 239     void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception {
 240         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 241                 cf -> cf.withThreadFactory(factory))) {

 242             var subtask1 = scope.fork(() -> {
 243                 Thread.sleep(Duration.ofDays(1));
 244                 return "foo";
 245             });
 246 
 247             // join throws
 248             Thread.currentThread().interrupt();
 249             assertThrows(InterruptedException.class, scope::join);
 250 
 251             // fork should throw
 252             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 253         }
 254     }
 255 
 256     /**
 257      * Test fork after join timeout.
 258      */
 259     @ParameterizedTest
 260     @MethodSource("factories")
 261     void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception {
 262         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 263                 cf -> cf.withThreadFactory(factory)
 264                         .withTimeout(Duration.ofMillis(100)))) {
 265             awaitCancelled(scope);
 266 
 267             // join throws
 268             assertThrows(TimeoutException.class, scope::join);
 269 
 270             // fork should throw
 271             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 272         }
 273     }
 274 
 275     /**
 276      * Test fork after task scope is cancelled. This test uses a custom Joiner to
 277      * cancel execution.
 278      */
 279     @ParameterizedTest
 280     @MethodSource("factories")
 281     void testForkAfterCancel2(ThreadFactory factory) throws Exception {
 282         var countingThreadFactory = new CountingThreadFactory(factory);
 283         var testJoiner = new CancelAfterOneJoiner<String>();
 284 
 285         try (var scope = StructuredTaskScope.open(testJoiner,
 286                 cf -> cf.withThreadFactory(countingThreadFactory))) {
 287 
 288             // fork subtask, the scope should be cancelled when the subtask completes
 289             var subtask1 = scope.fork(() -> "foo");
 290             awaitCancelled(scope);
 291 
 292             assertEquals(1, countingThreadFactory.threadCount());
 293             assertEquals(1, testJoiner.onForkCount());
 294             assertEquals(1, testJoiner.onCompleteCount());

 297             var subtask2 = scope.fork(() -> "bar");
 298 
 299             // onFork should be invoked, newThread and onComplete should not be invoked
 300             assertEquals(1, countingThreadFactory.threadCount());
 301             assertEquals(2, testJoiner.onForkCount());
 302             assertEquals(1, testJoiner.onCompleteCount());
 303 
 304             scope.join();
 305 
 306             assertEquals(1, countingThreadFactory.threadCount());
 307             assertEquals(2, testJoiner.onForkCount());
 308             assertEquals(1, testJoiner.onCompleteCount());
 309             assertEquals("foo", subtask1.get());
 310             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 311         }
 312     }
 313 
 314     /**
 315      * Test fork after task scope is closed.
 316      */
 317     @ParameterizedTest
 318     @MethodSource("factories")
 319     void testForkAfterClose(ThreadFactory factory) {
 320         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 321                 cf -> cf.withThreadFactory(factory))) {
 322             scope.close();
 323             assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
 324         }
 325     }
 326 
 327     /**
 328      * Test fork with a ThreadFactory that rejects creating a thread.
 329      */
 330     @Test
 331     void testForkRejectedExecutionException() {
 332         ThreadFactory factory = task -> null;
 333         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 334                 cf -> cf.withThreadFactory(factory))) {
 335             assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
 336         }
 337     }
 338 
 339     /**
 340      * Test join with no subtasks.
 341      */

 385      * Test join after join completed with an exception.
 386      */
 387     @Test
 388     void testJoinAfterJoin2() throws Exception {
 389         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
 390             scope.fork(() -> { throw new FooException(); });
 391             Throwable ex = assertThrows(FailedException.class, scope::join);
 392             assertTrue(ex.getCause() instanceof FooException);
 393 
 394             // join already called
 395             for (int i = 0 ; i < 3; i++) {
 396                 assertThrows(IllegalStateException.class, scope::join);
 397             }
 398         }
 399     }
 400 
 401     /**
 402      * Test join after join completed with a timeout.
 403      */
 404     @Test
 405     void testJoinAfterJoinInterrupted() throws Exception {
 406         try (var scope = StructuredTaskScope.open()) {
 407             var latch = new CountDownLatch(1);
 408             var subtask = scope.fork(() -> {
 409                 latch.await();
 410                 return "foo";
 411             });
 412 
 413             // join throws InterruptedException
 414             Thread.currentThread().interrupt();
 415             assertThrows(InterruptedException.class, scope::join);
 416 
 417             latch.countDown();
 418 
 419             // retry join to get result
 420             scope.join();
 421             assertEquals("foo", subtask.get());
 422 
 423             // retry after otbaining result
 424             assertThrows(IllegalStateException.class, scope::join);
 425         }
 426     }
 427 
 428     /**
 429      * Test join after join completed with a timeout.
 430      */
 431     @Test
 432     void testJoinAfterJoinTimeout() throws Exception {
 433         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
 434                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
 435             // wait for scope to be cancelled by timeout
 436             awaitCancelled(scope);
 437             assertThrows(TimeoutException.class, scope::join);
 438 
 439             // join already called
 440             for (int i = 0 ; i < 3; i++) {
 441                 assertThrows(IllegalStateException.class, scope::join);
 442             }
 443         }
 444     }
 445 
 446     /**
 447      * Test join invoked from Joiner.onTimeout.
 448      */
 449     @Test
 450     void testJoinInOnTimeout() throws Exception {
 451         Thread owner = Thread.currentThread();
 452         var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
 453 
 454         var joiner = new Joiner<String, Void>() {
 455             @Override
 456             public void onTimeout() {
 457                 assertTrue(Thread.currentThread() == owner);
 458                 var scope = scopeRef.get();
 459                 assertThrows(IllegalStateException.class, scope::join);
 460             }
 461             @Override
 462             public Void result() {
 463                 return null;
 464             }
 465         };
 466 
 467         try (var scope = StructuredTaskScope.open(joiner,
 468                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
 469             awaitCancelled(scope);
 470             scopeRef.set(scope);
 471             scope.join();  // invokes onTimeout
 472         }
 473     }
 474 
 475     /**
 476      * Test join method is owner confined.
 477      */
 478     @ParameterizedTest
 479     @MethodSource("factories")
 480     void testJoinConfined(ThreadFactory factory) throws Exception {
 481         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 482                 cf -> cf.withThreadFactory(factory))) {
 483 
 484             // random thread cannot join
 485             try (var pool = Executors.newSingleThreadExecutor()) {
 486                 Future<Void> future = pool.submit(() -> {
 487                     assertThrows(WrongThreadException.class, scope::join);
 488                     return null;
 489                 });
 490                 future.get();
 491             }
 492 
 493             // subtask cannot join
 494             Subtask<Boolean> subtask = scope.fork(() -> {
 495                 assertThrows(WrongThreadException.class, () -> { scope.join(); });
 496                 return true;
 497             });
 498             scope.join();
 499             assertTrue(subtask.get());
 500         }
 501     }
 502 
 503     /**
 504      * Test join with interrupt status set.
 505      */
 506     @ParameterizedTest
 507     @MethodSource("factories")
 508     void testInterruptJoin1(ThreadFactory factory) throws Exception {
 509         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 510                 cf -> cf.withThreadFactory(factory))) {
 511 
 512             Subtask<String> subtask = scope.fork(() -> {
 513                 Thread.sleep(Duration.ofDays(1));
 514                 return "foo";
 515             });
 516 
 517             // join should throw
 518             Thread.currentThread().interrupt();
 519             try {
 520                 scope.join();
 521                 fail("join did not throw");
 522             } catch (InterruptedException expected) {
 523                 assertFalse(Thread.interrupted());   // interrupt status should be cleared
 524             }
 525         }
 526     }
 527 
 528     /**
 529      * Test interrupt of thread blocked in join.
 530      */
 531     @ParameterizedTest
 532     @MethodSource("factories")
 533     void testInterruptJoin2(ThreadFactory factory) throws Exception {
 534         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 535                 cf -> cf.withThreadFactory(factory))) {


 536             Subtask<String> subtask = scope.fork(() -> {
 537                 Thread.sleep(Duration.ofDays(1));
 538                 return "foo";
 539             });
 540 
 541             // interrupt main thread when it blocks in join
 542             scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
 543             try {
 544                 scope.join();
 545                 fail("join did not throw");
 546             } catch (InterruptedException expected) {
 547                 assertFalse(Thread.interrupted());   // interrupt status should be clear
 548             }
 549         }
 550     }
 551 
 552     /**
 553      * Test join when scope is cancelled.
 554      */
 555     @ParameterizedTest
 556     @MethodSource("factories")
 557     void testJoinWhenCancelled(ThreadFactory factory) throws Exception {

1022     @Test
1023     void testOnCompleteCancelsExecution() throws Exception {
1024         var joiner = new Joiner<String, Void>() {
1025             @Override
1026             public boolean onComplete(Subtask<? extends String> subtask) {
1027                 return true;
1028             }
1029             @Override
1030             public Void result() {
1031                 return null;
1032             }
1033         };
1034         try (var scope = StructuredTaskScope.open(joiner)) {
1035             assertFalse(scope.isCancelled());
1036             scope.fork(() -> "foo");
1037             awaitCancelled(scope);
1038             scope.join();
1039         }
1040     }
1041 
1042     /**
1043      * Test Joiner.onTimeout invoked by owner thread when timeout expires.
1044      */
1045     @Test
1046     void testOnTimeoutInvoked() throws Exception {
1047         var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
1048         Thread owner = Thread.currentThread();
1049         var invokeCount = new AtomicInteger();
1050         var joiner = new Joiner<String, Void>() {
1051             @Override
1052             public void onTimeout() {
1053                 assertTrue(Thread.currentThread() == owner);
1054                 assertTrue(scopeRef.get().isCancelled());
1055                 invokeCount.incrementAndGet();
1056             }
1057             @Override
1058             public Void result() {
1059                 return null;
1060             }
1061         };
1062         try (var scope = StructuredTaskScope.open(joiner,
1063                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1064             scopeRef.set(scope);
1065             scope.fork(() -> {
1066                 Thread.sleep(Duration.ofDays(1));
1067                 return null;
1068             });
1069             scope.join();
1070             assertEquals(1, invokeCount.get());
1071         }
1072     }
1073 
1074     /**
1075      * Test Joiner.onTimeout throwing an excepiton.
1076      */
1077     @Test
1078     void testOnTimeoutThrows() throws Exception {
1079         var joiner = new Joiner<String, Void>() {
1080             @Override
1081             public void onTimeout() {
1082                 throw new FooException();
1083             }
1084             @Override
1085             public Void result() {
1086                 return null;
1087             }
1088         };
1089         try (var scope = StructuredTaskScope.open(joiner,
1090                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1091             // wait for scope to be cancelled by timeout
1092             awaitCancelled(scope);
1093 
1094             // join should throw FooException on first usage
1095             assertThrows(FooException.class, scope::join);
1096 
1097             // retry after onTimeout fails
1098             assertThrows(IllegalStateException.class, scope::join);
1099         }
1100     }
1101 
1102     /**
1103      * Test toString.
1104      */
1105     @Test
1106     void testToString() throws Exception {
1107         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1108                 cf -> cf.withName("duke"))) {
1109 
1110             // open
1111             assertTrue(scope.toString().contains("duke"));
1112 
1113             // closed
1114             scope.close();
1115             assertTrue(scope.toString().contains("duke"));
1116         }
1117     }
1118 
1119     /**
1120      * Test Subtask with task that completes successfully.
1121      */

1273 
1274     /**
1275      * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
1276      * a subtask that fails.
1277      */
1278     @ParameterizedTest
1279     @MethodSource("factories")
1280     void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1281         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1282                 cf -> cf.withThreadFactory(factory))) {
1283             scope.fork(() -> "foo");
1284             scope.fork(() -> { throw new FooException(); });
1285             try {
1286                 scope.join();
1287             } catch (FailedException e) {
1288                 assertTrue(e.getCause() instanceof FooException);
1289             }
1290         }
1291     }
1292 
1293     /**
1294      * Test Joiner.allSuccessfulOrThrow() with a timeout.
1295      */
1296     @Test
1297     void testAllSuccessfulOrThrow4() throws Exception {
1298         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1299                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1300             scope.fork(() -> "foo");
1301             scope.fork(() -> {
1302                 Thread.sleep(Duration.ofDays(1));
1303                 return "bar";
1304             });
1305             assertThrows(TimeoutException.class, scope::join);
1306 
1307             // retry after join throws TimeoutException
1308             assertThrows(IllegalStateException.class, scope::join);
1309         }
1310     }
1311 
1312     /**
1313      * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
1314      */
1315     @Test
1316     void testAnySuccessfulResultOrThrow1() throws Exception {
1317         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
1318             try {
1319                 scope.join();
1320             } catch (FailedException e) {
1321                 assertTrue(e.getCause() instanceof NoSuchElementException);
1322             }
1323         }
1324     }
1325 
1326     /**
1327      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully.
1328      */
1329     @ParameterizedTest
1330     @MethodSource("factories")
1331     void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {

1365             scope.fork(() -> { throw new FooException(); });
1366             String first = scope.join();
1367             assertEquals("foo", first);
1368         }
1369     }
1370 
1371     /**
1372      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails.
1373      */
1374     @ParameterizedTest
1375     @MethodSource("factories")
1376     void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
1377         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
1378                 cf -> cf.withThreadFactory(factory))) {
1379             scope.fork(() -> { throw new FooException(); });
1380             Throwable ex = assertThrows(FailedException.class, scope::join);
1381             assertTrue(ex.getCause() instanceof FooException);
1382         }
1383     }
1384 
1385     /**
1386      * Test Joiner.allSuccessfulOrThrow() with a timeout.
1387      */
1388     @Test
1389     void anySuccessfulResultOrThrow6() throws Exception {
1390         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1391                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1392             scope.fork(() -> { throw new FooException(); });
1393             scope.fork(() -> {
1394                 Thread.sleep(Duration.ofDays(1));
1395                 return "bar";
1396             });
1397             assertThrows(TimeoutException.class, scope::join);
1398 
1399             // retry after join throws TimeoutException
1400             assertThrows(IllegalStateException.class, scope::join);
1401         }
1402     }
1403 
1404     /**
1405      * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
1406      */
1407     @Test
1408     void testAwaitSuccessfulOrThrow1() throws Throwable {
1409         try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
1410             var result = scope.join();
1411             assertNull(result);
1412         }
1413     }
1414 
1415     /**
1416      * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
1417      */
1418     @ParameterizedTest
1419     @MethodSource("factories")
1420     void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1421         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1422                 cf -> cf.withThreadFactory(factory))) {
1423             var subtask1 = scope.fork(() -> "foo");

1431 
1432     /**
1433      * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
1434      * a subtask that fails.
1435      */
1436     @ParameterizedTest
1437     @MethodSource("factories")
1438     void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1439         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1440                 cf -> cf.withThreadFactory(factory))) {
1441             scope.fork(() -> "foo");
1442             scope.fork(() -> { throw new FooException(); });
1443             try {
1444                 scope.join();
1445             } catch (FailedException e) {
1446                 assertTrue(e.getCause() instanceof FooException);
1447             }
1448         }
1449     }
1450 
1451     /**
1452      * Test Joiner.awaitAllSuccessfulOrThrow() with a timeout.
1453      */
1454     @Test
1455     void testAwaitSuccessfulOrThrow4() throws Exception {
1456         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1457                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1458             scope.fork(() -> "foo");
1459             scope.fork(() -> {
1460                 Thread.sleep(Duration.ofDays(1));
1461                 return "bar";
1462             });
1463             assertThrows(TimeoutException.class, scope::join);
1464 
1465             // retry after join throws TimeoutException
1466             assertThrows(IllegalStateException.class, scope::join);
1467         }
1468     }
1469 
1470     /**
1471      * Test Joiner.awaitAll() with no subtasks.
1472      */
1473     @Test
1474     void testAwaitAll1() throws Throwable {
1475         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1476             var result = scope.join();
1477             assertNull(result);
1478         }
1479     }
1480 
1481     /**
1482      * Test Joiner.awaitAll() with subtasks that complete successfully.
1483      */
1484     @ParameterizedTest
1485     @MethodSource("factories")
1486     void testAwaitAll2(ThreadFactory factory) throws Throwable {
1487         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1488                 cf -> cf.withThreadFactory(factory))) {
1489             var subtask1 = scope.fork(() -> "foo");

1496     }
1497 
1498     /**
1499      * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
1500      * that fails.
1501      */
1502     @ParameterizedTest
1503     @MethodSource("factories")
1504     void testAwaitAll3(ThreadFactory factory) throws Throwable {
1505         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1506                 cf -> cf.withThreadFactory(factory))) {
1507             var subtask1 = scope.fork(() -> "foo");
1508             var subtask2 = scope.fork(() -> { throw new FooException(); });
1509             var result = scope.join();
1510             assertNull(result);
1511             assertEquals("foo", subtask1.get());
1512             assertTrue(subtask2.exception() instanceof FooException);
1513         }
1514     }
1515 
1516     /**
1517      * Test Joiner.awaitAll() with a timeout.
1518      */
1519     @Test
1520     void testAwaitAll4() throws Exception {
1521         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1522                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1523             scope.fork(() -> "foo");
1524             scope.fork(() -> {
1525                 Thread.sleep(Duration.ofDays(1));
1526                 return "bar";
1527             });
1528             assertThrows(TimeoutException.class, scope::join);
1529 
1530             // retry after join throws TimeoutException
1531             assertThrows(IllegalStateException.class, scope::join);
1532         }
1533     }
1534 
1535     /**
1536      * Test Joiner.allUntil(Predicate) with no subtasks.
1537      */
1538     @Test
1539     void testAllUntil1() throws Throwable {
1540         try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
1541             var subtasks = scope.join();
1542             assertEquals(0, subtasks.count());
1543         }
1544     }
1545 
1546     /**
1547      * Test Joiner.allUntil(Predicate) with no cancellation.
1548      */
1549     @ParameterizedTest
1550     @MethodSource("factories")
1551     void testAllUntil2(ThreadFactory factory) throws Exception {
1552         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1553                 cf -> cf.withThreadFactory(factory))) {
1554 

1630     }
1631 
1632     /**
1633      * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws.
1634      */
1635     @Test
1636     void testAllUntil5() throws Exception {
1637         var joiner = Joiner.allUntil(_ -> { throw new FooException(); });
1638         var excRef = new AtomicReference<Throwable>();
1639         Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1640         ThreadFactory factory = Thread.ofVirtual()
1641                 .uncaughtExceptionHandler(uhe)
1642                 .factory();
1643         try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1644             scope.fork(() -> "foo");
1645             scope.join();
1646             assertInstanceOf(FooException.class, excRef.get());
1647         }
1648     }
1649 
1650     /**
1651      * Test Joiner.allUntil(Predicate) with a timeout.
1652      */
1653     @Test
1654     void testAllUntil6() throws Exception {
1655         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1656                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1657             var subtask1 = scope.fork(() -> "foo");
1658             var subtask2 = scope.fork(() -> {
1659                 Thread.sleep(Duration.ofDays(1));
1660                 return "bar";
1661             });
1662 
1663             // TimeoutException should not be thrown
1664             var subtasks = scope.join().toList();
1665 
1666             // stream should have two elements, subtask1 may or may not have completed
1667             assertEquals(2, subtasks.size());
1668             assertSame(subtask1, subtasks.get(0));
1669             assertSame(subtask2, subtasks.get(1));
1670             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1671 
1672             // retry after join throws TimeoutException
1673             assertThrows(IllegalStateException.class, scope::join);
1674         }
1675     }
1676 
1677     /**
1678      * Test Joiner default methods.
1679      */
1680     @Test
1681     void testJoinerDefaultMethods() throws Exception {
1682         try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1683 
1684             // need subtasks to test default methods
1685             var subtask1 = scope.fork(() -> "foo");
1686             awaitCancelled(scope);
1687             var subtask2 = scope.fork(() -> "bar");
1688             scope.join();
1689 
1690             assertEquals(Subtask.State.SUCCESS, subtask1.state());
1691             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1692 
1693             // Joiner that does not override default methods
1694             Joiner<Object, Void> joiner = () -> null;
1695             assertThrows(NullPointerException.class, () -> joiner.onFork(null));
1696             assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
1697             assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
1698             assertFalse(joiner.onFork(subtask2));
1699             assertFalse(joiner.onComplete(subtask1));
1700             assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
1701             assertThrows(TimeoutException.class, joiner::onTimeout);
1702         }
1703     }
1704 
1705     /**
1706      * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
1707      */
1708     @Test
1709     void testJoinersWithUnavailableResult() throws Exception {
1710         try (var scope = StructuredTaskScope.open()) {
1711             var done = new CountDownLatch(1);
1712             var subtask = scope.fork(() -> {
1713                 done.await();
1714                 return null;
1715             });
1716 
1717             // onComplete with uncompleted task should throw IAE
1718             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1719             assertThrows(IllegalArgumentException.class,
1720                     () -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
1721             assertThrows(IllegalArgumentException.class,

1744                     () -> Joiner.allUntil(_ -> false).onFork(subtask));
1745         }
1746 
1747     }
1748 
1749     /**
1750      * Test the Configuration function apply method throwing an exception.
1751      */
1752     @Test
1753     void testConfigFunctionThrows() throws Exception {
1754         assertThrows(FooException.class,
1755                 () -> StructuredTaskScope.open(Joiner.awaitAll(),
1756                                                cf -> { throw new FooException(); }));
1757     }
1758 
1759     /**
1760      * Test Configuration equals/hashCode/toString
1761      */
1762     @Test
1763     void testConfigMethods() throws Exception {
1764         UnaryOperator<Configuration> configOperator = cf -> {
1765             var name = "duke";
1766             var threadFactory = Thread.ofPlatform().factory();
1767             var timeout = Duration.ofSeconds(10);
1768 
1769             assertEquals(cf, cf);
1770             assertEquals(cf.withName(name), cf.withName(name));
1771             assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1772             assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1773 
1774             assertNotEquals(cf, cf.withName(name));
1775             assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1776             assertNotEquals(cf, cf.withTimeout(timeout));
1777 
1778             assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1779             assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1780                     cf.withThreadFactory(threadFactory).hashCode());
1781             assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1782 
1783             assertTrue(cf.withName(name).toString().contains(name));
1784             assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1785             assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1786 
1787             return cf;
1788         };
1789         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) {
1790             // do nothing
1791         }
1792     }
1793 
1794     /**
1795      * Test for NullPointerException.
1796      */
1797     @Test
1798     void testNulls() throws Exception {
1799         assertThrows(NullPointerException.class,
1800                 () -> StructuredTaskScope.open(null));
1801         assertThrows(NullPointerException.class,
1802                 () -> StructuredTaskScope.open(null, cf -> cf));
1803         assertThrows(NullPointerException.class,
1804                 () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
1805 
1806         assertThrows(NullPointerException.class, () -> Joiner.allUntil(null));
1807 
1808         // fork
1809         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
< prev index next >