1 /*
   2  * Copyright (c) 2021, 2025, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=platform
  26  * @bug 8284199 8296779 8306647
  27  * @summary Basic tests for StructuredTaskScope
  28  * @enablePreview
  29  * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest
  30  */
  31 
  32 /*
  33  * @test id=virtual
  34  * @enablePreview
  35  * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest
  36  */
  37 
  38 import java.time.Duration;
  39 import java.util.Arrays;
  40 import java.util.ArrayList;
  41 import java.util.List;
  42 import java.util.Set;
  43 import java.util.NoSuchElementException;
  44 import java.util.concurrent.Callable;
  45 import java.util.concurrent.ConcurrentHashMap;
  46 import java.util.concurrent.CountDownLatch;
  47 import java.util.concurrent.Executors;
  48 import java.util.concurrent.Future;
  49 import java.util.concurrent.LinkedTransferQueue;
  50 import java.util.concurrent.ThreadFactory;
  51 import java.util.concurrent.TimeUnit;
  52 import java.util.concurrent.RejectedExecutionException;
  53 import java.util.concurrent.ScheduledExecutorService;
  54 import java.util.concurrent.StructuredTaskScope;
  55 import java.util.concurrent.StructuredTaskScope.TimeoutException;
  56 import java.util.concurrent.StructuredTaskScope.Configuration;
  57 import java.util.concurrent.StructuredTaskScope.FailedException;
  58 import java.util.concurrent.StructuredTaskScope.Joiner;
  59 import java.util.concurrent.StructuredTaskScope.Subtask;
  60 import java.util.concurrent.StructureViolationException;
  61 import java.util.concurrent.atomic.AtomicBoolean;
  62 import java.util.concurrent.atomic.AtomicInteger;
  63 import java.util.concurrent.atomic.AtomicReference;
  64 import java.util.function.Function;
  65 import java.util.function.Predicate;
  66 import java.util.stream.Stream;
  67 import static java.lang.Thread.State.*;
  68 
  69 import org.junit.jupiter.api.Test;
  70 import org.junit.jupiter.api.BeforeAll;
  71 import org.junit.jupiter.api.AfterAll;
  72 import org.junit.jupiter.params.ParameterizedTest;
  73 import org.junit.jupiter.params.provider.MethodSource;
  74 import static org.junit.jupiter.api.Assertions.*;
  75 
  76 class StructuredTaskScopeTest {
  77     private static ScheduledExecutorService scheduler;
  78     private static List<ThreadFactory> threadFactories;
  79 
  80     @BeforeAll
  81     static void setup() throws Exception {
  82         scheduler = Executors.newSingleThreadScheduledExecutor();
  83 
  84         // thread factories
  85         String value = System.getProperty("threadFactory");
  86         List<ThreadFactory> list = new ArrayList<>();
  87         if (value == null || value.equals("platform"))
  88             list.add(Thread.ofPlatform().factory());
  89         if (value == null || value.equals("virtual"))
  90             list.add(Thread.ofVirtual().factory());
  91         assertTrue(list.size() > 0, "No thread factories for tests");
  92         threadFactories = list;
  93     }
  94 
  95     @AfterAll
  96     static void shutdown() {
  97         scheduler.shutdown();
  98     }
  99 
 100     private static Stream<ThreadFactory> factories() {
 101         return threadFactories.stream();
 102     }
 103 
 104     /**
 105      * Test that fork creates virtual threads when no ThreadFactory is configured.
 106      */
 107     @Test
 108     void testForkCreatesVirtualThread() throws Exception {
 109         Set<Thread> threads = ConcurrentHashMap.newKeySet();
 110         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 111             for (int i = 0; i < 50; i++) {
 112                 // runnable
 113                 scope.fork(() -> {
 114                     threads.add(Thread.currentThread());
 115                 });
 116 
 117                 // callable
 118                 scope.fork(() -> {
 119                     threads.add(Thread.currentThread());
 120                     return null;
 121                 });
 122             }
 123             scope.join();
 124         }
 125         assertEquals(100, threads.size());
 126         threads.forEach(t -> assertTrue(t.isVirtual()));
 127     }
 128 
 129     /**
 130      * Test that fork create threads with the configured ThreadFactory.
 131      */
 132     @ParameterizedTest
 133     @MethodSource("factories")
 134     void testForkUsesThreadFactory(ThreadFactory factory) throws Exception {
 135         // TheadFactory that keeps reference to all threads it creates
 136         class RecordingThreadFactory implements ThreadFactory {
 137             final ThreadFactory delegate;
 138             final Set<Thread> threads = ConcurrentHashMap.newKeySet();
 139             RecordingThreadFactory(ThreadFactory delegate) {
 140                 this.delegate = delegate;
 141             }
 142             @Override
 143             public Thread newThread(Runnable task) {
 144                 Thread thread = delegate.newThread(task);
 145                 threads.add(thread);
 146                 return thread;
 147             }
 148             Set<Thread> threads() {
 149                 return threads;
 150             }
 151         }
 152         var recordingThreadFactory = new RecordingThreadFactory(factory);
 153         Set<Thread> threads = ConcurrentHashMap.newKeySet();
 154         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 155                 cf -> cf.withThreadFactory(recordingThreadFactory))) {
 156 
 157             for (int i = 0; i < 50; i++) {
 158                 // runnable
 159                 scope.fork(() -> {
 160                     threads.add(Thread.currentThread());
 161                 });
 162 
 163                 // callable
 164                 scope.fork(() -> {
 165                     threads.add(Thread.currentThread());
 166                     return null;
 167                 });
 168             }
 169             scope.join();
 170         }
 171         assertEquals(100, threads.size());
 172         assertEquals(recordingThreadFactory.threads(), threads);
 173     }
 174 
 175     /**
 176      * Test fork method is owner confined.
 177      */
 178     @ParameterizedTest
 179     @MethodSource("factories")
 180     void testForkConfined(ThreadFactory factory) throws Exception {
 181         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 182                 cf -> cf.withThreadFactory(factory))) {
 183 
 184             // random thread cannot fork
 185             try (var pool = Executors.newSingleThreadExecutor()) {
 186                 Future<Void> future = pool.submit(() -> {
 187                     assertThrows(WrongThreadException.class, () -> {
 188                         scope.fork(() -> null);
 189                     });
 190                     return null;
 191                 });
 192                 future.get();
 193             }
 194 
 195             // subtask cannot fork
 196             Subtask<Boolean> subtask = scope.fork(() -> {
 197                 assertThrows(WrongThreadException.class, () -> {
 198                     scope.fork(() -> null);
 199                 });
 200                 return true;
 201             });
 202             scope.join();
 203             assertTrue(subtask.get());
 204         }
 205     }
 206 
 207     /**
 208      * Test fork after join, no subtasks forked before join.
 209      */
 210     @ParameterizedTest
 211     @MethodSource("factories")
 212     void testForkAfterJoin1(ThreadFactory factory) throws Exception {
 213         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 214                 cf -> cf.withThreadFactory(factory))) {
 215             scope.join();
 216             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 217         }
 218     }
 219 
 220     /**
 221      * Test fork after join, subtasks forked before join.
 222      */
 223     @ParameterizedTest
 224     @MethodSource("factories")
 225     void testForkAfterJoin2(ThreadFactory factory) throws Exception {
 226         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 227                 cf -> cf.withThreadFactory(factory))) {
 228             scope.fork(() -> "foo");
 229             scope.join();
 230             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 231         }
 232     }
 233 
 234     /**
 235      * Test fork after join throws.
 236      */
 237     @ParameterizedTest
 238     @MethodSource("factories")
 239     void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
 240         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 241                 cf -> cf.withThreadFactory(factory))) {
 242             var latch = new CountDownLatch(1);
 243             var subtask1 = scope.fork(() -> {
 244                 latch.await();
 245                 return "foo";
 246             });
 247 
 248             // join throws
 249             Thread.currentThread().interrupt();
 250             assertThrows(InterruptedException.class, scope::join);
 251 
 252             // fork should throw
 253             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 254         }
 255     }
 256 
 257     /**
 258      * Test fork after task scope is cancelled. This test uses a custom Joiner to
 259      * cancel execution.
 260      */
 261     @ParameterizedTest
 262     @MethodSource("factories")
 263     void testForkAfterCancel2(ThreadFactory factory) throws Exception {
 264         var countingThreadFactory = new CountingThreadFactory(factory);
 265         var testJoiner = new CancelAfterOneJoiner<String>();
 266 
 267         try (var scope = StructuredTaskScope.open(testJoiner,
 268                 cf -> cf.withThreadFactory(countingThreadFactory))) {
 269 
 270             // fork subtask, the scope should be cancelled when the subtask completes
 271             var subtask1 = scope.fork(() -> "foo");
 272             awaitCancelled(scope);
 273 
 274             assertEquals(1, countingThreadFactory.threadCount());
 275             assertEquals(1, testJoiner.onForkCount());
 276             assertEquals(1, testJoiner.onCompleteCount());
 277 
 278             // fork second subtask, it should not run
 279             var subtask2 = scope.fork(() -> "bar");
 280 
 281             // onFork should be invoked, newThread and onComplete should not be invoked
 282             assertEquals(1, countingThreadFactory.threadCount());
 283             assertEquals(2, testJoiner.onForkCount());
 284             assertEquals(1, testJoiner.onCompleteCount());
 285 
 286             scope.join();
 287 
 288             assertEquals(1, countingThreadFactory.threadCount());
 289             assertEquals(2, testJoiner.onForkCount());
 290             assertEquals(1, testJoiner.onCompleteCount());
 291             assertEquals("foo", subtask1.get());
 292             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 293         }
 294     }
 295 
 296     /**
 297      * Test fork after task scope is closed.
 298      */
 299     @Test
 300     void testForkAfterClose() {
 301         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 302             scope.close();
 303             assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
 304         }
 305     }
 306 
 307     /**
 308      * Test fork with a ThreadFactory that rejects creating a thread.
 309      */
 310     @Test
 311     void testForkRejectedExecutionException() {
 312         ThreadFactory factory = task -> null;
 313         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 314                 cf -> cf.withThreadFactory(factory))) {
 315             assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
 316         }
 317     }
 318 
 319     /**
 320      * Test join with no subtasks.
 321      */
 322     @Test
 323     void testJoinWithNoSubtasks() throws Exception {
 324         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 325             scope.join();
 326         }
 327     }
 328 
 329     /**
 330      * Test join with a remaining subtask.
 331      */
 332     @ParameterizedTest
 333     @MethodSource("factories")
 334     void testJoinWithRemainingSubtasks(ThreadFactory factory) throws Exception {
 335         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 336                 cf -> cf.withThreadFactory(factory))) {
 337             Subtask<String> subtask = scope.fork(() -> {
 338                 Thread.sleep(Duration.ofMillis(100));
 339                 return "foo";
 340             });
 341             scope.join();
 342             assertEquals("foo", subtask.get());
 343         }
 344     }
 345 
 346     /**
 347      * Test join after join completed with a result.
 348      */
 349     @Test
 350     void testJoinAfterJoin1() throws Exception {
 351         var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz"));
 352         Joiner<Object, String> joiner = results::take;
 353         try (var scope = StructuredTaskScope.open(joiner)) {
 354             scope.fork(() -> "foo");
 355             assertEquals("foo", scope.join());
 356 
 357             // join already called
 358             for (int i = 0 ; i < 3; i++) {
 359                 assertThrows(IllegalStateException.class, scope::join);
 360             }
 361         }
 362     }
 363 
 364     /**
 365      * Test join after join completed with an exception.
 366      */
 367     @Test
 368     void testJoinAfterJoin2() throws Exception {
 369         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
 370             scope.fork(() -> { throw new FooException(); });
 371             Throwable ex = assertThrows(FailedException.class, scope::join);
 372             assertTrue(ex.getCause() instanceof FooException);
 373 
 374             // join already called
 375             for (int i = 0 ; i < 3; i++) {
 376                 assertThrows(IllegalStateException.class, scope::join);
 377             }
 378         }
 379     }
 380 
 381     /**
 382      * Test join after join completed with a timeout.
 383      */
 384     @Test
 385     void testJoinAfterJoin3() throws Exception {
 386         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
 387                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
 388             // wait for scope to be cancelled by timeout
 389             awaitCancelled(scope);
 390             assertThrows(TimeoutException.class, scope::join);
 391 
 392             // join already called
 393             for (int i = 0 ; i < 3; i++) {
 394                 assertThrows(IllegalStateException.class, scope::join);
 395             }
 396         }
 397     }
 398 
 399     /**
 400      * Test join method is owner confined.
 401      */
 402     @ParameterizedTest
 403     @MethodSource("factories")
 404     void testJoinConfined(ThreadFactory factory) throws Exception {
 405         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 406                 cf -> cf.withThreadFactory(factory))) {
 407 
 408             // random thread cannot join
 409             try (var pool = Executors.newSingleThreadExecutor()) {
 410                 Future<Void> future = pool.submit(() -> {
 411                     assertThrows(WrongThreadException.class, scope::join);
 412                     return null;
 413                 });
 414                 future.get();
 415             }
 416 
 417             // subtask cannot join
 418             Subtask<Boolean> subtask = scope.fork(() -> {
 419                 assertThrows(WrongThreadException.class, () -> { scope.join(); });
 420                 return true;
 421             });
 422             scope.join();
 423             assertTrue(subtask.get());
 424         }
 425     }
 426 
 427     /**
 428      * Test join with interrupt status set.
 429      */
 430     @ParameterizedTest
 431     @MethodSource("factories")
 432     void testInterruptJoin1(ThreadFactory factory) throws Exception {
 433         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 434                 cf -> cf.withThreadFactory(factory))) {
 435 
 436             Subtask<String> subtask = scope.fork(() -> {
 437                 Thread.sleep(60_000);
 438                 return "foo";
 439             });
 440 
 441             // join should throw
 442             Thread.currentThread().interrupt();
 443             try {
 444                 scope.join();
 445                 fail("join did not throw");
 446             } catch (InterruptedException expected) {
 447                 assertFalse(Thread.interrupted());   // interrupt status should be cleared
 448             }
 449         }
 450     }
 451 
 452     /**
 453      * Test interrupt of thread blocked in join.
 454      */
 455     @ParameterizedTest
 456     @MethodSource("factories")
 457     void testInterruptJoin2(ThreadFactory factory) throws Exception {
 458         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 459                 cf -> cf.withThreadFactory(factory))) {
 460 
 461             var latch = new CountDownLatch(1);
 462             Subtask<String> subtask = scope.fork(() -> {
 463                 Thread.sleep(60_000);
 464                 return "foo";
 465             });
 466 
 467             // interrupt main thread when it blocks in join
 468             scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
 469             try {
 470                 scope.join();
 471                 fail("join did not throw");
 472             } catch (InterruptedException expected) {
 473                 assertFalse(Thread.interrupted());   // interrupt status should be clear
 474             }
 475         }
 476     }
 477 
 478     /**
 479      * Test join when scope is cancelled.
 480      */
 481     @ParameterizedTest
 482     @MethodSource("factories")
 483     void testJoinWhenCancelled(ThreadFactory factory) throws Exception {
 484         var countingThreadFactory = new CountingThreadFactory(factory);
 485         var testJoiner = new CancelAfterOneJoiner<String>();
 486 
 487         try (var scope = StructuredTaskScope.open(testJoiner,
 488                     cf -> cf.withThreadFactory(countingThreadFactory))) {
 489 
 490             // fork subtask, the scope should be cancelled when the subtask completes
 491             var subtask1 = scope.fork(() -> "foo");
 492             awaitCancelled(scope);
 493 
 494             // fork second subtask, it should not run
 495             var subtask2 = scope.fork(() -> {
 496                 Thread.sleep(Duration.ofDays(1));
 497                 return "bar";
 498             });
 499 
 500             scope.join();
 501 
 502             assertEquals("foo", subtask1.get());
 503             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 504         }
 505     }
 506 
 507     /**
 508      * Test join after scope is closed.
 509      */
 510     @Test
 511     void testJoinAfterClose() throws Exception {
 512         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 513             scope.close();
 514             assertThrows(IllegalStateException.class, () -> scope.join());
 515         }
 516     }
 517 
 518     /**
 519      * Test join with timeout, subtasks finish before timeout expires.
 520      */
 521     @ParameterizedTest
 522     @MethodSource("factories")
 523     void testJoinWithTimeout1(ThreadFactory factory) throws Exception {
 524         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 525                 cf -> cf.withThreadFactory(factory)
 526                         .withTimeout(Duration.ofDays(1)))) {
 527 
 528             Subtask<String> subtask = scope.fork(() -> {
 529                 Thread.sleep(Duration.ofSeconds(1));
 530                 return "foo";
 531             });
 532 
 533             scope.join();
 534 
 535             assertFalse(scope.isCancelled());
 536             assertEquals("foo", subtask.get());
 537         }
 538     }
 539 
 540     /**
 541      * Test join with timeout, timeout expires before subtasks finish.
 542      */
 543     @ParameterizedTest
 544     @MethodSource("factories")
 545     void testJoinWithTimeout2(ThreadFactory factory) throws Exception {
 546         long startMillis = millisTime();
 547         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 548                 cf -> cf.withThreadFactory(factory)
 549                         .withTimeout(Duration.ofSeconds(2)))) {
 550 
 551             Subtask<Void> subtask = scope.fork(() -> {
 552                 Thread.sleep(Duration.ofDays(1));
 553                 return null;
 554             });
 555 
 556             assertThrows(TimeoutException.class, scope::join);
 557             expectDuration(startMillis, /*min*/1900, /*max*/20_000);
 558 
 559             assertTrue(scope.isCancelled());
 560             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 561         }
 562     }
 563 
 564     /**
 565      * Test join with timeout that has already expired.
 566      */
 567     @ParameterizedTest
 568     @MethodSource("factories")
 569     void testJoinWithTimeout3(ThreadFactory factory) throws Exception {
 570         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 571                 cf -> cf.withThreadFactory(factory)
 572                         .withTimeout(Duration.ofSeconds(-1)))) {
 573 
 574             Subtask<Void> subtask = scope.fork(() -> {
 575                 Thread.sleep(Duration.ofDays(1));
 576                 return null;
 577             });
 578 
 579             assertThrows(TimeoutException.class, scope::join);
 580 
 581             assertTrue(scope.isCancelled());
 582             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 583         }
 584     }
 585 
 586     /**
 587      * Test that cancelling execution interrupts unfinished threads. This test uses
 588      * a custom Joiner to cancel execution.
 589      */
 590     @ParameterizedTest
 591     @MethodSource("factories")
 592     void testCancelInterruptsThreads2(ThreadFactory factory) throws Exception {
 593         var testJoiner = new CancelAfterOneJoiner<String>();
 594 
 595         try (var scope = StructuredTaskScope.open(testJoiner,
 596                 cf -> cf.withThreadFactory(factory))) {
 597 
 598             // fork subtask1 that runs for a long time
 599             var started = new CountDownLatch(1);
 600             var interrupted = new CountDownLatch(1);
 601             var subtask1 = scope.fork(() -> {
 602                 started.countDown();
 603                 try {
 604                     Thread.sleep(Duration.ofDays(1));
 605                 } catch (InterruptedException e) {
 606                     interrupted.countDown();
 607                 }
 608             });
 609             started.await();
 610 
 611             // fork subtask2, the scope should be cancelled when the subtask completes
 612             var subtask2 = scope.fork(() -> "bar");
 613             awaitCancelled(scope);
 614 
 615             // subtask1 should be interrupted
 616             interrupted.await();
 617 
 618             scope.join();
 619             assertEquals(Subtask.State.UNAVAILABLE, subtask1.state());
 620             assertEquals("bar", subtask2.get());
 621         }
 622     }
 623 
 624     /**
 625      * Test that timeout interrupts unfinished threads.
 626      */
 627     @ParameterizedTest
 628     @MethodSource("factories")
 629     void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception {
 630         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 631                 cf -> cf.withThreadFactory(factory)
 632                         .withTimeout(Duration.ofSeconds(2)))) {
 633 
 634             var started = new AtomicBoolean();
 635             var interrupted = new CountDownLatch(1);
 636             Subtask<Void> subtask = scope.fork(() -> {
 637                 started.set(true);
 638                 try {
 639                     Thread.sleep(Duration.ofDays(1));
 640                 } catch (InterruptedException e) {
 641                     interrupted.countDown();
 642                 }
 643                 return null;
 644             });
 645 
 646             // wait for scope to be cancelled by timeout
 647             awaitCancelled(scope);
 648 
 649             // if subtask started then it should be interrupted
 650             if (started.get()) {
 651                 interrupted.await();
 652             }
 653 
 654             assertThrows(TimeoutException.class, scope::join);
 655 
 656             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 657         }
 658     }
 659 
 660     /**
 661      * Test close without join, no subtasks forked.
 662      */
 663     @Test
 664     void testCloseWithoutJoin1() {
 665         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 666             // do nothing
 667         }
 668     }
 669 
 670     /**
 671      * Test close without join, subtasks forked.
 672      */
 673     @ParameterizedTest
 674     @MethodSource("factories")
 675     void testCloseWithoutJoin2(ThreadFactory factory) {
 676         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 677                 cf -> cf.withThreadFactory(factory))) {
 678             Subtask<String> subtask = scope.fork(() -> {
 679                 Thread.sleep(Duration.ofDays(1));
 680                 return null;
 681             });
 682 
 683             // first call to close should throw
 684             assertThrows(IllegalStateException.class, scope::close);
 685 
 686             // subsequent calls to close should not throw
 687             for (int i = 0; i < 3; i++) {
 688                 scope.close();
 689             }
 690 
 691             // subtask result/exception not available
 692             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 693             assertThrows(IllegalStateException.class, subtask::get);
 694             assertThrows(IllegalStateException.class, subtask::exception);
 695         }
 696     }
 697 
 698     /**
 699      * Test close after join throws. Close should not throw as join attempted.
 700      */
 701     @ParameterizedTest
 702     @MethodSource("factories")
 703     void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception {
 704         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 705                 cf -> cf.withThreadFactory(factory))) {
 706             var subtask = scope.fork(() -> {
 707                 Thread.sleep(Duration.ofDays(1));
 708                 return null;
 709             });
 710 
 711             // join throws
 712             Thread.currentThread().interrupt();
 713             assertThrows(InterruptedException.class, scope::join);
 714             assertThrows(IllegalStateException.class, subtask::get);
 715 
 716         }  // close should not throw
 717     }
 718 
 719     /**
 720      * Test close method is owner confined.
 721      */
 722     @ParameterizedTest
 723     @MethodSource("factories")
 724     void testCloseConfined(ThreadFactory factory) throws Exception {
 725         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 726                 cf -> cf.withThreadFactory(factory))) {
 727 
 728             // random thread cannot close scope
 729             try (var pool = Executors.newCachedThreadPool(factory)) {
 730                 Future<Boolean> future = pool.submit(() -> {
 731                     assertThrows(WrongThreadException.class, scope::close);
 732                     return null;
 733                 });
 734                 future.get();
 735             }
 736 
 737             // subtask cannot close
 738             Subtask<Boolean> subtask = scope.fork(() -> {
 739                 assertThrows(WrongThreadException.class, scope::close);
 740                 return true;
 741             });
 742             scope.join();
 743             assertTrue(subtask.get());
 744         }
 745     }
 746 
 747     /**
 748      * Test close with interrupt status set.
 749      */
 750     @ParameterizedTest
 751     @MethodSource("factories")
 752     void testInterruptClose1(ThreadFactory factory) throws Exception {
 753         var testJoiner = new CancelAfterOneJoiner<String>();
 754         try (var scope = StructuredTaskScope.open(testJoiner,
 755                 cf -> cf.withThreadFactory(factory))) {
 756 
 757             // fork first subtask, a straggler as it continues after being interrupted
 758             var started = new CountDownLatch(1);
 759             var done = new AtomicBoolean();
 760             scope.fork(() -> {
 761                 started.countDown();
 762                 try {
 763                     Thread.sleep(Duration.ofDays(1));
 764                 } catch (InterruptedException e) {
 765                     // interrupted by cancel, expected
 766                 }
 767                 Thread.sleep(Duration.ofMillis(100)); // force close to wait
 768                 done.set(true);
 769                 return null;
 770             });
 771             started.await();
 772 
 773             // fork second subtask, the scope should be cancelled when this subtask completes
 774             scope.fork(() -> "bar");
 775             awaitCancelled(scope);
 776 
 777             scope.join();
 778 
 779             // invoke close with interrupt status set
 780             Thread.currentThread().interrupt();
 781             try {
 782                 scope.close();
 783             } finally {
 784                 assertTrue(Thread.interrupted());   // clear interrupt status
 785                 assertTrue(done.get());
 786             }
 787         }
 788     }
 789 
 790     /**
 791      * Test interrupting thread waiting in close.
 792      */
 793     @ParameterizedTest
 794     @MethodSource("factories")
 795     void testInterruptClose2(ThreadFactory factory) throws Exception {
 796         var testJoiner = new CancelAfterOneJoiner<String>();
 797         try (var scope = StructuredTaskScope.open(testJoiner,
 798                 cf -> cf.withThreadFactory(factory))) {
 799 
 800             Thread mainThread = Thread.currentThread();
 801 
 802             // fork first subtask, a straggler as it continues after being interrupted
 803             var started = new CountDownLatch(1);
 804             var done = new AtomicBoolean();
 805             scope.fork(() -> {
 806                 started.countDown();
 807                 try {
 808                     Thread.sleep(Duration.ofDays(1));
 809                 } catch (InterruptedException e) {
 810                     // interrupted by cancel, expected
 811                 }
 812 
 813                 // interrupt main thread when it blocks in close
 814                 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScopeImpl.close");
 815 
 816                 Thread.sleep(Duration.ofMillis(100)); // force close to wait
 817                 done.set(true);
 818                 return null;
 819             });
 820             started.await();
 821 
 822             // fork second subtask, the scope should be cancelled when this subtask completes
 823             scope.fork(() -> "bar");
 824             awaitCancelled(scope);
 825 
 826             scope.join();
 827 
 828             // main thread will be interrupted while blocked in close
 829             try {
 830                 scope.close();
 831             } finally {
 832                 assertTrue(Thread.interrupted());   // clear interrupt status
 833                 assertTrue(done.get());
 834             }
 835         }
 836     }
 837 
 838     /**
 839      * Test that closing an enclosing scope closes the thread flock of a nested scope.
 840      */
 841     @Test
 842     void testCloseThrowsStructureViolation() throws Exception {
 843         try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll())) {
 844             try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll())) {
 845 
 846                 // close enclosing scope
 847                 try {
 848                     scope1.close();
 849                     fail("close did not throw");
 850                 } catch (StructureViolationException expected) { }
 851 
 852                 // underlying flock should be closed
 853                 var executed = new AtomicBoolean();
 854                 Subtask<?> subtask = scope2.fork(() -> executed.set(true));
 855                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 856                 scope2.join();
 857                 assertFalse(executed.get());
 858                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 859             }
 860         }
 861     }
 862 
 863     /**
 864      * Test that isCancelled returns true after close.
 865      */
 866     @Test
 867     void testIsCancelledAfterClose() throws Exception {
 868         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 869             assertFalse(scope.isCancelled());
 870             scope.close();
 871             assertTrue(scope.isCancelled());
 872         }
 873     }
 874 
 875     /**
 876      * Test Joiner.onFork throwing exception.
 877      */
 878     @Test
 879     void testOnForkThrows() throws Exception {
 880         var joiner = new Joiner<String, Void>() {
 881             @Override
 882             public boolean onFork(Subtask<? extends String> subtask) {
 883                 throw new FooException();
 884             }
 885             @Override
 886             public Void result() {
 887                 return null;
 888             }
 889         };
 890         try (var scope = StructuredTaskScope.open(joiner)) {
 891             assertThrows(FooException.class, () -> scope.fork(() -> "foo"));
 892         }
 893     }
 894 
 895     /**
 896      * Test Joiner.onFork returning true to cancel execution.
 897      */
 898     @Test
 899     void testOnForkCancelsExecution() throws Exception {
 900         var joiner = new Joiner<String, Void>() {
 901             @Override
 902             public boolean onFork(Subtask<? extends String> subtask) {
 903                 return true;
 904             }
 905             @Override
 906             public Void result() {
 907                 return null;
 908             }
 909         };
 910         try (var scope = StructuredTaskScope.open(joiner)) {
 911             assertFalse(scope.isCancelled());
 912             scope.fork(() -> "foo");
 913             assertTrue(scope.isCancelled());
 914             scope.join();
 915         }
 916     }
 917 
 918     /**
 919      * Test Joiner.onComplete throwing exception causes UHE to be invoked.
 920      */
 921     @Test
 922     void testOnCompleteThrows() throws Exception {
 923         var joiner = new Joiner<String, Void>() {
 924             @Override
 925             public boolean onComplete(Subtask<? extends String> subtask) {
 926                 throw new FooException();
 927             }
 928             @Override
 929             public Void result() {
 930                 return null;
 931             }
 932         };
 933         var excRef = new AtomicReference<Throwable>();
 934         Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
 935         ThreadFactory factory = Thread.ofVirtual()
 936                 .uncaughtExceptionHandler(uhe)
 937                 .factory();
 938         try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
 939             scope.fork(() -> "foo");
 940             scope.join();
 941             assertInstanceOf(FooException.class, excRef.get());
 942         }
 943     }
 944 
 945     /**
 946      * Test Joiner.onComplete returning true to cancel execution.
 947      */
 948     @Test
 949     void testOnCompleteCancelsExecution() throws Exception {
 950         var joiner = new Joiner<String, Void>() {
 951             @Override
 952             public boolean onComplete(Subtask<? extends String> subtask) {
 953                 return true;
 954             }
 955             @Override
 956             public Void result() {
 957                 return null;
 958             }
 959         };
 960         try (var scope = StructuredTaskScope.open(joiner)) {
 961             assertFalse(scope.isCancelled());
 962             scope.fork(() -> "foo");
 963             awaitCancelled(scope);
 964             scope.join();
 965         }
 966     }
 967 
 968     /**
 969      * Test toString.
 970      */
 971     @Test
 972     void testToString() throws Exception {
 973         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 974                 cf -> cf.withName("duke"))) {
 975 
 976             // open
 977             assertTrue(scope.toString().contains("duke"));
 978 
 979             // closed
 980             scope.close();
 981             assertTrue(scope.toString().contains("duke"));
 982         }
 983     }
 984 
 985     /**
 986      * Test Subtask with task that completes successfully.
 987      */
 988     @ParameterizedTest
 989     @MethodSource("factories")
 990     void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
 991         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
 992                 cf -> cf.withThreadFactory(factory))) {
 993 
 994             Subtask<String> subtask = scope.fork(() -> "foo");
 995 
 996             // before join
 997             assertThrows(IllegalStateException.class, subtask::get);
 998             assertThrows(IllegalStateException.class, subtask::exception);
 999 
1000             scope.join();
1001 
1002             // after join
1003             assertEquals(Subtask.State.SUCCESS, subtask.state());
1004             assertEquals("foo", subtask.get());
1005             assertThrows(IllegalStateException.class, subtask::exception);
1006         }
1007     }
1008 
1009     /**
1010      * Test Subtask with task that fails.
1011      */
1012     @ParameterizedTest
1013     @MethodSource("factories")
1014     void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
1015         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1016                 cf -> cf.withThreadFactory(factory))) {
1017 
1018             Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
1019 
1020             // before join
1021             assertThrows(IllegalStateException.class, subtask::get);
1022             assertThrows(IllegalStateException.class, subtask::exception);
1023 
1024             scope.join();
1025 
1026             // after join
1027             assertEquals(Subtask.State.FAILED, subtask.state());
1028             assertThrows(IllegalStateException.class, subtask::get);
1029             assertTrue(subtask.exception() instanceof FooException);
1030         }
1031     }
1032 
1033     /**
1034      * Test Subtask with a task that has not completed.
1035      */
1036     @ParameterizedTest
1037     @MethodSource("factories")
1038     void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
1039         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1040                 cf -> cf.withThreadFactory(factory))) {
1041             Subtask<Void> subtask = scope.fork(() -> {
1042                 Thread.sleep(Duration.ofDays(1));
1043                 return null;
1044             });
1045 
1046             // before join
1047             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1048             assertThrows(IllegalStateException.class, subtask::get);
1049             assertThrows(IllegalStateException.class, subtask::exception);
1050 
1051             // attempt join, join throws
1052             Thread.currentThread().interrupt();
1053             assertThrows(InterruptedException.class, scope::join);
1054 
1055             // after join
1056             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1057             assertThrows(IllegalStateException.class, subtask::get);
1058             assertThrows(IllegalStateException.class, subtask::exception);
1059         }
1060     }
1061 
1062     /**
1063      * Test Subtask forked after execution cancelled.
1064      */
1065     @ParameterizedTest
1066     @MethodSource("factories")
1067     void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception {
1068         try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1069             scope.fork(() -> "foo");
1070             awaitCancelled(scope);
1071 
1072             var subtask = scope.fork(() -> "foo");
1073 
1074             // before join
1075             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1076             assertThrows(IllegalStateException.class, subtask::get);
1077             assertThrows(IllegalStateException.class, subtask::exception);
1078 
1079             scope.join();
1080 
1081             // after join
1082             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1083             assertThrows(IllegalStateException.class, subtask::get);
1084             assertThrows(IllegalStateException.class, subtask::exception);
1085         }
1086     }
1087 
1088     /**
1089      * Test Subtask::toString.
1090      */
1091     @Test
1092     void testSubtaskToString() throws Exception {
1093         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1094             var latch = new CountDownLatch(1);
1095             var subtask1 = scope.fork(() -> {
1096                 latch.await();
1097                 return "foo";
1098             });
1099             var subtask2 = scope.fork(() -> { throw new FooException(); });
1100 
1101             // subtask1 result is unavailable
1102             assertTrue(subtask1.toString().contains("Unavailable"));
1103             latch.countDown();
1104 
1105             scope.join();
1106 
1107             assertTrue(subtask1.toString().contains("Completed successfully"));
1108             assertTrue(subtask2.toString().contains("Failed"));
1109         }
1110     }
1111 
1112     /**
1113      * Test Joiner.allSuccessfulOrThrow() with no subtasks.
1114      */
1115     @Test
1116     void testAllSuccessfulOrThrow1() throws Throwable {
1117         try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
1118             var subtasks = scope.join().toList();
1119             assertTrue(subtasks.isEmpty());
1120         }
1121     }
1122 
1123     /**
1124      * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully.
1125      */
1126     @ParameterizedTest
1127     @MethodSource("factories")
1128     void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1129         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1130                 cf -> cf.withThreadFactory(factory))) {
1131             var subtask1 = scope.fork(() -> "foo");
1132             var subtask2 = scope.fork(() -> "bar");
1133             var subtasks = scope.join().toList();
1134             assertEquals(List.of(subtask1, subtask2), subtasks);
1135             assertEquals("foo", subtask1.get());
1136             assertEquals("bar", subtask2.get());
1137         }
1138     }
1139 
1140     /**
1141      * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
1142      * a subtask that fails.
1143      */
1144     @ParameterizedTest
1145     @MethodSource("factories")
1146     void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1147         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1148                 cf -> cf.withThreadFactory(factory))) {
1149             scope.fork(() -> "foo");
1150             scope.fork(() -> { throw new FooException(); });
1151             try {
1152                 scope.join();
1153             } catch (FailedException e) {
1154                 assertTrue(e.getCause() instanceof FooException);
1155             }
1156         }
1157     }
1158 
1159     /**
1160      * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
1161      */
1162     @Test
1163     void testAnySuccessfulResultOrThrow1() throws Exception {
1164         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
1165             try {
1166                 scope.join();
1167             } catch (FailedException e) {
1168                 assertTrue(e.getCause() instanceof NoSuchElementException);
1169             }
1170         }
1171     }
1172 
1173     /**
1174      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully.
1175      */
1176     @ParameterizedTest
1177     @MethodSource("factories")
1178     void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {
1179         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1180                 cf -> cf.withThreadFactory(factory))) {
1181             scope.fork(() -> "foo");
1182             String result = scope.join();
1183             assertEquals("foo", result);
1184         }
1185     }
1186 
1187     /**
1188      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully
1189      * with a null result.
1190      */
1191     @ParameterizedTest
1192     @MethodSource("factories")
1193     void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception {
1194         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1195                 cf -> cf.withThreadFactory(factory))) {
1196             scope.fork(() -> null);
1197             String result = scope.join();
1198             assertNull(result);
1199         }
1200     }
1201 
1202     /**
1203      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully
1204      * and a subtask that fails.
1205      */
1206     @ParameterizedTest
1207     @MethodSource("factories")
1208     void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception {
1209         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1210                 cf -> cf.withThreadFactory(factory))) {
1211             scope.fork(() -> "foo");
1212             scope.fork(() -> { throw new FooException(); });
1213             String first = scope.join();
1214             assertEquals("foo", first);
1215         }
1216     }
1217 
1218     /**
1219      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails.
1220      */
1221     @ParameterizedTest
1222     @MethodSource("factories")
1223     void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
1224         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
1225                 cf -> cf.withThreadFactory(factory))) {
1226             scope.fork(() -> { throw new FooException(); });
1227             Throwable ex = assertThrows(FailedException.class, scope::join);
1228             assertTrue(ex.getCause() instanceof FooException);
1229         }
1230     }
1231 
1232     /**
1233      * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
1234      */
1235     @Test
1236     void testAwaitSuccessfulOrThrow1() throws Throwable {
1237         try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
1238             var result = scope.join();
1239             assertNull(result);
1240         }
1241     }
1242 
1243     /**
1244      * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
1245      */
1246     @ParameterizedTest
1247     @MethodSource("factories")
1248     void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1249         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1250                 cf -> cf.withThreadFactory(factory))) {
1251             var subtask1 = scope.fork(() -> "foo");
1252             var subtask2 = scope.fork(() -> "bar");
1253             var result = scope.join();
1254             assertNull(result);
1255             assertEquals("foo", subtask1.get());
1256             assertEquals("bar", subtask2.get());
1257         }
1258     }
1259 
1260     /**
1261      * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
1262      * a subtask that fails.
1263      */
1264     @ParameterizedTest
1265     @MethodSource("factories")
1266     void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1267         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1268                 cf -> cf.withThreadFactory(factory))) {
1269             scope.fork(() -> "foo");
1270             scope.fork(() -> { throw new FooException(); });
1271             try {
1272                 scope.join();
1273             } catch (FailedException e) {
1274                 assertTrue(e.getCause() instanceof FooException);
1275             }
1276         }
1277     }
1278 
1279     /**
1280      * Test Joiner.awaitAll() with no subtasks.
1281      */
1282     @Test
1283     void testAwaitAll1() throws Throwable {
1284         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1285             var result = scope.join();
1286             assertNull(result);
1287         }
1288     }
1289 
1290     /**
1291      * Test Joiner.awaitAll() with subtasks that complete successfully.
1292      */
1293     @ParameterizedTest
1294     @MethodSource("factories")
1295     void testAwaitAll2(ThreadFactory factory) throws Throwable {
1296         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1297                 cf -> cf.withThreadFactory(factory))) {
1298             var subtask1 = scope.fork(() -> "foo");
1299             var subtask2 = scope.fork(() -> "bar");
1300             var result = scope.join();
1301             assertNull(result);
1302             assertEquals("foo", subtask1.get());
1303             assertEquals("bar", subtask2.get());
1304         }
1305     }
1306 
1307     /**
1308      * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
1309      * that fails.
1310      */
1311     @ParameterizedTest
1312     @MethodSource("factories")
1313     void testAwaitAll3(ThreadFactory factory) throws Throwable {
1314         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1315                 cf -> cf.withThreadFactory(factory))) {
1316             var subtask1 = scope.fork(() -> "foo");
1317             var subtask2 = scope.fork(() -> { throw new FooException(); });
1318             var result = scope.join();
1319             assertNull(result);
1320             assertEquals("foo", subtask1.get());
1321             assertTrue(subtask2.exception() instanceof FooException);
1322         }
1323     }
1324 
1325     /**
1326      * Test Joiner.allUntil(Predicate) with no subtasks.
1327      */
1328     @Test
1329     void testAllUntil1() throws Throwable {
1330         try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
1331             var subtasks = scope.join();
1332             assertEquals(0, subtasks.count());
1333         }
1334     }
1335 
1336     /**
1337      * Test Joiner.allUntil(Predicate) with no cancellation.
1338      */
1339     @ParameterizedTest
1340     @MethodSource("factories")
1341     void testAllUntil2(ThreadFactory factory) throws Exception {
1342         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1343                 cf -> cf.withThreadFactory(factory))) {
1344 
1345             var subtask1 = scope.fork(() -> "foo");
1346             var subtask2 = scope.fork(() -> { throw new FooException(); });
1347 
1348             var subtasks = scope.join().toList();
1349             assertEquals(2, subtasks.size());
1350 
1351             assertSame(subtask1, subtasks.get(0));
1352             assertSame(subtask2, subtasks.get(1));
1353             assertEquals("foo", subtask1.get());
1354             assertTrue(subtask2.exception() instanceof FooException);
1355         }
1356     }
1357 
1358     /**
1359      * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes.
1360      */
1361     @ParameterizedTest
1362     @MethodSource("factories")
1363     void testAllUntil3(ThreadFactory factory) throws Exception {
1364         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true),
1365                 cf -> cf.withThreadFactory(factory))) {
1366 
1367             var subtask1 = scope.fork(() -> "foo");
1368             var subtask2 = scope.fork(() -> {
1369                 Thread.sleep(Duration.ofDays(1));
1370                 return "bar";
1371             });
1372 
1373             var subtasks = scope.join().toList();
1374 
1375             assertEquals(2, subtasks.size());
1376             assertSame(subtask1, subtasks.get(0));
1377             assertSame(subtask2, subtasks.get(1));
1378             assertEquals("foo", subtask1.get());
1379             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1380         }
1381     }
1382 
1383     /**
1384      * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete.
1385      */
1386     @ParameterizedTest
1387     @MethodSource("factories")
1388     void testAllUntil4(ThreadFactory factory) throws Exception {
1389 
1390         // cancel execution after two or more failures
1391         class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
1392             final AtomicInteger failedCount = new AtomicInteger();
1393             @Override
1394             public boolean test(Subtask<? extends T> subtask) {
1395                 return subtask.state() == Subtask.State.FAILED
1396                         && failedCount.incrementAndGet() >= 2;
1397             }
1398         }
1399         var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
1400 
1401         try (var scope = StructuredTaskScope.open(joiner)) {
1402             int forkCount = 0;
1403 
1404             // fork subtasks until execution cancelled
1405             while (!scope.isCancelled()) {
1406                 scope.fork(() -> "foo");
1407                 scope.fork(() -> { throw new FooException(); });
1408                 forkCount += 2;
1409                 Thread.sleep(Duration.ofMillis(20));
1410             }
1411 
1412             var subtasks = scope.join().toList();
1413             assertEquals(forkCount, subtasks.size());
1414 
1415             long failedCount = subtasks.stream()
1416                     .filter(s -> s.state() == Subtask.State.FAILED)
1417                     .count();
1418             assertTrue(failedCount >= 2);
1419         }
1420     }
1421 
1422     /**
1423      * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws.
1424      */
1425     @Test
1426     void testAllUntil5() throws Exception {
1427         var joiner = Joiner.allUntil(_ -> { throw new FooException(); });
1428         var excRef = new AtomicReference<Throwable>();
1429         Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1430         ThreadFactory factory = Thread.ofVirtual()
1431                 .uncaughtExceptionHandler(uhe)
1432                 .factory();
1433         try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1434             scope.fork(() -> "foo");
1435             scope.join();
1436             assertInstanceOf(FooException.class, excRef.get());
1437         }
1438     }
1439 
1440     /**
1441      * Test Joiner default methods.
1442      */
1443     @Test
1444     void testJoinerDefaultMethods() throws Exception {
1445         try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1446 
1447             // need subtasks to test default methods
1448             var subtask1 = scope.fork(() -> "foo");
1449             awaitCancelled(scope);
1450             var subtask2 = scope.fork(() -> "bar");
1451             scope.join();
1452 
1453             assertEquals(Subtask.State.SUCCESS, subtask1.state());
1454             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1455 
1456             // Joiner that does not override default methods
1457             Joiner<Object, Void> joiner = () -> null;
1458             assertThrows(NullPointerException.class, () -> joiner.onFork(null));
1459             assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
1460             assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
1461             assertFalse(joiner.onFork(subtask2));
1462             assertFalse(joiner.onComplete(subtask1));
1463             assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
1464         }
1465     }
1466 
1467     /**
1468      * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
1469      */
1470     @Test
1471     void testJoinersWithUnavailableResult() throws Exception {
1472         try (var scope = StructuredTaskScope.open()) {
1473             var done = new CountDownLatch(1);
1474             var subtask = scope.fork(() -> {
1475                 done.await();
1476                 return null;
1477             });
1478 
1479             // onComplete with uncompleted task should throw IAE
1480             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1481             assertThrows(IllegalArgumentException.class,
1482                     () -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
1483             assertThrows(IllegalArgumentException.class,
1484                     () -> Joiner.anySuccessfulResultOrThrow().onComplete(subtask));
1485             assertThrows(IllegalArgumentException.class,
1486                     () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask));
1487             assertThrows(IllegalArgumentException.class,
1488                     () -> Joiner.awaitAll().onComplete(subtask));
1489             assertThrows(IllegalArgumentException.class,
1490                     () -> Joiner.allUntil(_ -> false).onComplete(subtask));
1491 
1492             done.countDown();
1493             scope.join();
1494 
1495             // onFork with completed task should throw IAE
1496             assertEquals(Subtask.State.SUCCESS, subtask.state());
1497             assertThrows(IllegalArgumentException.class,
1498                     () -> Joiner.allSuccessfulOrThrow().onFork(subtask));
1499             assertThrows(IllegalArgumentException.class,
1500                     () -> Joiner.anySuccessfulResultOrThrow().onFork(subtask));
1501             assertThrows(IllegalArgumentException.class,
1502                     () -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask));
1503             assertThrows(IllegalArgumentException.class,
1504                     () -> Joiner.awaitAll().onFork(subtask));
1505             assertThrows(IllegalArgumentException.class,
1506                     () -> Joiner.allUntil(_ -> false).onFork(subtask));
1507         }
1508 
1509     }
1510 
1511     /**
1512      * Test the Configuration function apply method throwing an exception.
1513      */
1514     @Test
1515     void testConfigFunctionThrows() throws Exception {
1516         assertThrows(FooException.class,
1517                 () -> StructuredTaskScope.open(Joiner.awaitAll(),
1518                                                cf -> { throw new FooException(); }));
1519     }
1520 
1521     /**
1522      * Test Configuration equals/hashCode/toString
1523      */
1524     @Test
1525     void testConfigMethods() throws Exception {
1526         Function<Configuration, Configuration> testConfig = cf -> {
1527             var name = "duke";
1528             var threadFactory = Thread.ofPlatform().factory();
1529             var timeout = Duration.ofSeconds(10);
1530 
1531             assertEquals(cf, cf);
1532             assertEquals(cf.withName(name), cf.withName(name));
1533             assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1534             assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1535 
1536             assertNotEquals(cf, cf.withName(name));
1537             assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1538             assertNotEquals(cf, cf.withTimeout(timeout));
1539 
1540             assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1541             assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1542                     cf.withThreadFactory(threadFactory).hashCode());
1543             assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1544 
1545             assertTrue(cf.withName(name).toString().contains(name));
1546             assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1547             assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1548 
1549             return cf;
1550         };
1551         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
1552             // do nothing
1553         }
1554     }
1555 
1556     /**
1557      * Test for NullPointerException.
1558      */
1559     @Test
1560     void testNulls() throws Exception {
1561         assertThrows(NullPointerException.class,
1562                 () -> StructuredTaskScope.open(null));
1563         assertThrows(NullPointerException.class,
1564                 () -> StructuredTaskScope.open(null, cf -> cf));
1565         assertThrows(NullPointerException.class,
1566                 () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
1567 
1568         assertThrows(NullPointerException.class, () -> Joiner.allUntil(null));
1569 
1570         // fork
1571         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1572             assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null));
1573             assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null));
1574         }
1575 
1576         // Configuration and withXXX methods
1577         assertThrows(NullPointerException.class,
1578                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> null));
1579         assertThrows(NullPointerException.class,
1580                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null)));
1581         assertThrows(NullPointerException.class,
1582                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null)));
1583         assertThrows(NullPointerException.class,
1584                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null)));
1585 
1586         // Joiner.onFork/onComplete
1587         assertThrows(NullPointerException.class,
1588                 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null));
1589         assertThrows(NullPointerException.class,
1590                 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null));
1591         assertThrows(NullPointerException.class,
1592                 () -> Joiner.awaitAll().onFork(null));
1593         assertThrows(NullPointerException.class,
1594                 () -> Joiner.awaitAll().onComplete(null));
1595         assertThrows(NullPointerException.class,
1596                 () -> Joiner.allSuccessfulOrThrow().onFork(null));
1597         assertThrows(NullPointerException.class,
1598                 () -> Joiner.allSuccessfulOrThrow().onComplete(null));
1599         assertThrows(NullPointerException.class,
1600                 () -> Joiner.anySuccessfulResultOrThrow().onFork(null));
1601         assertThrows(NullPointerException.class,
1602                 () -> Joiner.anySuccessfulResultOrThrow().onComplete(null));
1603     }
1604 
1605     /**
1606      * ThreadFactory that counts usage.
1607      */
1608     private static class CountingThreadFactory implements ThreadFactory {
1609         final ThreadFactory delegate;
1610         final AtomicInteger threadCount = new AtomicInteger();
1611         CountingThreadFactory(ThreadFactory delegate) {
1612             this.delegate = delegate;
1613         }
1614         @Override
1615         public Thread newThread(Runnable task) {
1616             threadCount.incrementAndGet();
1617             return delegate.newThread(task);
1618         }
1619         int threadCount() {
1620             return threadCount.get();
1621         }
1622     }
1623 
1624     /**
1625      * A joiner that counts that counts the number of subtasks that are forked and the
1626      * number of subtasks that complete.
1627      */
1628     private static class CountingJoiner<T> implements Joiner<T, Void> {
1629         final AtomicInteger onForkCount = new AtomicInteger();
1630         final AtomicInteger onCompleteCount = new AtomicInteger();
1631         @Override
1632         public boolean onFork(Subtask<? extends T> subtask) {
1633             onForkCount.incrementAndGet();
1634             return false;
1635         }
1636         @Override
1637         public boolean onComplete(Subtask<? extends T> subtask) {
1638             onCompleteCount.incrementAndGet();
1639             return false;
1640         }
1641         @Override
1642         public Void result() {
1643             return null;
1644         }
1645         int onForkCount() {
1646             return onForkCount.get();
1647         }
1648         int onCompleteCount() {
1649             return onCompleteCount.get();
1650         }
1651     }
1652 
1653     /**
1654      * A joiner that cancels execution when a subtask completes. It also keeps a count
1655      * of the number of subtasks that are forked and the number of subtasks that complete.
1656      */
1657     private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> {
1658         final AtomicInteger onForkCount = new AtomicInteger();
1659         final AtomicInteger onCompleteCount = new AtomicInteger();
1660         @Override
1661         public boolean onFork(Subtask<? extends T> subtask) {
1662             onForkCount.incrementAndGet();
1663             return false;
1664         }
1665         @Override
1666         public boolean onComplete(Subtask<? extends T> subtask) {
1667             onCompleteCount.incrementAndGet();
1668             return true;
1669         }
1670         @Override
1671         public Void result() {
1672             return null;
1673         }
1674         int onForkCount() {
1675             return onForkCount.get();
1676         }
1677         int onCompleteCount() {
1678             return onCompleteCount.get();
1679         }
1680     }
1681 
1682     /**
1683      * A runtime exception for tests.
1684      */
1685     private static class FooException extends RuntimeException {
1686         FooException() { }
1687         FooException(Throwable cause) { super(cause); }
1688     }
1689 
1690     /**
1691      * Returns the current time in milliseconds.
1692      */
1693     private long millisTime() {
1694         long now = System.nanoTime();
1695         return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
1696     }
1697 
1698     /**
1699      * Check the duration of a task
1700      * @param start start time, in milliseconds
1701      * @param min minimum expected duration, in milliseconds
1702      * @param max maximum expected duration, in milliseconds
1703      * @return the duration (now - start), in milliseconds
1704      */
1705     private long expectDuration(long start, long min, long max) {
1706         long duration = millisTime() - start;
1707         assertTrue(duration >= min,
1708                 "Duration " + duration + "ms, expected >= " + min + "ms");
1709         assertTrue(duration <= max,
1710                 "Duration " + duration + "ms, expected <= " + max + "ms");
1711         return duration;
1712     }
1713 
1714     /**
1715      * Wait for the given scope to be cancelled.
1716      */
1717     private static void awaitCancelled(StructuredTaskScope<?, ?> scope) throws InterruptedException {
1718         while (!scope.isCancelled()) {
1719             Thread.sleep(Duration.ofMillis(20));
1720         }
1721     }
1722 
1723     /**
1724      * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}".
1725      * {@code c} is the fully qualified class name and {@code m} is the method name.
1726      */
1727     private void interruptThreadAt(Thread target, String location) throws InterruptedException {
1728         int index = location.lastIndexOf('.');
1729         String className = location.substring(0, index);
1730         String methodName = location.substring(index + 1);
1731 
1732         boolean found = false;
1733         while (!found) {
1734             Thread.State state = target.getState();
1735             assertTrue(state != TERMINATED);
1736             if ((state == WAITING || state == TIMED_WAITING)
1737                     && contains(target.getStackTrace(), className, methodName)) {
1738                 found = true;
1739             } else {
1740                 Thread.sleep(20);
1741             }
1742         }
1743         target.interrupt();
1744     }
1745 
1746     /**
1747      * Schedules the current thread to be interrupted when it waits (timed or untimed)
1748      * at the given location.
1749      */
1750     private void scheduleInterruptAt(String location) {
1751         Thread target = Thread.currentThread();
1752         scheduler.submit(() -> {
1753             interruptThreadAt(target, location);
1754             return null;
1755         });
1756     }
1757 
1758     /**
1759      * Returns true if the given stack trace contains an element for the given class
1760      * and method name.
1761      */
1762     private boolean contains(StackTraceElement[] stack, String className, String methodName) {
1763         return Arrays.stream(stack)
1764                 .anyMatch(e -> className.equals(e.getClassName())
1765                         && methodName.equals(e.getMethodName()));
1766     }
1767 }