< prev index next >

test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java

Print this page

  44 import java.util.concurrent.Callable;
  45 import java.util.concurrent.ConcurrentHashMap;
  46 import java.util.concurrent.CountDownLatch;
  47 import java.util.concurrent.Executors;
  48 import java.util.concurrent.Future;
  49 import java.util.concurrent.LinkedTransferQueue;
  50 import java.util.concurrent.ThreadFactory;
  51 import java.util.concurrent.TimeUnit;
  52 import java.util.concurrent.RejectedExecutionException;
  53 import java.util.concurrent.ScheduledExecutorService;
  54 import java.util.concurrent.StructuredTaskScope;
  55 import java.util.concurrent.StructuredTaskScope.TimeoutException;
  56 import java.util.concurrent.StructuredTaskScope.Configuration;
  57 import java.util.concurrent.StructuredTaskScope.FailedException;
  58 import java.util.concurrent.StructuredTaskScope.Joiner;
  59 import java.util.concurrent.StructuredTaskScope.Subtask;
  60 import java.util.concurrent.StructureViolationException;
  61 import java.util.concurrent.atomic.AtomicBoolean;
  62 import java.util.concurrent.atomic.AtomicInteger;
  63 import java.util.concurrent.atomic.AtomicReference;
  64 import java.util.function.Function;
  65 import java.util.function.Predicate;

  66 import java.util.stream.Stream;
  67 import static java.lang.Thread.State.*;
  68 
  69 import org.junit.jupiter.api.Test;
  70 import org.junit.jupiter.api.BeforeAll;
  71 import org.junit.jupiter.api.AfterAll;
  72 import org.junit.jupiter.params.ParameterizedTest;
  73 import org.junit.jupiter.params.provider.MethodSource;
  74 import static org.junit.jupiter.api.Assertions.*;
  75 
  76 class StructuredTaskScopeTest {
  77     private static ScheduledExecutorService scheduler;
  78     private static List<ThreadFactory> threadFactories;
  79 
  80     @BeforeAll
  81     static void setup() throws Exception {
  82         scheduler = Executors.newSingleThreadScheduledExecutor();
  83 
  84         // thread factories
  85         String value = System.getProperty("threadFactory");

 192                 future.get();
 193             }
 194 
 195             // subtask cannot fork
 196             Subtask<Boolean> subtask = scope.fork(() -> {
 197                 assertThrows(WrongThreadException.class, () -> {
 198                     scope.fork(() -> null);
 199                 });
 200                 return true;
 201             });
 202             scope.join();
 203             assertTrue(subtask.get());
 204         }
 205     }
 206 
 207     /**
 208      * Test fork after join, no subtasks forked before join.
 209      */
 210     @ParameterizedTest
 211     @MethodSource("factories")
 212     void testForkAfterJoin1(ThreadFactory factory) throws Exception {
 213         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 214                 cf -> cf.withThreadFactory(factory))) {
 215             scope.join();
 216             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 217         }
 218     }
 219 
 220     /**
 221      * Test fork after join, subtasks forked before join.
 222      */
 223     @ParameterizedTest
 224     @MethodSource("factories")
 225     void testForkAfterJoin2(ThreadFactory factory) throws Exception {
 226         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 227                 cf -> cf.withThreadFactory(factory))) {
 228             scope.fork(() -> "foo");
 229             scope.join();
 230             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 231         }
 232     }
 233 
 234     /**
 235      * Test fork after join throws.
 236      */
 237     @ParameterizedTest
 238     @MethodSource("factories")
 239     void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
 240         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 241                 cf -> cf.withThreadFactory(factory))) {
 242             var latch = new CountDownLatch(1);
 243             var subtask1 = scope.fork(() -> {
 244                 latch.await();
 245                 return "foo";
 246             });
 247 
 248             // join throws
 249             Thread.currentThread().interrupt();
 250             assertThrows(InterruptedException.class, scope::join);
 251 
 252             // fork should throw
 253             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 254         }
 255     }
 256 



















 257     /**
 258      * Test fork after task scope is cancelled. This test uses a custom Joiner to
 259      * cancel execution.
 260      */
 261     @ParameterizedTest
 262     @MethodSource("factories")
 263     void testForkAfterCancel2(ThreadFactory factory) throws Exception {
 264         var countingThreadFactory = new CountingThreadFactory(factory);
 265         var testJoiner = new CancelAfterOneJoiner<String>();
 266 
 267         try (var scope = StructuredTaskScope.open(testJoiner,
 268                 cf -> cf.withThreadFactory(countingThreadFactory))) {
 269 
 270             // fork subtask, the scope should be cancelled when the subtask completes
 271             var subtask1 = scope.fork(() -> "foo");
 272             awaitCancelled(scope);
 273 
 274             assertEquals(1, countingThreadFactory.threadCount());
 275             assertEquals(1, testJoiner.onForkCount());
 276             assertEquals(1, testJoiner.onCompleteCount());

 279             var subtask2 = scope.fork(() -> "bar");
 280 
 281             // onFork should be invoked, newThread and onComplete should not be invoked
 282             assertEquals(1, countingThreadFactory.threadCount());
 283             assertEquals(2, testJoiner.onForkCount());
 284             assertEquals(1, testJoiner.onCompleteCount());
 285 
 286             scope.join();
 287 
 288             assertEquals(1, countingThreadFactory.threadCount());
 289             assertEquals(2, testJoiner.onForkCount());
 290             assertEquals(1, testJoiner.onCompleteCount());
 291             assertEquals("foo", subtask1.get());
 292             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 293         }
 294     }
 295 
 296     /**
 297      * Test fork after task scope is closed.
 298      */
 299     @Test
 300     void testForkAfterClose() {
 301         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {


 302             scope.close();
 303             assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
 304         }
 305     }
 306 
 307     /**
 308      * Test fork with a ThreadFactory that rejects creating a thread.
 309      */
 310     @Test
 311     void testForkRejectedExecutionException() {
 312         ThreadFactory factory = task -> null;
 313         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 314                 cf -> cf.withThreadFactory(factory))) {
 315             assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
 316         }
 317     }
 318 
 319     /**
 320      * Test join with no subtasks.
 321      */

 349     @Test
 350     void testJoinAfterJoin1() throws Exception {
 351         var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz"));
 352         Joiner<Object, String> joiner = results::take;
 353         try (var scope = StructuredTaskScope.open(joiner)) {
 354             scope.fork(() -> "foo");
 355             assertEquals("foo", scope.join());
 356 
 357             // join already called
 358             for (int i = 0 ; i < 3; i++) {
 359                 assertThrows(IllegalStateException.class, scope::join);
 360             }
 361         }
 362     }
 363 
 364     /**
 365      * Test join after join completed with an exception.
 366      */
 367     @Test
 368     void testJoinAfterJoin2() throws Exception {
 369         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
 370             scope.fork(() -> { throw new FooException(); });
 371             Throwable ex = assertThrows(FailedException.class, scope::join);
 372             assertTrue(ex.getCause() instanceof FooException);
 373 
 374             // join already called
 375             for (int i = 0 ; i < 3; i++) {
 376                 assertThrows(IllegalStateException.class, scope::join);
 377             }
 378         }
 379     }
 380 



























 381     /**
 382      * Test join after join completed with a timeout.
 383      */
 384     @Test
 385     void testJoinAfterJoin3() throws Exception {
 386         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
 387                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
 388             // wait for scope to be cancelled by timeout
 389             awaitCancelled(scope);
 390             assertThrows(TimeoutException.class, scope::join);
 391 
 392             // join already called
 393             for (int i = 0 ; i < 3; i++) {
 394                 assertThrows(IllegalStateException.class, scope::join);
 395             }
 396         }
 397     }
 398 





























 399     /**
 400      * Test join method is owner confined.
 401      */
 402     @ParameterizedTest
 403     @MethodSource("factories")
 404     void testJoinConfined(ThreadFactory factory) throws Exception {
 405         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 406                 cf -> cf.withThreadFactory(factory))) {
 407 
 408             // random thread cannot join
 409             try (var pool = Executors.newSingleThreadExecutor()) {
 410                 Future<Void> future = pool.submit(() -> {
 411                     assertThrows(WrongThreadException.class, scope::join);
 412                     return null;
 413                 });
 414                 future.get();
 415             }
 416 
 417             // subtask cannot join
 418             Subtask<Boolean> subtask = scope.fork(() -> {
 419                 assertThrows(WrongThreadException.class, () -> { scope.join(); });
 420                 return true;
 421             });
 422             scope.join();
 423             assertTrue(subtask.get());
 424         }
 425     }
 426 
 427     /**
 428      * Test join with interrupt status set.
 429      */
 430     @ParameterizedTest
 431     @MethodSource("factories")
 432     void testInterruptJoin1(ThreadFactory factory) throws Exception {
 433         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 434                 cf -> cf.withThreadFactory(factory))) {
 435 
 436             Subtask<String> subtask = scope.fork(() -> {
 437                 Thread.sleep(60_000);
 438                 return "foo";
 439             });
 440 
 441             // join should throw
 442             Thread.currentThread().interrupt();
 443             try {
 444                 scope.join();
 445                 fail("join did not throw");
 446             } catch (InterruptedException expected) {
 447                 assertFalse(Thread.interrupted());   // interrupt status should be cleared
 448             }
 449         }
 450     }
 451 
 452     /**
 453      * Test interrupt of thread blocked in join.
 454      */
 455     @ParameterizedTest
 456     @MethodSource("factories")
 457     void testInterruptJoin2(ThreadFactory factory) throws Exception {
 458         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 459                 cf -> cf.withThreadFactory(factory))) {
 460 
 461             var latch = new CountDownLatch(1);
 462             Subtask<String> subtask = scope.fork(() -> {
 463                 Thread.sleep(60_000);
 464                 return "foo";
 465             });
 466 
 467             // interrupt main thread when it blocks in join
 468             scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
 469             try {
 470                 scope.join();
 471                 fail("join did not throw");
 472             } catch (InterruptedException expected) {
 473                 assertFalse(Thread.interrupted());   // interrupt status should be clear
 474             }
 475         }
 476     }
 477 
 478     /**
 479      * Test join when scope is cancelled.
 480      */
 481     @ParameterizedTest
 482     @MethodSource("factories")
 483     void testJoinWhenCancelled(ThreadFactory factory) throws Exception {

 862 
 863     /**
 864      * Test that isCancelled returns true after close.
 865      */
 866     @Test
 867     void testIsCancelledAfterClose() throws Exception {
 868         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 869             assertFalse(scope.isCancelled());
 870             scope.close();
 871             assertTrue(scope.isCancelled());
 872         }
 873     }
 874 
 875     /**
 876      * Test Joiner.onFork throwing exception.
 877      */
 878     @Test
 879     void testOnForkThrows() throws Exception {
 880         var joiner = new Joiner<String, Void>() {
 881             @Override
 882             public boolean onFork(Subtask<? extends String> subtask) {
 883                 throw new FooException();
 884             }
 885             @Override
 886             public Void result() {
 887                 return null;
 888             }
 889         };
 890         try (var scope = StructuredTaskScope.open(joiner)) {
 891             assertThrows(FooException.class, () -> scope.fork(() -> "foo"));
 892         }
 893     }
 894 
 895     /**
 896      * Test Joiner.onFork returning true to cancel execution.
 897      */
 898     @Test
 899     void testOnForkCancelsExecution() throws Exception {
 900         var joiner = new Joiner<String, Void>() {
 901             @Override
 902             public boolean onFork(Subtask<? extends String> subtask) {
 903                 return true;
 904             }
 905             @Override
 906             public Void result() {
 907                 return null;
 908             }
 909         };
 910         try (var scope = StructuredTaskScope.open(joiner)) {
 911             assertFalse(scope.isCancelled());
 912             scope.fork(() -> "foo");
 913             assertTrue(scope.isCancelled());
 914             scope.join();
 915         }
 916     }
 917 
 918     /**
 919      * Test Joiner.onComplete throwing exception causes UHE to be invoked.
 920      */
 921     @Test
 922     void testOnCompleteThrows() throws Exception {
 923         var joiner = new Joiner<String, Void>() {
 924             @Override
 925             public boolean onComplete(Subtask<? extends String> subtask) {
 926                 throw new FooException();
 927             }
 928             @Override
 929             public Void result() {
 930                 return null;
 931             }
 932         };
 933         var excRef = new AtomicReference<Throwable>();
 934         Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
 935         ThreadFactory factory = Thread.ofVirtual()
 936                 .uncaughtExceptionHandler(uhe)
 937                 .factory();
 938         try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
 939             scope.fork(() -> "foo");
 940             scope.join();
 941             assertInstanceOf(FooException.class, excRef.get());
 942         }
 943     }
 944 
 945     /**
 946      * Test Joiner.onComplete returning true to cancel execution.
 947      */
 948     @Test
 949     void testOnCompleteCancelsExecution() throws Exception {
 950         var joiner = new Joiner<String, Void>() {
 951             @Override
 952             public boolean onComplete(Subtask<? extends String> subtask) {
 953                 return true;
 954             }
 955             @Override
 956             public Void result() {
 957                 return null;
 958             }
 959         };
 960         try (var scope = StructuredTaskScope.open(joiner)) {
 961             assertFalse(scope.isCancelled());
 962             scope.fork(() -> "foo");
 963             awaitCancelled(scope);
 964             scope.join();
 965         }
 966     }
 967 




























































 968     /**
 969      * Test toString.
 970      */
 971     @Test
 972     void testToString() throws Exception {
 973         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 974                 cf -> cf.withName("duke"))) {
 975 
 976             // open
 977             assertTrue(scope.toString().contains("duke"));
 978 
 979             // closed
 980             scope.close();
 981             assertTrue(scope.toString().contains("duke"));
 982         }
 983     }
 984 
 985     /**
 986      * Test Subtask with task that completes successfully.
 987      */
 988     @ParameterizedTest
 989     @MethodSource("factories")
 990     void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
 991         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
 992                 cf -> cf.withThreadFactory(factory))) {
 993 
 994             Subtask<String> subtask = scope.fork(() -> "foo");
 995 
 996             // before join
 997             assertThrows(IllegalStateException.class, subtask::get);
 998             assertThrows(IllegalStateException.class, subtask::exception);
 999 




1000             scope.join();
1001 
1002             // after join
1003             assertEquals(Subtask.State.SUCCESS, subtask.state());


1004             assertEquals("foo", subtask.get());
1005             assertThrows(IllegalStateException.class, subtask::exception);




1006         }
1007     }
1008 
1009     /**
1010      * Test Subtask with task that fails.
1011      */
1012     @ParameterizedTest
1013     @MethodSource("factories")
1014     void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
1015         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1016                 cf -> cf.withThreadFactory(factory))) {
1017 
1018             Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
1019 
1020             // before join
1021             assertThrows(IllegalStateException.class, subtask::get);
1022             assertThrows(IllegalStateException.class, subtask::exception);
1023 




1024             scope.join();
1025 
1026             // after join
1027             assertEquals(Subtask.State.FAILED, subtask.state());


1028             assertThrows(IllegalStateException.class, subtask::get);
1029             assertTrue(subtask.exception() instanceof FooException);




1030         }
1031     }
1032 
1033     /**
1034      * Test Subtask with a task that has not completed.
1035      */
1036     @ParameterizedTest
1037     @MethodSource("factories")
1038     void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
1039         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1040                 cf -> cf.withThreadFactory(factory))) {
1041             Subtask<Void> subtask = scope.fork(() -> {
1042                 Thread.sleep(Duration.ofDays(1));
1043                 return null;
1044             });
1045 
1046             // before join
1047             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());


1048             assertThrows(IllegalStateException.class, subtask::get);
1049             assertThrows(IllegalStateException.class, subtask::exception);
1050 




1051             // attempt join, join throws
1052             Thread.currentThread().interrupt();
1053             assertThrows(InterruptedException.class, scope::join);
1054 
1055             // after join
1056             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());


1057             assertThrows(IllegalStateException.class, subtask::get);
1058             assertThrows(IllegalStateException.class, subtask::exception);




1059         }
1060     }
1061 
1062     /**
1063      * Test Subtask forked after execution cancelled.
1064      */
1065     @ParameterizedTest
1066     @MethodSource("factories")
1067     void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception {
1068         try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1069             scope.fork(() -> "foo");
1070             awaitCancelled(scope);
1071 
1072             var subtask = scope.fork(() -> "foo");
1073 
1074             // before join
1075             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1076             assertThrows(IllegalStateException.class, subtask::get);
1077             assertThrows(IllegalStateException.class, subtask::exception);
1078 




1079             scope.join();
1080 
1081             // after join
1082             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());


1083             assertThrows(IllegalStateException.class, subtask::get);
1084             assertThrows(IllegalStateException.class, subtask::exception);




1085         }
1086     }
1087 
1088     /**
1089      * Test Subtask::toString.
1090      */
1091     @Test
1092     void testSubtaskToString() throws Exception {
1093         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1094             var latch = new CountDownLatch(1);
1095             var subtask1 = scope.fork(() -> {
1096                 latch.await();
1097                 return "foo";
1098             });
1099             var subtask2 = scope.fork(() -> { throw new FooException(); });
1100 
1101             // subtask1 result is unavailable
1102             assertTrue(subtask1.toString().contains("Unavailable"));
1103             latch.countDown();
1104 
1105             scope.join();
1106 
1107             assertTrue(subtask1.toString().contains("Completed successfully"));
1108             assertTrue(subtask2.toString().contains("Failed"));
1109         }
1110     }
1111 
1112     /**
1113      * Test Joiner.allSuccessfulOrThrow() with no subtasks.
1114      */
1115     @Test
1116     void testAllSuccessfulOrThrow1() throws Throwable {
1117         try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
1118             var subtasks = scope.join().toList();
1119             assertTrue(subtasks.isEmpty());
1120         }
1121     }
1122 
1123     /**
1124      * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully.
1125      */
1126     @ParameterizedTest
1127     @MethodSource("factories")
1128     void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1129         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1130                 cf -> cf.withThreadFactory(factory))) {
1131             var subtask1 = scope.fork(() -> "foo");
1132             var subtask2 = scope.fork(() -> "bar");
1133             var subtasks = scope.join().toList();
1134             assertEquals(List.of(subtask1, subtask2), subtasks);
1135             assertEquals("foo", subtask1.get());
1136             assertEquals("bar", subtask2.get());
1137         }
1138     }
1139 
1140     /**
1141      * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
1142      * a subtask that fails.
1143      */
1144     @ParameterizedTest
1145     @MethodSource("factories")
1146     void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1147         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1148                 cf -> cf.withThreadFactory(factory))) {
1149             scope.fork(() -> "foo");
1150             scope.fork(() -> { throw new FooException(); });
1151             try {
1152                 scope.join();
1153             } catch (FailedException e) {
1154                 assertTrue(e.getCause() instanceof FooException);
1155             }
1156         }
1157     }
1158 
1159     /**
1160      * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.









































1161      */
1162     @Test
1163     void testAnySuccessfulResultOrThrow1() throws Exception {
1164         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
1165             try {
1166                 scope.join();
1167             } catch (FailedException e) {
1168                 assertTrue(e.getCause() instanceof NoSuchElementException);
1169             }
1170         }
1171     }
1172 
1173     /**
1174      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully.
1175      */
1176     @ParameterizedTest
1177     @MethodSource("factories")
1178     void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {
1179         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1180                 cf -> cf.withThreadFactory(factory))) {
1181             scope.fork(() -> "foo");
1182             String result = scope.join();
1183             assertEquals("foo", result);
1184         }
1185     }
1186 
1187     /**
1188      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully
1189      * with a null result.
1190      */
1191     @ParameterizedTest
1192     @MethodSource("factories")
1193     void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception {
1194         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1195                 cf -> cf.withThreadFactory(factory))) {
1196             scope.fork(() -> null);
1197             String result = scope.join();
1198             assertNull(result);
1199         }
1200     }
1201 
1202     /**
1203      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully
1204      * and a subtask that fails.
1205      */
1206     @ParameterizedTest
1207     @MethodSource("factories")
1208     void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception {
1209         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1210                 cf -> cf.withThreadFactory(factory))) {
1211             scope.fork(() -> "foo");
1212             scope.fork(() -> { throw new FooException(); });
1213             String first = scope.join();
1214             assertEquals("foo", first);
1215         }
1216     }
1217 
1218     /**
1219      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails.
1220      */
1221     @ParameterizedTest
1222     @MethodSource("factories")
1223     void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
1224         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
1225                 cf -> cf.withThreadFactory(factory))) {
1226             scope.fork(() -> { throw new FooException(); });
1227             Throwable ex = assertThrows(FailedException.class, scope::join);
1228             assertTrue(ex.getCause() instanceof FooException);
1229         }
1230     }
1231 



















1232     /**
1233      * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
1234      */
1235     @Test
1236     void testAwaitSuccessfulOrThrow1() throws Throwable {
1237         try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
1238             var result = scope.join();
1239             assertNull(result);
1240         }
1241     }
1242 
1243     /**
1244      * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
1245      */
1246     @ParameterizedTest
1247     @MethodSource("factories")
1248     void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1249         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1250                 cf -> cf.withThreadFactory(factory))) {
1251             var subtask1 = scope.fork(() -> "foo");

1259 
1260     /**
1261      * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
1262      * a subtask that fails.
1263      */
1264     @ParameterizedTest
1265     @MethodSource("factories")
1266     void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1267         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1268                 cf -> cf.withThreadFactory(factory))) {
1269             scope.fork(() -> "foo");
1270             scope.fork(() -> { throw new FooException(); });
1271             try {
1272                 scope.join();
1273             } catch (FailedException e) {
1274                 assertTrue(e.getCause() instanceof FooException);
1275             }
1276         }
1277     }
1278 



















1279     /**
1280      * Test Joiner.awaitAll() with no subtasks.
1281      */
1282     @Test
1283     void testAwaitAll1() throws Throwable {
1284         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1285             var result = scope.join();
1286             assertNull(result);
1287         }
1288     }
1289 
1290     /**
1291      * Test Joiner.awaitAll() with subtasks that complete successfully.
1292      */
1293     @ParameterizedTest
1294     @MethodSource("factories")
1295     void testAwaitAll2(ThreadFactory factory) throws Throwable {
1296         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1297                 cf -> cf.withThreadFactory(factory))) {
1298             var subtask1 = scope.fork(() -> "foo");

1305     }
1306 
1307     /**
1308      * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
1309      * that fails.
1310      */
1311     @ParameterizedTest
1312     @MethodSource("factories")
1313     void testAwaitAll3(ThreadFactory factory) throws Throwable {
1314         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1315                 cf -> cf.withThreadFactory(factory))) {
1316             var subtask1 = scope.fork(() -> "foo");
1317             var subtask2 = scope.fork(() -> { throw new FooException(); });
1318             var result = scope.join();
1319             assertNull(result);
1320             assertEquals("foo", subtask1.get());
1321             assertTrue(subtask2.exception() instanceof FooException);
1322         }
1323     }
1324 



















1325     /**
1326      * Test Joiner.allUntil(Predicate) with no subtasks.
1327      */
1328     @Test
1329     void testAllUntil1() throws Throwable {
1330         try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
1331             var subtasks = scope.join();
1332             assertEquals(0, subtasks.count());
1333         }
1334     }
1335 
1336     /**
1337      * Test Joiner.allUntil(Predicate) with no cancellation.
1338      */
1339     @ParameterizedTest
1340     @MethodSource("factories")
1341     void testAllUntil2(ThreadFactory factory) throws Exception {
1342         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1343                 cf -> cf.withThreadFactory(factory))) {
1344 
1345             var subtask1 = scope.fork(() -> "foo");
1346             var subtask2 = scope.fork(() -> { throw new FooException(); });
1347 
1348             var subtasks = scope.join().toList();
1349             assertEquals(2, subtasks.size());
1350 
1351             assertSame(subtask1, subtasks.get(0));
1352             assertSame(subtask2, subtasks.get(1));
1353             assertEquals("foo", subtask1.get());
1354             assertTrue(subtask2.exception() instanceof FooException);
1355         }
1356     }
1357 
1358     /**
1359      * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes.
1360      */
1361     @ParameterizedTest
1362     @MethodSource("factories")
1363     void testAllUntil3(ThreadFactory factory) throws Exception {
1364         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true),
1365                 cf -> cf.withThreadFactory(factory))) {
1366 
1367             var subtask1 = scope.fork(() -> "foo");
1368             var subtask2 = scope.fork(() -> {
1369                 Thread.sleep(Duration.ofDays(1));
1370                 return "bar";
1371             });
1372 
1373             var subtasks = scope.join().toList();

1374 
1375             assertEquals(2, subtasks.size());
1376             assertSame(subtask1, subtasks.get(0));
1377             assertSame(subtask2, subtasks.get(1));
1378             assertEquals("foo", subtask1.get());
1379             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1380         }
1381     }
1382 
1383     /**
1384      * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete.
1385      */
1386     @ParameterizedTest
1387     @MethodSource("factories")
1388     void testAllUntil4(ThreadFactory factory) throws Exception {
1389 
1390         // cancel execution after two or more failures
1391         class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
1392             final AtomicInteger failedCount = new AtomicInteger();
1393             @Override
1394             public boolean test(Subtask<? extends T> subtask) {
1395                 return subtask.state() == Subtask.State.FAILED
1396                         && failedCount.incrementAndGet() >= 2;
1397             }
1398         }
1399         var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
1400 
1401         try (var scope = StructuredTaskScope.open(joiner)) {
1402             int forkCount = 0;
1403 
1404             // fork subtasks until execution cancelled
1405             while (!scope.isCancelled()) {
1406                 scope.fork(() -> "foo");
1407                 scope.fork(() -> { throw new FooException(); });
1408                 forkCount += 2;
1409                 Thread.sleep(Duration.ofMillis(20));
1410             }
1411 
1412             var subtasks = scope.join().toList();
1413             assertEquals(forkCount, subtasks.size());
1414 
1415             long failedCount = subtasks.stream()
1416                     .filter(s -> s.state() == Subtask.State.FAILED)
1417                     .count();
1418             assertTrue(failedCount >= 2);
1419         }
1420     }
1421 
1422     /**
1423      * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws.
1424      */
1425     @Test
1426     void testAllUntil5() throws Exception {
1427         var joiner = Joiner.allUntil(_ -> { throw new FooException(); });
1428         var excRef = new AtomicReference<Throwable>();
1429         Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1430         ThreadFactory factory = Thread.ofVirtual()
1431                 .uncaughtExceptionHandler(uhe)
1432                 .factory();
1433         try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1434             scope.fork(() -> "foo");
1435             scope.join();
1436             assertInstanceOf(FooException.class, excRef.get());
1437         }
1438     }
1439 





















































1440     /**
1441      * Test Joiner default methods.
1442      */
1443     @Test
1444     void testJoinerDefaultMethods() throws Exception {
1445         try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1446 
1447             // need subtasks to test default methods
1448             var subtask1 = scope.fork(() -> "foo");
1449             awaitCancelled(scope);
1450             var subtask2 = scope.fork(() -> "bar");
1451             scope.join();
1452 
1453             assertEquals(Subtask.State.SUCCESS, subtask1.state());
1454             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1455 
1456             // Joiner that does not override default methods
1457             Joiner<Object, Void> joiner = () -> null;
1458             assertThrows(NullPointerException.class, () -> joiner.onFork(null));
1459             assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
1460             assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
1461             assertFalse(joiner.onFork(subtask2));
1462             assertFalse(joiner.onComplete(subtask1));
1463             assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));

1464         }
1465     }
1466 
1467     /**
1468      * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
1469      */
1470     @Test
1471     void testJoinersWithUnavailableResult() throws Exception {
1472         try (var scope = StructuredTaskScope.open()) {
1473             var done = new CountDownLatch(1);
1474             var subtask = scope.fork(() -> {
1475                 done.await();
1476                 return null;
1477             });
1478 
1479             // onComplete with uncompleted task should throw IAE
1480             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1481             assertThrows(IllegalArgumentException.class,
1482                     () -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
1483             assertThrows(IllegalArgumentException.class,
1484                     () -> Joiner.anySuccessfulResultOrThrow().onComplete(subtask));
1485             assertThrows(IllegalArgumentException.class,
1486                     () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask));
1487             assertThrows(IllegalArgumentException.class,
1488                     () -> Joiner.awaitAll().onComplete(subtask));
1489             assertThrows(IllegalArgumentException.class,
1490                     () -> Joiner.allUntil(_ -> false).onComplete(subtask));
1491 
1492             done.countDown();
1493             scope.join();
1494 
1495             // onFork with completed task should throw IAE
1496             assertEquals(Subtask.State.SUCCESS, subtask.state());
1497             assertThrows(IllegalArgumentException.class,
1498                     () -> Joiner.allSuccessfulOrThrow().onFork(subtask));
1499             assertThrows(IllegalArgumentException.class,
1500                     () -> Joiner.anySuccessfulResultOrThrow().onFork(subtask));
1501             assertThrows(IllegalArgumentException.class,
1502                     () -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask));
1503             assertThrows(IllegalArgumentException.class,
1504                     () -> Joiner.awaitAll().onFork(subtask));
1505             assertThrows(IllegalArgumentException.class,
1506                     () -> Joiner.allUntil(_ -> false).onFork(subtask));
1507         }
1508 
1509     }
1510 
1511     /**
1512      * Test the Configuration function apply method throwing an exception.
1513      */
1514     @Test
1515     void testConfigFunctionThrows() throws Exception {
1516         assertThrows(FooException.class,
1517                 () -> StructuredTaskScope.open(Joiner.awaitAll(),
1518                                                cf -> { throw new FooException(); }));
1519     }
1520 
1521     /**
1522      * Test Configuration equals/hashCode/toString
1523      */
1524     @Test
1525     void testConfigMethods() throws Exception {
1526         Function<Configuration, Configuration> testConfig = cf -> {
1527             var name = "duke";
1528             var threadFactory = Thread.ofPlatform().factory();
1529             var timeout = Duration.ofSeconds(10);
1530 
1531             assertEquals(cf, cf);
1532             assertEquals(cf.withName(name), cf.withName(name));
1533             assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1534             assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1535 
1536             assertNotEquals(cf, cf.withName(name));
1537             assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1538             assertNotEquals(cf, cf.withTimeout(timeout));
1539 
1540             assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1541             assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1542                     cf.withThreadFactory(threadFactory).hashCode());
1543             assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1544 
1545             assertTrue(cf.withName(name).toString().contains(name));
1546             assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1547             assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1548 
1549             return cf;
1550         };
1551         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
1552             // do nothing
1553         }
1554     }
1555 
1556     /**
1557      * Test for NullPointerException.
1558      */
1559     @Test
1560     void testNulls() throws Exception {
1561         assertThrows(NullPointerException.class,
1562                 () -> StructuredTaskScope.open(null));
1563         assertThrows(NullPointerException.class,
1564                 () -> StructuredTaskScope.open(null, cf -> cf));
1565         assertThrows(NullPointerException.class,
1566                 () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
1567 
1568         assertThrows(NullPointerException.class, () -> Joiner.allUntil(null));
1569 
1570         // fork
1571         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {

1580                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null)));
1581         assertThrows(NullPointerException.class,
1582                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null)));
1583         assertThrows(NullPointerException.class,
1584                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null)));
1585 
1586         // Joiner.onFork/onComplete
1587         assertThrows(NullPointerException.class,
1588                 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null));
1589         assertThrows(NullPointerException.class,
1590                 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null));
1591         assertThrows(NullPointerException.class,
1592                 () -> Joiner.awaitAll().onFork(null));
1593         assertThrows(NullPointerException.class,
1594                 () -> Joiner.awaitAll().onComplete(null));
1595         assertThrows(NullPointerException.class,
1596                 () -> Joiner.allSuccessfulOrThrow().onFork(null));
1597         assertThrows(NullPointerException.class,
1598                 () -> Joiner.allSuccessfulOrThrow().onComplete(null));
1599         assertThrows(NullPointerException.class,
1600                 () -> Joiner.anySuccessfulResultOrThrow().onFork(null));
1601         assertThrows(NullPointerException.class,
1602                 () -> Joiner.anySuccessfulResultOrThrow().onComplete(null));
1603     }
1604 
1605     /**
1606      * ThreadFactory that counts usage.
1607      */
1608     private static class CountingThreadFactory implements ThreadFactory {
1609         final ThreadFactory delegate;
1610         final AtomicInteger threadCount = new AtomicInteger();
1611         CountingThreadFactory(ThreadFactory delegate) {
1612             this.delegate = delegate;
1613         }
1614         @Override
1615         public Thread newThread(Runnable task) {
1616             threadCount.incrementAndGet();
1617             return delegate.newThread(task);
1618         }
1619         int threadCount() {
1620             return threadCount.get();
1621         }
1622     }
1623 
1624     /**
1625      * A joiner that counts that counts the number of subtasks that are forked and the
1626      * number of subtasks that complete.
1627      */
1628     private static class CountingJoiner<T> implements Joiner<T, Void> {
1629         final AtomicInteger onForkCount = new AtomicInteger();
1630         final AtomicInteger onCompleteCount = new AtomicInteger();
1631         @Override
1632         public boolean onFork(Subtask<? extends T> subtask) {
1633             onForkCount.incrementAndGet();
1634             return false;
1635         }
1636         @Override
1637         public boolean onComplete(Subtask<? extends T> subtask) {
1638             onCompleteCount.incrementAndGet();
1639             return false;
1640         }
1641         @Override
1642         public Void result() {
1643             return null;
1644         }
1645         int onForkCount() {
1646             return onForkCount.get();
1647         }
1648         int onCompleteCount() {
1649             return onCompleteCount.get();
1650         }
1651     }
1652 
1653     /**
1654      * A joiner that cancels execution when a subtask completes. It also keeps a count
1655      * of the number of subtasks that are forked and the number of subtasks that complete.
1656      */
1657     private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> {
1658         final AtomicInteger onForkCount = new AtomicInteger();
1659         final AtomicInteger onCompleteCount = new AtomicInteger();
1660         @Override
1661         public boolean onFork(Subtask<? extends T> subtask) {
1662             onForkCount.incrementAndGet();
1663             return false;
1664         }
1665         @Override
1666         public boolean onComplete(Subtask<? extends T> subtask) {
1667             onCompleteCount.incrementAndGet();
1668             return true;
1669         }
1670         @Override
1671         public Void result() {
1672             return null;
1673         }
1674         int onForkCount() {
1675             return onForkCount.get();
1676         }
1677         int onCompleteCount() {
1678             return onCompleteCount.get();
1679         }
1680     }
1681 
1682     /**
1683      * A runtime exception for tests.
1684      */
1685     private static class FooException extends RuntimeException {
1686         FooException() { }

1738                 found = true;
1739             } else {
1740                 Thread.sleep(20);
1741             }
1742         }
1743         target.interrupt();
1744     }
1745 
1746     /**
1747      * Schedules the current thread to be interrupted when it waits (timed or untimed)
1748      * at the given location.
1749      */
1750     private void scheduleInterruptAt(String location) {
1751         Thread target = Thread.currentThread();
1752         scheduler.submit(() -> {
1753             interruptThreadAt(target, location);
1754             return null;
1755         });
1756     }
1757 


































1758     /**
1759      * Returns true if the given stack trace contains an element for the given class
1760      * and method name.
1761      */
1762     private boolean contains(StackTraceElement[] stack, String className, String methodName) {
1763         return Arrays.stream(stack)
1764                 .anyMatch(e -> className.equals(e.getClassName())
1765                         && methodName.equals(e.getMethodName()));
1766     }
1767 }

  44 import java.util.concurrent.Callable;
  45 import java.util.concurrent.ConcurrentHashMap;
  46 import java.util.concurrent.CountDownLatch;
  47 import java.util.concurrent.Executors;
  48 import java.util.concurrent.Future;
  49 import java.util.concurrent.LinkedTransferQueue;
  50 import java.util.concurrent.ThreadFactory;
  51 import java.util.concurrent.TimeUnit;
  52 import java.util.concurrent.RejectedExecutionException;
  53 import java.util.concurrent.ScheduledExecutorService;
  54 import java.util.concurrent.StructuredTaskScope;
  55 import java.util.concurrent.StructuredTaskScope.TimeoutException;
  56 import java.util.concurrent.StructuredTaskScope.Configuration;
  57 import java.util.concurrent.StructuredTaskScope.FailedException;
  58 import java.util.concurrent.StructuredTaskScope.Joiner;
  59 import java.util.concurrent.StructuredTaskScope.Subtask;
  60 import java.util.concurrent.StructureViolationException;
  61 import java.util.concurrent.atomic.AtomicBoolean;
  62 import java.util.concurrent.atomic.AtomicInteger;
  63 import java.util.concurrent.atomic.AtomicReference;

  64 import java.util.function.Predicate;
  65 import java.util.function.UnaryOperator;
  66 import java.util.stream.Stream;
  67 import static java.lang.Thread.State.*;
  68 
  69 import org.junit.jupiter.api.Test;
  70 import org.junit.jupiter.api.BeforeAll;
  71 import org.junit.jupiter.api.AfterAll;
  72 import org.junit.jupiter.params.ParameterizedTest;
  73 import org.junit.jupiter.params.provider.MethodSource;
  74 import static org.junit.jupiter.api.Assertions.*;
  75 
  76 class StructuredTaskScopeTest {
  77     private static ScheduledExecutorService scheduler;
  78     private static List<ThreadFactory> threadFactories;
  79 
  80     @BeforeAll
  81     static void setup() throws Exception {
  82         scheduler = Executors.newSingleThreadScheduledExecutor();
  83 
  84         // thread factories
  85         String value = System.getProperty("threadFactory");

 192                 future.get();
 193             }
 194 
 195             // subtask cannot fork
 196             Subtask<Boolean> subtask = scope.fork(() -> {
 197                 assertThrows(WrongThreadException.class, () -> {
 198                     scope.fork(() -> null);
 199                 });
 200                 return true;
 201             });
 202             scope.join();
 203             assertTrue(subtask.get());
 204         }
 205     }
 206 
 207     /**
 208      * Test fork after join, no subtasks forked before join.
 209      */
 210     @ParameterizedTest
 211     @MethodSource("factories")
 212     void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception {
 213         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 214                 cf -> cf.withThreadFactory(factory))) {
 215             scope.join();
 216             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 217         }
 218     }
 219 
 220     /**
 221      * Test fork after join, subtasks forked before join.
 222      */
 223     @ParameterizedTest
 224     @MethodSource("factories")
 225     void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception {
 226         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 227                 cf -> cf.withThreadFactory(factory))) {
 228             scope.fork(() -> "foo");
 229             scope.join();
 230             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 231         }
 232     }
 233 
 234     /**
 235      * Test fork after join interrupted.
 236      */
 237     @ParameterizedTest
 238     @MethodSource("factories")
 239     void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception {
 240         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 241                 cf -> cf.withThreadFactory(factory))) {

 242             var subtask1 = scope.fork(() -> {
 243                 Thread.sleep(Duration.ofDays(1));
 244                 return "foo";
 245             });
 246 
 247             // join throws
 248             Thread.currentThread().interrupt();
 249             assertThrows(InterruptedException.class, scope::join);
 250 
 251             // fork should throw
 252             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 253         }
 254     }
 255 
 256     /**
 257      * Test fork after join timeout.
 258      */
 259     @ParameterizedTest
 260     @MethodSource("factories")
 261     void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception {
 262         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 263                 cf -> cf.withThreadFactory(factory)
 264                         .withTimeout(Duration.ofMillis(100)))) {
 265             awaitCancelled(scope);
 266 
 267             // join throws
 268             assertThrows(TimeoutException.class, scope::join);
 269 
 270             // fork should throw
 271             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 272         }
 273     }
 274 
 275     /**
 276      * Test fork after task scope is cancelled. This test uses a custom Joiner to
 277      * cancel execution.
 278      */
 279     @ParameterizedTest
 280     @MethodSource("factories")
 281     void testForkAfterCancel2(ThreadFactory factory) throws Exception {
 282         var countingThreadFactory = new CountingThreadFactory(factory);
 283         var testJoiner = new CancelAfterOneJoiner<String>();
 284 
 285         try (var scope = StructuredTaskScope.open(testJoiner,
 286                 cf -> cf.withThreadFactory(countingThreadFactory))) {
 287 
 288             // fork subtask, the scope should be cancelled when the subtask completes
 289             var subtask1 = scope.fork(() -> "foo");
 290             awaitCancelled(scope);
 291 
 292             assertEquals(1, countingThreadFactory.threadCount());
 293             assertEquals(1, testJoiner.onForkCount());
 294             assertEquals(1, testJoiner.onCompleteCount());

 297             var subtask2 = scope.fork(() -> "bar");
 298 
 299             // onFork should be invoked, newThread and onComplete should not be invoked
 300             assertEquals(1, countingThreadFactory.threadCount());
 301             assertEquals(2, testJoiner.onForkCount());
 302             assertEquals(1, testJoiner.onCompleteCount());
 303 
 304             scope.join();
 305 
 306             assertEquals(1, countingThreadFactory.threadCount());
 307             assertEquals(2, testJoiner.onForkCount());
 308             assertEquals(1, testJoiner.onCompleteCount());
 309             assertEquals("foo", subtask1.get());
 310             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 311         }
 312     }
 313 
 314     /**
 315      * Test fork after task scope is closed.
 316      */
 317     @ParameterizedTest
 318     @MethodSource("factories")
 319     void testForkAfterClose(ThreadFactory factory) {
 320         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 321                 cf -> cf.withThreadFactory(factory))) {
 322             scope.close();
 323             assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
 324         }
 325     }
 326 
 327     /**
 328      * Test fork with a ThreadFactory that rejects creating a thread.
 329      */
 330     @Test
 331     void testForkRejectedExecutionException() {
 332         ThreadFactory factory = task -> null;
 333         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 334                 cf -> cf.withThreadFactory(factory))) {
 335             assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
 336         }
 337     }
 338 
 339     /**
 340      * Test join with no subtasks.
 341      */

 369     @Test
 370     void testJoinAfterJoin1() throws Exception {
 371         var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz"));
 372         Joiner<Object, String> joiner = results::take;
 373         try (var scope = StructuredTaskScope.open(joiner)) {
 374             scope.fork(() -> "foo");
 375             assertEquals("foo", scope.join());
 376 
 377             // join already called
 378             for (int i = 0 ; i < 3; i++) {
 379                 assertThrows(IllegalStateException.class, scope::join);
 380             }
 381         }
 382     }
 383 
 384     /**
 385      * Test join after join completed with an exception.
 386      */
 387     @Test
 388     void testJoinAfterJoin2() throws Exception {
 389         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow())) {
 390             scope.fork(() -> { throw new FooException(); });
 391             Throwable ex = assertThrows(FailedException.class, scope::join);
 392             assertTrue(ex.getCause() instanceof FooException);
 393 
 394             // join already called
 395             for (int i = 0 ; i < 3; i++) {
 396                 assertThrows(IllegalStateException.class, scope::join);
 397             }
 398         }
 399     }
 400 
 401     /**
 402      * Test join after join interrupted.
 403      */
 404     @Test
 405     void testJoinAfterJoinInterrupted() throws Exception {
 406         try (var scope = StructuredTaskScope.open()) {
 407             var latch = new CountDownLatch(1);
 408             var subtask = scope.fork(() -> {
 409                 latch.await();
 410                 return "foo";
 411             });
 412 
 413             // join throws InterruptedException
 414             Thread.currentThread().interrupt();
 415             assertThrows(InterruptedException.class, scope::join);
 416 
 417             latch.countDown();
 418 
 419             // retry join to get result
 420             scope.join();
 421             assertEquals("foo", subtask.get());
 422 
 423             // retry after otbaining result
 424             assertThrows(IllegalStateException.class, scope::join);
 425         }
 426     }
 427 
 428     /**
 429      * Test join after join completed with a timeout.
 430      */
 431     @Test
 432     void testJoinAfterJoinTimeout() throws Exception {
 433         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(),
 434                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
 435             // wait for scope to be cancelled by timeout
 436             awaitCancelled(scope);
 437             assertThrows(TimeoutException.class, scope::join);
 438 
 439             // join already called
 440             for (int i = 0 ; i < 3; i++) {
 441                 assertThrows(IllegalStateException.class, scope::join);
 442             }
 443         }
 444     }
 445 
 446     /**
 447      * Test join invoked from Joiner.onTimeout.
 448      */
 449     @Test
 450     void testJoinInOnTimeout() throws Exception {
 451         Thread owner = Thread.currentThread();
 452         var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
 453 
 454         var joiner = new Joiner<String, Void>() {
 455             @Override
 456             public void onTimeout() {
 457                 assertTrue(Thread.currentThread() == owner);
 458                 var scope = scopeRef.get();
 459                 assertThrows(IllegalStateException.class, scope::join);
 460             }
 461             @Override
 462             public Void result() {
 463                 return null;
 464             }
 465         };
 466 
 467         try (var scope = StructuredTaskScope.open(joiner,
 468                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
 469             awaitCancelled(scope);
 470             scopeRef.set(scope);
 471             scope.join();  // invokes onTimeout
 472         }
 473     }
 474 
 475     /**
 476      * Test join method is owner confined.
 477      */
 478     @ParameterizedTest
 479     @MethodSource("factories")
 480     void testJoinConfined(ThreadFactory factory) throws Exception {
 481         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 482                 cf -> cf.withThreadFactory(factory))) {
 483 
 484             // random thread cannot join
 485             try (var pool = Executors.newSingleThreadExecutor()) {
 486                 Future<Void> future = pool.submit(() -> {
 487                     assertThrows(WrongThreadException.class, scope::join);
 488                     return null;
 489                 });
 490                 future.get();
 491             }
 492 
 493             // subtask cannot join
 494             Subtask<Boolean> subtask = scope.fork(() -> {
 495                 assertThrows(WrongThreadException.class, () -> { scope.join(); });
 496                 return true;
 497             });
 498             scope.join();
 499             assertTrue(subtask.get());
 500         }
 501     }
 502 
 503     /**
 504      * Test join with interrupt status set.
 505      */
 506     @ParameterizedTest
 507     @MethodSource("factories")
 508     void testInterruptJoin1(ThreadFactory factory) throws Exception {
 509         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 510                 cf -> cf.withThreadFactory(factory))) {
 511 
 512             Subtask<String> subtask = scope.fork(() -> {
 513                 Thread.sleep(Duration.ofDays(1));
 514                 return "foo";
 515             });
 516 
 517             // join should throw
 518             Thread.currentThread().interrupt();
 519             try {
 520                 scope.join();
 521                 fail("join did not throw");
 522             } catch (InterruptedException expected) {
 523                 assertFalse(Thread.interrupted());   // interrupt status should be cleared
 524             }
 525         }
 526     }
 527 
 528     /**
 529      * Test interrupt of thread blocked in join.
 530      */
 531     @ParameterizedTest
 532     @MethodSource("factories")
 533     void testInterruptJoin2(ThreadFactory factory) throws Exception {
 534         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 535                 cf -> cf.withThreadFactory(factory))) {


 536             Subtask<String> subtask = scope.fork(() -> {
 537                 Thread.sleep(Duration.ofDays(1));
 538                 return "foo";
 539             });
 540 
 541             // interrupt main thread when it blocks in join
 542             scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
 543             try {
 544                 scope.join();
 545                 fail("join did not throw");
 546             } catch (InterruptedException expected) {
 547                 assertFalse(Thread.interrupted());   // interrupt status should be clear
 548             }
 549         }
 550     }
 551 
 552     /**
 553      * Test join when scope is cancelled.
 554      */
 555     @ParameterizedTest
 556     @MethodSource("factories")
 557     void testJoinWhenCancelled(ThreadFactory factory) throws Exception {

 936 
 937     /**
 938      * Test that isCancelled returns true after close.
 939      */
 940     @Test
 941     void testIsCancelledAfterClose() throws Exception {
 942         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 943             assertFalse(scope.isCancelled());
 944             scope.close();
 945             assertTrue(scope.isCancelled());
 946         }
 947     }
 948 
 949     /**
 950      * Test Joiner.onFork throwing exception.
 951      */
 952     @Test
 953     void testOnForkThrows() throws Exception {
 954         var joiner = new Joiner<String, Void>() {
 955             @Override
 956             public boolean onFork(Subtask<String> subtask) {
 957                 throw new FooException();
 958             }
 959             @Override
 960             public Void result() {
 961                 return null;
 962             }
 963         };
 964         try (var scope = StructuredTaskScope.open(joiner)) {
 965             assertThrows(FooException.class, () -> scope.fork(() -> "foo"));
 966         }
 967     }
 968 
 969     /**
 970      * Test Joiner.onFork returning true to cancel execution.
 971      */
 972     @Test
 973     void testOnForkCancelsExecution() throws Exception {
 974         var joiner = new Joiner<String, Void>() {
 975             @Override
 976             public boolean onFork(Subtask<String> subtask) {
 977                 return true;
 978             }
 979             @Override
 980             public Void result() {
 981                 return null;
 982             }
 983         };
 984         try (var scope = StructuredTaskScope.open(joiner)) {
 985             assertFalse(scope.isCancelled());
 986             scope.fork(() -> "foo");
 987             assertTrue(scope.isCancelled());
 988             scope.join();
 989         }
 990     }
 991 
 992     /**
 993      * Test Joiner.onComplete throwing exception causes UHE to be invoked.
 994      */
 995     @Test
 996     void testOnCompleteThrows() throws Exception {
 997         var joiner = new Joiner<String, Void>() {
 998             @Override
 999             public boolean onComplete(Subtask<String> subtask) {
1000                 throw new FooException();
1001             }
1002             @Override
1003             public Void result() {
1004                 return null;
1005             }
1006         };
1007         var excRef = new AtomicReference<Throwable>();
1008         Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1009         ThreadFactory factory = Thread.ofVirtual()
1010                 .uncaughtExceptionHandler(uhe)
1011                 .factory();
1012         try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1013             scope.fork(() -> "foo");
1014             scope.join();
1015             assertInstanceOf(FooException.class, excRef.get());
1016         }
1017     }
1018 
1019     /**
1020      * Test Joiner.onComplete returning true to cancel execution.
1021      */
1022     @Test
1023     void testOnCompleteCancelsExecution() throws Exception {
1024         var joiner = new Joiner<String, Void>() {
1025             @Override
1026             public boolean onComplete(Subtask<String> subtask) {
1027                 return true;
1028             }
1029             @Override
1030             public Void result() {
1031                 return null;
1032             }
1033         };
1034         try (var scope = StructuredTaskScope.open(joiner)) {
1035             assertFalse(scope.isCancelled());
1036             scope.fork(() -> "foo");
1037             awaitCancelled(scope);
1038             scope.join();
1039         }
1040     }
1041 
1042     /**
1043      * Test Joiner.onTimeout invoked by owner thread when timeout expires.
1044      */
1045     @Test
1046     void testOnTimeoutInvoked() throws Exception {
1047         var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
1048         Thread owner = Thread.currentThread();
1049         var invokeCount = new AtomicInteger();
1050         var joiner = new Joiner<String, Void>() {
1051             @Override
1052             public void onTimeout() {
1053                 assertTrue(Thread.currentThread() == owner);
1054                 assertTrue(scopeRef.get().isCancelled());
1055                 invokeCount.incrementAndGet();
1056             }
1057             @Override
1058             public Void result() {
1059                 return null;
1060             }
1061         };
1062         try (var scope = StructuredTaskScope.open(joiner,
1063                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1064             scopeRef.set(scope);
1065             scope.fork(() -> {
1066                 Thread.sleep(Duration.ofDays(1));
1067                 return null;
1068             });
1069             scope.join();
1070             assertEquals(1, invokeCount.get());
1071         }
1072     }
1073 
1074     /**
1075      * Test Joiner.onTimeout throwing an excepiton.
1076      */
1077     @Test
1078     void testOnTimeoutThrows() throws Exception {
1079         var joiner = new Joiner<String, Void>() {
1080             @Override
1081             public void onTimeout() {
1082                 throw new FooException();
1083             }
1084             @Override
1085             public Void result() {
1086                 return null;
1087             }
1088         };
1089         try (var scope = StructuredTaskScope.open(joiner,
1090                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1091             // wait for scope to be cancelled by timeout
1092             awaitCancelled(scope);
1093 
1094             // join should throw FooException on first usage
1095             assertThrows(FooException.class, scope::join);
1096 
1097             // retry after onTimeout fails
1098             assertThrows(IllegalStateException.class, scope::join);
1099         }
1100     }
1101 
1102     /**
1103      * Test toString.
1104      */
1105     @Test
1106     void testToString() throws Exception {
1107         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1108                 cf -> cf.withName("duke"))) {
1109 
1110             // open
1111             assertTrue(scope.toString().contains("duke"));
1112 
1113             // closed
1114             scope.close();
1115             assertTrue(scope.toString().contains("duke"));
1116         }
1117     }
1118 
1119     /**
1120      * Test Subtask with task that completes successfully.
1121      */
1122     @ParameterizedTest
1123     @MethodSource("factories")
1124     void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
1125         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1126                 cf -> cf.withThreadFactory(factory))) {

1127             Subtask<String> subtask = scope.fork(() -> "foo");
1128 
1129             // before join, owner thread
1130             assertThrows(IllegalStateException.class, subtask::get);
1131             assertThrows(IllegalStateException.class, subtask::exception);
1132 
1133             // before join, another thread
1134             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1135             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1136 
1137             scope.join();
1138 

1139             assertEquals(Subtask.State.SUCCESS, subtask.state());
1140 
1141             // after join, owner thread
1142             assertEquals("foo", subtask.get());
1143             assertThrows(IllegalStateException.class, subtask::exception);
1144 
1145             // after join, another thread
1146             assertEquals("foo", callInOtherThread(subtask::get));
1147             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1148         }
1149     }
1150 
1151     /**
1152      * Test Subtask with task that fails.
1153      */
1154     @ParameterizedTest
1155     @MethodSource("factories")
1156     void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
1157         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1158                 cf -> cf.withThreadFactory(factory))) {
1159 
1160             Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
1161 
1162             // before join, owner thread
1163             assertThrows(IllegalStateException.class, subtask::get);
1164             assertThrows(IllegalStateException.class, subtask::exception);
1165 
1166             // before join, another thread
1167             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1168             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1169 
1170             scope.join();
1171 

1172             assertEquals(Subtask.State.FAILED, subtask.state());
1173 
1174             // after join, owner thread
1175             assertThrows(IllegalStateException.class, subtask::get);
1176             assertTrue(subtask.exception() instanceof FooException);
1177 
1178             // after join, another thread
1179             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1180             assertTrue(callInOtherThread(subtask::exception) instanceof FooException);
1181         }
1182     }
1183 
1184     /**
1185      * Test Subtask with a task that has not completed.
1186      */
1187     @ParameterizedTest
1188     @MethodSource("factories")
1189     void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
1190         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1191                 cf -> cf.withThreadFactory(factory))) {
1192             Subtask<Void> subtask = scope.fork(() -> {
1193                 Thread.sleep(Duration.ofDays(1));
1194                 return null;
1195             });


1196             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1197 
1198             // before join, owner thread
1199             assertThrows(IllegalStateException.class, subtask::get);
1200             assertThrows(IllegalStateException.class, subtask::exception);
1201 
1202             // before join, another thread
1203             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1204             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1205 
1206             // attempt join, join throws
1207             Thread.currentThread().interrupt();
1208             assertThrows(InterruptedException.class, scope::join);
1209 

1210             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1211 
1212             // after join, owner thread
1213             assertThrows(IllegalStateException.class, subtask::get);
1214             assertThrows(IllegalStateException.class, subtask::exception);
1215 
1216             // before join, another thread
1217             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1218             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1219         }
1220     }
1221 
1222     /**
1223      * Test Subtask forked after execution cancelled.
1224      */
1225     @ParameterizedTest
1226     @MethodSource("factories")
1227     void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception {
1228         try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1229             scope.fork(() -> "foo");
1230             awaitCancelled(scope);
1231 
1232             var subtask = scope.fork(() -> "foo");
1233 
1234             // before join, owner thread

1235             assertThrows(IllegalStateException.class, subtask::get);
1236             assertThrows(IllegalStateException.class, subtask::exception);
1237 
1238             // before join, another thread
1239             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1240             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1241 
1242             scope.join();
1243 

1244             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1245 
1246             // after join, owner thread
1247             assertThrows(IllegalStateException.class, subtask::get);
1248             assertThrows(IllegalStateException.class, subtask::exception);
1249 
1250             // before join, another thread
1251             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
1252             assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
1253         }
1254     }
1255 
1256     /**
1257      * Test Subtask::toString.
1258      */
1259     @Test
1260     void testSubtaskToString() throws Exception {
1261         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1262             var latch = new CountDownLatch(1);
1263             var subtask1 = scope.fork(() -> {
1264                 latch.await();
1265                 return "foo";
1266             });
1267             var subtask2 = scope.fork(() -> { throw new FooException(); });
1268 
1269             // subtask1 result is unavailable
1270             assertTrue(subtask1.toString().contains("Unavailable"));
1271             latch.countDown();
1272 
1273             scope.join();
1274 
1275             assertTrue(subtask1.toString().contains("Completed successfully"));
1276             assertTrue(subtask2.toString().contains("Failed"));
1277         }
1278     }
1279 
1280     /**
1281      * Test Joiner.allSuccessfulOrThrow() with no subtasks.
1282      */
1283     @Test
1284     void testAllSuccessfulOrThrow1() throws Throwable {
1285         try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
1286             var results = scope.join();
1287             assertTrue(results.isEmpty());
1288         }
1289     }
1290 
1291     /**
1292      * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully.
1293      */
1294     @ParameterizedTest
1295     @MethodSource("factories")
1296     void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1297         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1298                 cf -> cf.withThreadFactory(factory))) {
1299             scope.fork(() -> "foo");
1300             scope.fork(() -> "bar");
1301             var results = scope.join();
1302             assertEquals(List.of("foo", "bar"), results);


1303         }
1304     }
1305 
1306     /**
1307      * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
1308      * a subtask that fails.
1309      */
1310     @ParameterizedTest
1311     @MethodSource("factories")
1312     void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1313         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1314                 cf -> cf.withThreadFactory(factory))) {
1315             scope.fork(() -> "foo");
1316             scope.fork(() -> { throw new FooException(); });
1317             try {
1318                 scope.join();
1319             } catch (FailedException e) {
1320                 assertTrue(e.getCause() instanceof FooException);
1321             }
1322         }
1323     }
1324 
1325     /**
1326      * Test Joiner.allSuccessfulOrThrow() with a timeout.
1327      */
1328     @Test
1329     void testAllSuccessfulOrThrow4() throws Exception {
1330         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1331                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1332             scope.fork(() -> "foo");
1333             scope.fork(() -> {
1334                 Thread.sleep(Duration.ofDays(1));
1335                 return "bar";
1336             });
1337             assertThrows(TimeoutException.class, scope::join);
1338 
1339             // retry after join throws TimeoutException
1340             assertThrows(IllegalStateException.class, scope::join);
1341         }
1342     }
1343 
1344     /**
1345      * Test Joiner.allSuccessfulOrThrow() yields an unmodifiable list.
1346      */
1347     @Test
1348     void testAllSuccessfulOrThrow5() throws Exception {
1349         // empty list
1350         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow())) {
1351             var results = scope.join();
1352             assertEquals(0, results.size());
1353             assertThrows(UnsupportedOperationException.class, () -> results.add("foo"));
1354         }
1355 
1356         // non-empty list
1357         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow())) {
1358             scope.fork(() -> "foo");
1359             var results = scope.join();
1360             assertEquals(1, results.size());
1361             assertThrows(UnsupportedOperationException.class, () -> results.add("foo"));
1362             assertThrows(UnsupportedOperationException.class, () -> results.add("bar"));
1363         }
1364     }
1365 
1366     /**
1367      * Test Joiner.anySuccessfulOrThrow() with no subtasks.
1368      */
1369     @Test
1370     void testAnySuccessfulOrThrow1() throws Exception {
1371         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow())) {
1372             try {
1373                 scope.join();
1374             } catch (FailedException e) {
1375                 assertTrue(e.getCause() instanceof NoSuchElementException);
1376             }
1377         }
1378     }
1379 
1380     /**
1381      * Test Joiner.anySuccessfulOrThrow() with a subtask that completes successfully.
1382      */
1383     @ParameterizedTest
1384     @MethodSource("factories")
1385     void testAnySuccessfulOrThrow2(ThreadFactory factory) throws Exception {
1386         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
1387                 cf -> cf.withThreadFactory(factory))) {
1388             scope.fork(() -> "foo");
1389             String result = scope.join();
1390             assertEquals("foo", result);
1391         }
1392     }
1393 
1394     /**
1395      * Test Joiner.anySuccessfulOrThrow() with a subtask that completes successfully
1396      * with a null result.
1397      */
1398     @ParameterizedTest
1399     @MethodSource("factories")
1400     void testAnySuccessfulOrThrow3(ThreadFactory factory) throws Exception {
1401         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
1402                 cf -> cf.withThreadFactory(factory))) {
1403             scope.fork(() -> null);
1404             String result = scope.join();
1405             assertNull(result);
1406         }
1407     }
1408 
1409     /**
1410      * Test Joiner.anySuccessfulOrThrow() with a subtask that complete succcessfully
1411      * and a subtask that fails.
1412      */
1413     @ParameterizedTest
1414     @MethodSource("factories")
1415     void testAnySuccessfulOrThrow4(ThreadFactory factory) throws Exception {
1416         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
1417                 cf -> cf.withThreadFactory(factory))) {
1418             scope.fork(() -> "foo");
1419             scope.fork(() -> { throw new FooException(); });
1420             String first = scope.join();
1421             assertEquals("foo", first);
1422         }
1423     }
1424 
1425     /**
1426      * Test Joiner.anySuccessfulOrThrow() with a subtask that fails.
1427      */
1428     @ParameterizedTest
1429     @MethodSource("factories")
1430     void testAnySuccessfulOrThrow5(ThreadFactory factory) throws Exception {
1431         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(),
1432                 cf -> cf.withThreadFactory(factory))) {
1433             scope.fork(() -> { throw new FooException(); });
1434             Throwable ex = assertThrows(FailedException.class, scope::join);
1435             assertTrue(ex.getCause() instanceof FooException);
1436         }
1437     }
1438 
1439     /**
1440      * Test Joiner.anySuccessfulOrThrow() with a timeout.
1441      */
1442     @Test
1443     void anySuccessfulOrThrow6() throws Exception {
1444         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
1445                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1446             scope.fork(() -> { throw new FooException(); });
1447             scope.fork(() -> {
1448                 Thread.sleep(Duration.ofDays(1));
1449                 return "bar";
1450             });
1451             assertThrows(TimeoutException.class, scope::join);
1452 
1453             // retry after join throws TimeoutException
1454             assertThrows(IllegalStateException.class, scope::join);
1455         }
1456     }
1457 
1458     /**
1459      * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
1460      */
1461     @Test
1462     void testAwaitSuccessfulOrThrow1() throws Throwable {
1463         try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
1464             var result = scope.join();
1465             assertNull(result);
1466         }
1467     }
1468 
1469     /**
1470      * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
1471      */
1472     @ParameterizedTest
1473     @MethodSource("factories")
1474     void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1475         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1476                 cf -> cf.withThreadFactory(factory))) {
1477             var subtask1 = scope.fork(() -> "foo");

1485 
1486     /**
1487      * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
1488      * a subtask that fails.
1489      */
1490     @ParameterizedTest
1491     @MethodSource("factories")
1492     void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1493         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1494                 cf -> cf.withThreadFactory(factory))) {
1495             scope.fork(() -> "foo");
1496             scope.fork(() -> { throw new FooException(); });
1497             try {
1498                 scope.join();
1499             } catch (FailedException e) {
1500                 assertTrue(e.getCause() instanceof FooException);
1501             }
1502         }
1503     }
1504 
1505     /**
1506      * Test Joiner.awaitAllSuccessfulOrThrow() with a timeout.
1507      */
1508     @Test
1509     void testAwaitSuccessfulOrThrow4() throws Exception {
1510         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1511                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1512             scope.fork(() -> "foo");
1513             scope.fork(() -> {
1514                 Thread.sleep(Duration.ofDays(1));
1515                 return "bar";
1516             });
1517             assertThrows(TimeoutException.class, scope::join);
1518 
1519             // retry after join throws TimeoutException
1520             assertThrows(IllegalStateException.class, scope::join);
1521         }
1522     }
1523 
1524     /**
1525      * Test Joiner.awaitAll() with no subtasks.
1526      */
1527     @Test
1528     void testAwaitAll1() throws Throwable {
1529         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1530             var result = scope.join();
1531             assertNull(result);
1532         }
1533     }
1534 
1535     /**
1536      * Test Joiner.awaitAll() with subtasks that complete successfully.
1537      */
1538     @ParameterizedTest
1539     @MethodSource("factories")
1540     void testAwaitAll2(ThreadFactory factory) throws Throwable {
1541         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1542                 cf -> cf.withThreadFactory(factory))) {
1543             var subtask1 = scope.fork(() -> "foo");

1550     }
1551 
1552     /**
1553      * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
1554      * that fails.
1555      */
1556     @ParameterizedTest
1557     @MethodSource("factories")
1558     void testAwaitAll3(ThreadFactory factory) throws Throwable {
1559         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1560                 cf -> cf.withThreadFactory(factory))) {
1561             var subtask1 = scope.fork(() -> "foo");
1562             var subtask2 = scope.fork(() -> { throw new FooException(); });
1563             var result = scope.join();
1564             assertNull(result);
1565             assertEquals("foo", subtask1.get());
1566             assertTrue(subtask2.exception() instanceof FooException);
1567         }
1568     }
1569 
1570     /**
1571      * Test Joiner.awaitAll() with a timeout.
1572      */
1573     @Test
1574     void testAwaitAll4() throws Exception {
1575         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1576                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1577             scope.fork(() -> "foo");
1578             scope.fork(() -> {
1579                 Thread.sleep(Duration.ofDays(1));
1580                 return "bar";
1581             });
1582             assertThrows(TimeoutException.class, scope::join);
1583 
1584             // retry after join throws TimeoutException
1585             assertThrows(IllegalStateException.class, scope::join);
1586         }
1587     }
1588 
1589     /**
1590      * Test Joiner.allUntil(Predicate) with no subtasks.
1591      */
1592     @Test
1593     void testAllUntil1() throws Throwable {
1594         try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
1595             var subtasks = scope.join();
1596             assertEquals(0, subtasks.size());
1597         }
1598     }
1599 
1600     /**
1601      * Test Joiner.allUntil(Predicate) with no cancellation.
1602      */
1603     @ParameterizedTest
1604     @MethodSource("factories")
1605     void testAllUntil2(ThreadFactory factory) throws Exception {
1606         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1607                 cf -> cf.withThreadFactory(factory))) {
1608 
1609             var subtask1 = scope.fork(() -> "foo");
1610             var subtask2 = scope.fork(() -> { throw new FooException(); });
1611 
1612             var subtasks = scope.join();
1613             assertEquals(List.of(subtask1, subtask2), subtasks);
1614 


1615             assertEquals("foo", subtask1.get());
1616             assertTrue(subtask2.exception() instanceof FooException);
1617         }
1618     }
1619 
1620     /**
1621      * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes.
1622      */
1623     @ParameterizedTest
1624     @MethodSource("factories")
1625     void testAllUntil3(ThreadFactory factory) throws Exception {
1626         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true),
1627                 cf -> cf.withThreadFactory(factory))) {
1628 
1629             var subtask1 = scope.fork(() -> "foo");
1630             var subtask2 = scope.fork(() -> {
1631                 Thread.sleep(Duration.ofDays(1));
1632                 return "bar";
1633             });
1634 
1635             var subtasks = scope.join();
1636             assertEquals(List.of(subtask1, subtask2), subtasks);
1637 



1638             assertEquals("foo", subtask1.get());
1639             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1640         }
1641     }
1642 
1643     /**
1644      * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete.
1645      */
1646     @ParameterizedTest
1647     @MethodSource("factories")
1648     void testAllUntil4(ThreadFactory factory) throws Exception {
1649 
1650         // cancel execution after two or more failures
1651         class CancelAfterTwoFailures<T> implements Predicate<Subtask<T>> {
1652             final AtomicInteger failedCount = new AtomicInteger();
1653             @Override
1654             public boolean test(Subtask<T> subtask) {
1655                 return subtask.state() == Subtask.State.FAILED
1656                         && failedCount.incrementAndGet() >= 2;
1657             }
1658         }
1659         var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
1660 
1661         try (var scope = StructuredTaskScope.open(joiner)) {
1662             int forkCount = 0;
1663 
1664             // fork subtasks until execution cancelled
1665             while (!scope.isCancelled()) {
1666                 scope.fork(() -> "foo");
1667                 scope.fork(() -> { throw new FooException(); });
1668                 forkCount += 2;
1669                 Thread.sleep(Duration.ofMillis(20));
1670             }
1671 
1672             var subtasks = scope.join();
1673             assertEquals(forkCount, subtasks.size());
1674 
1675             long failedCount = subtasks.stream()
1676                     .filter(s -> s.state() == Subtask.State.FAILED)
1677                     .count();
1678             assertTrue(failedCount >= 2);
1679         }
1680     }
1681 
1682     /**
1683      * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws.
1684      */
1685     @Test
1686     void testAllUntil5() throws Exception {
1687         var joiner = Joiner.allUntil(_ -> { throw new FooException(); });
1688         var excRef = new AtomicReference<Throwable>();
1689         Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1690         ThreadFactory factory = Thread.ofVirtual()
1691                 .uncaughtExceptionHandler(uhe)
1692                 .factory();
1693         try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1694             scope.fork(() -> "foo");
1695             scope.join();
1696             assertInstanceOf(FooException.class, excRef.get());
1697         }
1698     }
1699 
1700     /**
1701      * Test Joiner.allUntil(Predicate) with a timeout.
1702      */
1703     @Test
1704     void testAllUntil6() throws Exception {
1705         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1706                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
1707             var subtask1 = scope.fork(() -> "foo");
1708             var subtask2 = scope.fork(() -> {
1709                 Thread.sleep(Duration.ofDays(1));
1710                 return "bar";
1711             });
1712 
1713             // TimeoutException should not be thrown
1714             var subtasks = scope.join();
1715 
1716             // stream should have two elements, subtask1 may or may not have completed
1717             assertEquals(List.of(subtask1, subtask2), subtasks);
1718             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1719 
1720             // retry after join throws TimeoutException
1721             assertThrows(IllegalStateException.class, scope::join);
1722         }
1723     }
1724 
1725     /**
1726      * Test Joiner.allUntil(Predicate) yields an unmodifiable list.
1727      */
1728     @Test
1729     void testAllUntil7() throws Exception {
1730         Subtask<String> subtask1;
1731         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
1732             subtask1 = scope.fork(() -> "?");
1733             scope.join();
1734         }
1735 
1736         // empty list
1737         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
1738             var subtasks = scope.join();
1739             assertEquals(0, subtasks.size());
1740             assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask1));
1741         }
1742 
1743         // non-empty list
1744         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
1745             var subtask2 = scope.fork(() -> "foo");
1746             var subtasks = scope.join();
1747             assertEquals(1, subtasks.size());
1748             assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask1));
1749             assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask2));
1750         }
1751     }
1752 
1753     /**
1754      * Test Joiner default methods.
1755      */
1756     @Test
1757     void testJoinerDefaultMethods() throws Exception {
1758         try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1759 
1760             // need subtasks to test default methods
1761             var subtask1 = scope.fork(() -> "foo");
1762             awaitCancelled(scope);
1763             var subtask2 = scope.fork(() -> "bar");
1764             scope.join();
1765 
1766             assertEquals(Subtask.State.SUCCESS, subtask1.state());
1767             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1768 
1769             // Joiner that does not override default methods
1770             Joiner<String, Void> joiner = () -> null;
1771             assertThrows(NullPointerException.class, () -> joiner.onFork(null));
1772             assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
1773             assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
1774             assertFalse(joiner.onFork(subtask2));
1775             assertFalse(joiner.onComplete(subtask1));
1776             assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
1777             assertThrows(TimeoutException.class, joiner::onTimeout);
1778         }
1779     }
1780 
1781     /**
1782      * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
1783      */
1784     @Test
1785     void testJoinersWithUnavailableResult() throws Exception {
1786         try (var scope = StructuredTaskScope.open()) {
1787             var done = new CountDownLatch(1);
1788             var subtask = scope.fork(() -> {
1789                 done.await();
1790                 return null;
1791             });
1792 
1793             // onComplete with uncompleted task should throw IAE
1794             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1795             assertThrows(IllegalArgumentException.class,
1796                     () -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
1797             assertThrows(IllegalArgumentException.class,
1798                     () -> Joiner.anySuccessfulOrThrow().onComplete(subtask));
1799             assertThrows(IllegalArgumentException.class,
1800                     () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask));
1801             assertThrows(IllegalArgumentException.class,
1802                     () -> Joiner.awaitAll().onComplete(subtask));
1803             assertThrows(IllegalArgumentException.class,
1804                     () -> Joiner.allUntil(_ -> false).onComplete(subtask));
1805 
1806             done.countDown();
1807             scope.join();
1808 
1809             // onFork with completed task should throw IAE
1810             assertEquals(Subtask.State.SUCCESS, subtask.state());
1811             assertThrows(IllegalArgumentException.class,
1812                     () -> Joiner.allSuccessfulOrThrow().onFork(subtask));
1813             assertThrows(IllegalArgumentException.class,
1814                     () -> Joiner.anySuccessfulOrThrow().onFork(subtask));
1815             assertThrows(IllegalArgumentException.class,
1816                     () -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask));
1817             assertThrows(IllegalArgumentException.class,
1818                     () -> Joiner.awaitAll().onFork(subtask));
1819             assertThrows(IllegalArgumentException.class,
1820                     () -> Joiner.allUntil(_ -> false).onFork(subtask));
1821         }
1822 
1823     }
1824 
1825     /**
1826      * Test the Configuration function apply method throwing an exception.
1827      */
1828     @Test
1829     void testConfigFunctionThrows() throws Exception {
1830         assertThrows(FooException.class,
1831                 () -> StructuredTaskScope.open(Joiner.awaitAll(),
1832                                                cf -> { throw new FooException(); }));
1833     }
1834 
1835     /**
1836      * Test Configuration equals/hashCode/toString
1837      */
1838     @Test
1839     void testConfigMethods() throws Exception {
1840         UnaryOperator<Configuration> configOperator = cf -> {
1841             var name = "duke";
1842             var threadFactory = Thread.ofPlatform().factory();
1843             var timeout = Duration.ofSeconds(10);
1844 
1845             assertEquals(cf, cf);
1846             assertEquals(cf.withName(name), cf.withName(name));
1847             assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1848             assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1849 
1850             assertNotEquals(cf, cf.withName(name));
1851             assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1852             assertNotEquals(cf, cf.withTimeout(timeout));
1853 
1854             assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1855             assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1856                     cf.withThreadFactory(threadFactory).hashCode());
1857             assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1858 
1859             assertTrue(cf.withName(name).toString().contains(name));
1860             assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1861             assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1862 
1863             return cf;
1864         };
1865         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) {
1866             // do nothing
1867         }
1868     }
1869 
1870     /**
1871      * Test for NullPointerException.
1872      */
1873     @Test
1874     void testNulls() throws Exception {
1875         assertThrows(NullPointerException.class,
1876                 () -> StructuredTaskScope.open(null));
1877         assertThrows(NullPointerException.class,
1878                 () -> StructuredTaskScope.open(null, cf -> cf));
1879         assertThrows(NullPointerException.class,
1880                 () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
1881 
1882         assertThrows(NullPointerException.class, () -> Joiner.allUntil(null));
1883 
1884         // fork
1885         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {

1894                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null)));
1895         assertThrows(NullPointerException.class,
1896                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null)));
1897         assertThrows(NullPointerException.class,
1898                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null)));
1899 
1900         // Joiner.onFork/onComplete
1901         assertThrows(NullPointerException.class,
1902                 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null));
1903         assertThrows(NullPointerException.class,
1904                 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null));
1905         assertThrows(NullPointerException.class,
1906                 () -> Joiner.awaitAll().onFork(null));
1907         assertThrows(NullPointerException.class,
1908                 () -> Joiner.awaitAll().onComplete(null));
1909         assertThrows(NullPointerException.class,
1910                 () -> Joiner.allSuccessfulOrThrow().onFork(null));
1911         assertThrows(NullPointerException.class,
1912                 () -> Joiner.allSuccessfulOrThrow().onComplete(null));
1913         assertThrows(NullPointerException.class,
1914                 () -> Joiner.anySuccessfulOrThrow().onFork(null));
1915         assertThrows(NullPointerException.class,
1916                 () -> Joiner.anySuccessfulOrThrow().onComplete(null));
1917     }
1918 
1919     /**
1920      * ThreadFactory that counts usage.
1921      */
1922     private static class CountingThreadFactory implements ThreadFactory {
1923         final ThreadFactory delegate;
1924         final AtomicInteger threadCount = new AtomicInteger();
1925         CountingThreadFactory(ThreadFactory delegate) {
1926             this.delegate = delegate;
1927         }
1928         @Override
1929         public Thread newThread(Runnable task) {
1930             threadCount.incrementAndGet();
1931             return delegate.newThread(task);
1932         }
1933         int threadCount() {
1934             return threadCount.get();
1935         }
1936     }
1937 
1938     /**
1939      * A joiner that counts that counts the number of subtasks that are forked and the
1940      * number of subtasks that complete.
1941      */
1942     private static class CountingJoiner<T> implements Joiner<T, Void> {
1943         final AtomicInteger onForkCount = new AtomicInteger();
1944         final AtomicInteger onCompleteCount = new AtomicInteger();
1945         @Override
1946         public boolean onFork(Subtask<T> subtask) {
1947             onForkCount.incrementAndGet();
1948             return false;
1949         }
1950         @Override
1951         public boolean onComplete(Subtask<T> subtask) {
1952             onCompleteCount.incrementAndGet();
1953             return false;
1954         }
1955         @Override
1956         public Void result() {
1957             return null;
1958         }
1959         int onForkCount() {
1960             return onForkCount.get();
1961         }
1962         int onCompleteCount() {
1963             return onCompleteCount.get();
1964         }
1965     }
1966 
1967     /**
1968      * A joiner that cancels execution when a subtask completes. It also keeps a count
1969      * of the number of subtasks that are forked and the number of subtasks that complete.
1970      */
1971     private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> {
1972         final AtomicInteger onForkCount = new AtomicInteger();
1973         final AtomicInteger onCompleteCount = new AtomicInteger();
1974         @Override
1975         public boolean onFork(Subtask<T> subtask) {
1976             onForkCount.incrementAndGet();
1977             return false;
1978         }
1979         @Override
1980         public boolean onComplete(Subtask<T> subtask) {
1981             onCompleteCount.incrementAndGet();
1982             return true;
1983         }
1984         @Override
1985         public Void result() {
1986             return null;
1987         }
1988         int onForkCount() {
1989             return onForkCount.get();
1990         }
1991         int onCompleteCount() {
1992             return onCompleteCount.get();
1993         }
1994     }
1995 
1996     /**
1997      * A runtime exception for tests.
1998      */
1999     private static class FooException extends RuntimeException {
2000         FooException() { }

2052                 found = true;
2053             } else {
2054                 Thread.sleep(20);
2055             }
2056         }
2057         target.interrupt();
2058     }
2059 
2060     /**
2061      * Schedules the current thread to be interrupted when it waits (timed or untimed)
2062      * at the given location.
2063      */
2064     private void scheduleInterruptAt(String location) {
2065         Thread target = Thread.currentThread();
2066         scheduler.submit(() -> {
2067             interruptThreadAt(target, location);
2068             return null;
2069         });
2070     }
2071 
2072     /**
2073      * Calls a result returning task from another thread.
2074      */
2075     private <V> V callInOtherThread(Callable<V> task) throws Exception {
2076         var result = new AtomicReference<V>();
2077         var exc = new AtomicReference<Exception>();
2078         Thread thread = Thread.ofVirtual().start(() -> {
2079             try {
2080                 result.set(task.call());
2081             } catch (Exception e) {
2082                 exc.set(e);
2083             }
2084         });
2085         boolean interrupted = false;
2086         boolean terminated = false;
2087         while (!terminated) {
2088             try {
2089                 thread.join();
2090                 terminated = true;
2091             } catch (InterruptedException e) {
2092                 interrupted = true;
2093             }
2094         }
2095         if (interrupted) {
2096             Thread.currentThread().interrupt();
2097         }
2098         Exception e = exc.get();
2099         if (e != null) {
2100             throw e;
2101         } else {
2102             return result.get();
2103         }
2104     }
2105 
2106     /**
2107      * Returns true if the given stack trace contains an element for the given class
2108      * and method name.
2109      */
2110     private boolean contains(StackTraceElement[] stack, String className, String methodName) {
2111         return Arrays.stream(stack)
2112                 .anyMatch(e -> className.equals(e.getClassName())
2113                         && methodName.equals(e.getMethodName()));
2114     }
2115 }
< prev index next >