1 /*
   2  * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=platform
  26  * @bug 8284199 8296779 8306647
  27  * @summary Basic tests for StructuredTaskScope
  28  * @enablePreview
  29  * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest
  30  */
  31 
  32 /*
  33  * @test id=virtual
  34  * @enablePreview
  35  * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest
  36  */
  37 
  38 import java.time.Duration;
  39 import java.util.Arrays;
  40 import java.util.ArrayList;
  41 import java.util.List;
  42 import java.util.Set;
  43 import java.util.NoSuchElementException;
  44 import java.util.concurrent.Callable;
  45 import java.util.concurrent.ConcurrentHashMap;
  46 import java.util.concurrent.CountDownLatch;
  47 import java.util.concurrent.Executors;
  48 import java.util.concurrent.ExecutionException;
  49 import java.util.concurrent.Future;
  50 import java.util.concurrent.LinkedTransferQueue;
  51 import java.util.concurrent.ThreadFactory;
  52 import java.util.concurrent.TimeoutException;
  53 import java.util.concurrent.TimeUnit;
  54 import java.util.concurrent.RejectedExecutionException;
  55 import java.util.concurrent.ScheduledExecutorService;
  56 import java.util.concurrent.StructuredTaskScope;
  57 import java.util.concurrent.StructuredTaskScope.Config;
  58 import java.util.concurrent.StructuredTaskScope.Joiner;
  59 import java.util.concurrent.StructuredTaskScope.Subtask;
  60 import java.util.concurrent.StructureViolationException;
  61 import java.util.concurrent.atomic.AtomicBoolean;
  62 import java.util.concurrent.atomic.AtomicInteger;
  63 import java.util.function.Function;
  64 import java.util.function.Predicate;
  65 import java.util.stream.Stream;
  66 import static java.lang.Thread.State.*;
  67 
  68 import org.junit.jupiter.api.Test;
  69 import org.junit.jupiter.api.BeforeAll;
  70 import org.junit.jupiter.api.AfterAll;
  71 import org.junit.jupiter.params.ParameterizedTest;
  72 import org.junit.jupiter.params.provider.MethodSource;
  73 import static org.junit.jupiter.api.Assertions.*;
  74 
  75 class StructuredTaskScopeTest {
  76     private static ScheduledExecutorService scheduler;
  77     private static List<ThreadFactory> threadFactories;
  78 
  79     @BeforeAll
  80     static void setup() throws Exception {
  81         scheduler = Executors.newSingleThreadScheduledExecutor();
  82 
  83         // thread factories
  84         String value = System.getProperty("threadFactory");
  85         List<ThreadFactory> list = new ArrayList<>();
  86         if (value == null || value.equals("platform"))
  87             list.add(Thread.ofPlatform().factory());
  88         if (value == null || value.equals("virtual"))
  89             list.add(Thread.ofVirtual().factory());
  90         assertTrue(list.size() > 0, "No thread factories for tests");
  91         threadFactories = list;
  92     }
  93 
  94     @AfterAll
  95     static void shutdown() {
  96         scheduler.shutdown();
  97     }
  98 
  99     private static Stream<ThreadFactory> factories() {
 100         return threadFactories.stream();
 101     }
 102 
 103     /**
 104      * Test that fork creates virtual threads when no ThreadFactory is configured.
 105      */
 106     @Test
 107     void testForkCreateVirtualThread() throws Exception {
 108         Set<Thread> threads = ConcurrentHashMap.newKeySet();
 109         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 110             for (int i = 0; i < 50; i++) {
 111                 // runnable
 112                 scope.fork(() -> {
 113                     threads.add(Thread.currentThread());
 114                 });
 115 
 116                 // callable
 117                 scope.fork(() -> {
 118                     threads.add(Thread.currentThread());
 119                     return null;
 120                 });
 121             }
 122             scope.join();
 123         }
 124         assertEquals(100, threads.size());
 125         threads.forEach(t -> assertTrue(t.isVirtual()));
 126     }
 127 
 128     /**
 129      * Test that fork create threads with the configured ThreadFactory.
 130      */
 131     @ParameterizedTest
 132     @MethodSource("factories")
 133     void testForkUsesThreadFactory(ThreadFactory factory) throws Exception {
 134         // TheadFactory that keeps reference to all threads it creates
 135         class RecordingThreadFactory implements ThreadFactory {
 136             final ThreadFactory delegate;
 137             final Set<Thread> threads = ConcurrentHashMap.newKeySet();
 138             RecordingThreadFactory(ThreadFactory delegate) {
 139                 this.delegate = delegate;
 140             }
 141             @Override
 142             public Thread newThread(Runnable task) {
 143                 Thread thread = delegate.newThread(task);
 144                 threads.add(thread);
 145                 return thread;
 146             }
 147             Set<Thread> threads() {
 148                 return threads;
 149             }
 150         }
 151         var recordingThreadFactory = new RecordingThreadFactory(factory);
 152         Set<Thread> threads = ConcurrentHashMap.newKeySet();
 153         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 154                 cf -> cf.withThreadFactory(recordingThreadFactory))) {
 155 
 156             for (int i = 0; i < 50; i++) {
 157                 // runnable
 158                 scope.fork(() -> {
 159                     threads.add(Thread.currentThread());
 160                 });
 161 
 162                 // callable
 163                 scope.fork(() -> {
 164                     threads.add(Thread.currentThread());
 165                     return null;
 166                 });
 167             }
 168             scope.join();
 169         }
 170         assertEquals(100, threads.size());
 171         assertEquals(recordingThreadFactory.threads(), threads);
 172     }
 173 
 174     /**
 175      * Test fork method is owner confined.
 176      */
 177     @ParameterizedTest
 178     @MethodSource("factories")
 179     void testForkConfined(ThreadFactory factory) throws Exception {
 180         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 181                 cf -> cf.withThreadFactory(factory))) {
 182 
 183             // random thread cannot fork
 184             try (var pool = Executors.newSingleThreadExecutor()) {
 185                 Future<Void> future = pool.submit(() -> {
 186                     assertThrows(WrongThreadException.class, () -> {
 187                         scope.fork(() -> null);
 188                     });
 189                     return null;
 190                 });
 191                 future.get();
 192             }
 193 
 194             // subtask cannot fork
 195             Subtask<Boolean> subtask = scope.fork(() -> {
 196                 assertThrows(WrongThreadException.class, () -> {
 197                     scope.fork(() -> null);
 198                 });
 199                 return true;
 200             });
 201             scope.join();
 202             assertTrue(subtask.get());
 203         }
 204     }
 205 
 206     /**
 207      * Test fork after join, no subtasks forked before join.
 208      */
 209     @ParameterizedTest
 210     @MethodSource("factories")
 211     void testForkAfterJoin1(ThreadFactory factory) throws Exception {
 212         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 213                 cf -> cf.withThreadFactory(factory))) {
 214             scope.join();
 215             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 216         }
 217     }
 218 
 219     /**
 220      * Test fork after join, subtasks forked before join.
 221      */
 222     @ParameterizedTest
 223     @MethodSource("factories")
 224     void testForkAfterJoin2(ThreadFactory factory) throws Exception {
 225         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 226                 cf -> cf.withThreadFactory(factory))) {
 227             scope.fork(() -> "foo");
 228             scope.join();
 229             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 230         }
 231     }
 232 
 233     /**
 234      * Test fork after join throws.
 235      */
 236     @ParameterizedTest
 237     @MethodSource("factories")
 238     void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
 239         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 240                 cf -> cf.withThreadFactory(factory))) {
 241             var latch = new CountDownLatch(1);
 242             var subtask1 = scope.fork(() -> {
 243                 latch.await();
 244                 return "foo";
 245             });
 246 
 247             // join throws
 248             Thread.currentThread().interrupt();
 249             assertThrows(InterruptedException.class, scope::join);
 250 
 251             // allow subtask1 to finish
 252             latch.countDown();
 253 
 254             // continue to fork
 255             var subtask2 = scope.fork(() -> "bar");
 256             scope.join();
 257             assertEquals("foo", subtask1.get());
 258             assertEquals("bar", subtask2.get());
 259         }
 260     }
 261 
 262     /**
 263      * Test fork after task scope is cancelled. This test uses the cancel method to
 264      * cancel execution explicitly.
 265      */
 266     @ParameterizedTest
 267     @MethodSource("factories")
 268     void testForkAfterCancel1(ThreadFactory factory) throws Exception {
 269         var countingThreadFactory = new CountingThreadFactory(factory);
 270         var testJoiner = new CountingJoiner<String>();
 271 
 272         try (var scope = StructuredTaskScope.open(testJoiner,
 273                 cf -> cf.withThreadFactory(countingThreadFactory))) {
 274 
 275             scope.cancel();
 276 
 277             // fork subtask
 278             var subtask = scope.fork(() -> "foo");
 279             
 280             assertEquals(0, countingThreadFactory.threadCount());
 281             assertEquals(1, testJoiner.onForkCount());
 282             assertEquals(0, testJoiner.onCompleteCount());
 283 
 284             scope.join();
 285 
 286             assertEquals(0, countingThreadFactory.threadCount());
 287             assertEquals(1, testJoiner.onForkCount());
 288             assertEquals(0, testJoiner.onCompleteCount());
 289 
 290             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 291         }
 292     }
 293     
 294 
 295     /**
 296      * Test fork after task scope is cancelled. This test uses a custom Joiner to
 297      * cancel execution.
 298      */
 299     @ParameterizedTest
 300     @MethodSource("factories")
 301     void testForkAfterCancel2(ThreadFactory factory) throws Exception {
 302         var countingThreadFactory = new CountingThreadFactory(factory);
 303         var testJoiner = new CancelAfterOneJoiner<String>();
 304 
 305         try (var scope = StructuredTaskScope.open(testJoiner,
 306                 cf -> cf.withThreadFactory(countingThreadFactory))) {
 307 
 308             // fork subtask, the scope should be cancelled when the subtask completes
 309             var subtask1 = scope.fork(() -> "foo");
 310             while (!scope.isCancelled()) {
 311                 Thread.sleep(20);
 312             }
 313 
 314             assertEquals(1, countingThreadFactory.threadCount());
 315             assertEquals(1, testJoiner.onForkCount());
 316             assertEquals(1, testJoiner.onCompleteCount());
 317 
 318             // fork second subtask, it should not run
 319             var subtask2 = scope.fork(() -> "bar");
 320 
 321             // onFork should be invoked, newThread and onComplete should not be invoked
 322             assertEquals(1, countingThreadFactory.threadCount());
 323             assertEquals(2, testJoiner.onForkCount());
 324             assertEquals(1, testJoiner.onCompleteCount());
 325 
 326             scope.join();
 327 
 328             assertEquals(1, countingThreadFactory.threadCount());
 329             assertEquals(2, testJoiner.onForkCount());
 330             assertEquals(1, testJoiner.onCompleteCount());
 331             assertEquals("foo", subtask1.get());
 332             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 333         }
 334     }
 335 
 336     /**
 337      * Test fork after task scope is closed.
 338      */
 339     @Test
 340     void testForkAfterClose() {
 341         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 342             scope.close();
 343             assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
 344         }
 345     }
 346 
 347     /**
 348      * Test fork when the ThreadFactory rejects creating a thread.
 349      */
 350     @Test
 351     void testForkRejectedExecutionException() {
 352         ThreadFactory factory = task -> null;
 353         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 354                 cf -> cf.withThreadFactory(factory))) {
 355             assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
 356         }
 357     }
 358 
 359     /**
 360      * Test join with no subtasks.
 361      */
 362     @Test
 363     void testJoinWithNoSubtasks() throws Exception {
 364         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 365             scope.join();
 366         }
 367     }
 368 
 369     /**
 370      * Test join with a remaining subtask.
 371      */
 372     @ParameterizedTest
 373     @MethodSource("factories")
 374     void testJoinWithRemainingSubtasks(ThreadFactory factory) throws Exception {
 375         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 376                 cf -> cf.withThreadFactory(factory))) {
 377             Subtask<String> subtask = scope.fork(() -> {
 378                 Thread.sleep(Duration.ofMillis(100));
 379                 return "foo";
 380             });
 381             scope.join();
 382             assertEquals("foo", subtask.get());
 383         }
 384     }
 385 
 386     /**
 387      * Test repeated calls to join.
 388      */
 389     @Test
 390     void testJoinAfterJoin() throws Exception {
 391         var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz"));
 392         Joiner<Object, String> joiner = results::take;
 393         try (var scope = StructuredTaskScope.open(joiner)) {
 394             scope.fork(() -> "foo");
 395 
 396             // each call to join should invoke Joiner::result
 397             assertEquals("foo", scope.join());
 398             assertEquals("bar", scope.join());
 399             assertEquals("baz", scope.join());
 400         }
 401     }
 402 
 403     /**
 404      * Test join method is owner confined.
 405      */
 406     @ParameterizedTest
 407     @MethodSource("factories")
 408     void testJoinConfined(ThreadFactory factory) throws Exception {
 409         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 410                 cf -> cf.withThreadFactory(factory))) {
 411 
 412             // random thread cannot join
 413             try (var pool = Executors.newSingleThreadExecutor()) {
 414                 Future<Void> future = pool.submit(() -> {
 415                     assertThrows(WrongThreadException.class, scope::join);
 416                     return null;
 417                 });
 418                 future.get();
 419             }
 420 
 421             // subtask cannot join
 422             Subtask<Boolean> subtask = scope.fork(() -> {
 423                 assertThrows(WrongThreadException.class, () -> { scope.join(); });
 424                 return true;
 425             });
 426             scope.join();
 427             assertTrue(subtask.get());
 428         }
 429     }
 430 
 431     /**
 432      * Test join with interrupt status set.
 433      */
 434     @ParameterizedTest
 435     @MethodSource("factories")
 436     void testInterruptJoin1(ThreadFactory factory) throws Exception {
 437         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 438                 cf -> cf.withThreadFactory(factory))) {
 439 
 440             var latch = new CountDownLatch(1);
 441 
 442             Subtask<String> subtask = scope.fork(() -> {
 443                 latch.await();
 444                 return "foo";
 445             });
 446 
 447             // join should throw
 448             Thread.currentThread().interrupt();
 449             try {
 450                 scope.join();
 451                 fail("join did not throw");
 452             } catch (InterruptedException expected) {
 453                 assertFalse(Thread.interrupted());   // interrupt status should be cleared
 454             } finally {
 455                 // let task continue
 456                 latch.countDown();
 457             }
 458 
 459             // join should complete
 460             scope.join();
 461             assertEquals("foo", subtask.get());
 462         }
 463     }
 464 
 465     /**
 466      * Test interrupt of thread blocked in join.
 467      */
 468     @ParameterizedTest
 469     @MethodSource("factories")
 470     void testInterruptJoin2(ThreadFactory factory) throws Exception {
 471         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 472                 cf -> cf.withThreadFactory(factory))) {
 473 
 474             var latch = new CountDownLatch(1);
 475             Subtask<String> subtask = scope.fork(() -> {
 476                 latch.await();
 477                 return "foo";
 478             });
 479 
 480             // join should throw
 481             scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.join");
 482             try {
 483                 scope.join();
 484                 fail("join did not throw");
 485             } catch (InterruptedException expected) {
 486                 assertFalse(Thread.interrupted());   // interrupt status should be clear
 487             } finally {
 488                 // let task continue
 489                 latch.countDown();
 490             }
 491 
 492             // join should complete
 493             scope.join();
 494             assertEquals("foo", subtask.get());
 495         }
 496     }
 497 
 498     /**
 499      * Test join when scope is cancelled.
 500      */
 501     @ParameterizedTest
 502     @MethodSource("factories")
 503     void testJoinWhenCancelled(ThreadFactory factory) throws Exception {
 504         var countingThreadFactory = new CountingThreadFactory(factory);
 505         var testJoiner = new CancelAfterOneJoiner<String>();
 506 
 507         try (var scope = StructuredTaskScope.open(testJoiner,
 508                     cf -> cf.withThreadFactory(countingThreadFactory))) {
 509 
 510             // fork subtask, the scope should be cancelled when the subtask completes
 511             var subtask1 = scope.fork(() -> "foo");
 512             while (!scope.isCancelled()) {
 513                 Thread.sleep(20);
 514             }
 515 
 516             // fork second subtask, it should not run
 517             var subtask2 = scope.fork(() -> {
 518                 Thread.sleep(Duration.ofDays(1));
 519                 return "bar";
 520             });
 521 
 522             scope.join();
 523 
 524             assertEquals("foo", subtask1.get());
 525             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 526         }
 527     }
 528 
 529     /**
 530      * Test join after scope is closed.
 531      */
 532     @Test
 533     void testJoinAfterClose() throws Exception {
 534         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 535             scope.close();
 536             assertThrows(IllegalStateException.class, () -> scope.join());
 537         }
 538     }
 539 
 540     /**
 541      * Test join with timeout, subtasks finish before timeout expires.
 542      */
 543     @ParameterizedTest
 544     @MethodSource("factories")
 545     void testJoinWithTimeout1(ThreadFactory factory) throws Exception {
 546         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 547                 cf -> cf.withThreadFactory(factory)
 548                         .withTimeout(Duration.ofDays(1)))) {
 549 
 550             Subtask<String> subtask = scope.fork(() -> {
 551                 Thread.sleep(Duration.ofSeconds(1));
 552                 return "foo";
 553             });
 554 
 555             scope.join();
 556 
 557             assertFalse(scope.isCancelled());
 558             assertEquals("foo", subtask.get());
 559         }
 560     }
 561 
 562     /**
 563      * Test join with timeout, timeout expires before subtasks finish.
 564      */
 565     @ParameterizedTest
 566     @MethodSource("factories")
 567     void testJoinWithTimeout2(ThreadFactory factory) throws Exception {
 568         long startMillis = millisTime();
 569         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 570                 cf -> cf.withThreadFactory(factory)
 571                         .withTimeout(Duration.ofSeconds(2)))) {
 572 
 573             Subtask<Void> subtask = scope.fork(() -> {
 574                 Thread.sleep(Duration.ofDays(1));
 575                 return null;
 576             });
 577 
 578             try {
 579                 scope.join();
 580                 fail();
 581             } catch (ExecutionException e) {
 582                 assertTrue(e.getCause() instanceof TimeoutException);
 583             }
 584             expectDuration(startMillis, /*min*/1900, /*max*/20_000);
 585 
 586             assertTrue(scope.isCancelled());
 587             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 588         }
 589     }
 590 
 591     /**
 592      * Test join with timeout that has already expired.
 593      */
 594     @ParameterizedTest
 595     @MethodSource("factories")
 596     void testJoinWithTimeout3(ThreadFactory factory) throws Exception {
 597         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 598                 cf -> cf.withThreadFactory(factory)
 599                         .withTimeout(Duration.ofSeconds(-1)))) {
 600 
 601             Subtask<Void> subtask = scope.fork(() -> {
 602                 Thread.sleep(Duration.ofDays(1));
 603                 return null;
 604             });
 605 
 606             try {
 607                 scope.join();
 608                 fail();
 609             } catch (ExecutionException e) {
 610                 assertTrue(e.getCause() instanceof TimeoutException);
 611             }
 612             assertTrue(scope.isCancelled());
 613             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 614         }
 615     }
 616 
 617     /**
 618      * Test that cancelling execution interrupts unfinished threads. This test uses the
 619      * cancel method to cancel execution explicitly.
 620      */
 621     @ParameterizedTest
 622     @MethodSource("factories")
 623     void testCancelInterruptsThreads1(ThreadFactory factory) throws Exception {
 624         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 625                 cf -> cf.withThreadFactory(factory))) {
 626 
 627             // fork subtask that runs for a long time
 628             var started = new CountDownLatch(1);
 629             var interrupted = new CountDownLatch(1);
 630             var subtask = scope.fork(() -> {
 631                 started.countDown();
 632                 try {
 633                     Thread.sleep(Duration.ofDays(1));
 634                 } catch (InterruptedException e) {
 635                     interrupted.countDown();
 636                 }
 637             });
 638             started.await();
 639 
 640             scope.cancel();
 641 
 642             // subtask should be interrupted
 643             interrupted.await();
 644 
 645             scope.join();
 646             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 647         }
 648     }
 649 
 650     /**
 651      * Test that cancelling execution interrupts unfinished threads. This test uses
 652      * a custom Joiner to cancel execution.
 653      */
 654     @ParameterizedTest
 655     @MethodSource("factories")
 656     void testCancelInterruptsThreads2(ThreadFactory factory) throws Exception {
 657         var testJoiner = new CancelAfterOneJoiner<String>();
 658 
 659         try (var scope = StructuredTaskScope.open(testJoiner,
 660                 cf -> cf.withThreadFactory(factory))) {
 661 
 662             // fork subtask1 that runs for a long time
 663             var started = new CountDownLatch(1);
 664             var interrupted = new CountDownLatch(1);
 665             var subtask1 = scope.fork(() -> {
 666                 started.countDown();
 667                 try {
 668                     Thread.sleep(Duration.ofDays(1));
 669                 } catch (InterruptedException e) {
 670                     interrupted.countDown();
 671                 }
 672             });
 673             started.await();
 674 
 675             // fork subtask2, the scope should be cancelled when the subtask completes
 676             var subtask2 = scope.fork(() -> "bar");
 677             while (!scope.isCancelled()) {
 678                 Thread.sleep(20);
 679             }
 680 
 681             // subtask1 should be interrupted
 682             interrupted.await();
 683 
 684             scope.join();
 685             assertEquals(Subtask.State.UNAVAILABLE, subtask1.state());
 686             assertEquals("bar", subtask2.get());
 687         }
 688     }
 689 
 690     /**
 691      * Test that timeout interrupts unfinished threads.
 692      */
 693     @ParameterizedTest
 694     @MethodSource("factories")
 695     void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception {
 696         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 697                 cf -> cf.withThreadFactory(factory)
 698                         .withTimeout(Duration.ofSeconds(2)))) {
 699 
 700             var started = new AtomicBoolean();
 701             var interrupted = new CountDownLatch(1);
 702             Subtask<Void> subtask = scope.fork(() -> {
 703                 started.set(true);
 704                 try {
 705                     Thread.sleep(Duration.ofDays(1));
 706                 } catch (InterruptedException e) {
 707                     interrupted.countDown();
 708                 }
 709                 return null;
 710             });
 711 
 712             while (!scope.isCancelled()) {
 713                 Thread.sleep(50);
 714             }
 715 
 716             // if subtask started then it should be interrupted
 717             if (started.get()) {
 718                 interrupted.await();
 719             }
 720 
 721             try {
 722                 scope.join();
 723                 fail();
 724             } catch (ExecutionException e) {
 725                 assertTrue(e.getCause() instanceof TimeoutException);
 726             }
 727             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 728         }
 729     }
 730 
 731     /**
 732      * Test close without join, no subtasks forked.
 733      */
 734     @Test
 735     void testCloseWithoutJoin1() {
 736         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 737             // do nothing
 738         }
 739     }
 740 
 741     /**
 742      * Test close without join, subtasks forked.
 743      */
 744     @ParameterizedTest
 745     @MethodSource("factories")
 746     void testCloseWithoutJoin2(ThreadFactory factory) {
 747         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 748                 cf -> cf.withThreadFactory(factory))) {
 749             Subtask<String> subtask = scope.fork(() -> {
 750                 Thread.sleep(Duration.ofDays(1));
 751                 return null;
 752             });
 753             assertThrows(IllegalStateException.class, scope::close);
 754 
 755             // subtask result/exception not available
 756             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 757             assertThrows(IllegalStateException.class, subtask::get);
 758             assertThrows(IllegalStateException.class, subtask::exception);
 759         }
 760     }
 761 
 762     /**
 763      * Test close after join throws. Close should not throw as join attempted.
 764      */
 765     @ParameterizedTest
 766     @MethodSource("factories")
 767     void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception {
 768         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 769                 cf -> cf.withThreadFactory(factory))) {
 770             var subtask = scope.fork(() -> {
 771                 Thread.sleep(Duration.ofDays(1));
 772                 return null;
 773             });
 774 
 775             // join throws
 776             Thread.currentThread().interrupt();
 777             assertThrows(InterruptedException.class, scope::join);
 778             assertThrows(IllegalStateException.class, subtask::get);
 779 
 780         }  // close should not throw
 781     }
 782 
 783     /**
 784      * Test close method is owner confined.
 785      */
 786     @ParameterizedTest
 787     @MethodSource("factories")
 788     void testCloseConfined(ThreadFactory factory) throws Exception {
 789         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 790                 cf -> cf.withThreadFactory(factory))) {
 791 
 792             // random thread cannot close scope
 793             try (var pool = Executors.newCachedThreadPool(factory)) {
 794                 Future<Boolean> future = pool.submit(() -> {
 795                     assertThrows(WrongThreadException.class, scope::close);
 796                     return null;
 797                 });
 798                 future.get();
 799             }
 800 
 801             // subtask cannot close
 802             Subtask<Boolean> subtask = scope.fork(() -> {
 803                 assertThrows(WrongThreadException.class, scope::close);
 804                 return true;
 805             });
 806             scope.join();
 807             assertTrue(subtask.get());
 808         }
 809     }
 810 
 811     /**
 812      * Test close with interrupt status set.
 813      */
 814     @ParameterizedTest
 815     @MethodSource("factories")
 816     void testInterruptClose1(ThreadFactory factory) throws Exception {
 817         var testJoiner = new CancelAfterOneJoiner<String>();
 818         try (var scope = StructuredTaskScope.open(testJoiner,
 819                 cf -> cf.withThreadFactory(factory))) {
 820 
 821             // fork first subtask, a straggler as it continues after being interrupted
 822             var started = new CountDownLatch(1);
 823             var done = new AtomicBoolean();
 824             scope.fork(() -> {
 825                 started.countDown();
 826                 try {
 827                     Thread.sleep(Duration.ofDays(1));
 828                 } catch (InterruptedException e) {
 829                     // interrupted by cancel, expected
 830                 }
 831                 Thread.sleep(Duration.ofMillis(100)); // force close to wait
 832                 done.set(true);
 833                 return null;
 834             });
 835             started.await();
 836 
 837             // fork second subtask, the scope should be cancelled when this subtask completes
 838             scope.fork(() -> "bar");
 839             while (!scope.isCancelled()) {
 840                 Thread.sleep(20);
 841             }
 842 
 843             scope.join();
 844 
 845             // invoke close with interrupt status set
 846             Thread.currentThread().interrupt();
 847             try {
 848                 scope.close();
 849             } finally {
 850                 assertTrue(Thread.interrupted());   // clear interrupt status
 851                 assertTrue(done.get());
 852             }
 853         }
 854     }
 855 
 856     /**
 857      * Test interrupting thread waiting in close.
 858      */
 859     @ParameterizedTest
 860     @MethodSource("factories")
 861     void testInterruptClose2(ThreadFactory factory) throws Exception {
 862         var testJoiner = new CancelAfterOneJoiner<String>();
 863         try (var scope = StructuredTaskScope.open(testJoiner,
 864                 cf -> cf.withThreadFactory(factory))) {
 865 
 866             Thread mainThread = Thread.currentThread();
 867 
 868             // fork first subtask, a straggler as it continues after being interrupted
 869             var started = new CountDownLatch(1);
 870             var done = new AtomicBoolean();
 871             scope.fork(() -> {
 872                 started.countDown();
 873                 try {
 874                     Thread.sleep(Duration.ofDays(1));
 875                 } catch (InterruptedException e) {
 876                     // interrupted by cancel, expected
 877                 }
 878 
 879                 // interrupt main thread when it blocks in close
 880                 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScope.close");
 881 
 882                 Thread.sleep(Duration.ofMillis(100)); // force close to wait
 883                 done.set(true);
 884                 return null;
 885             });
 886             started.await();
 887 
 888             // fork second subtask, the scope should be cancelled when this subtask completes
 889             scope.fork(() -> "bar");
 890             while (!scope.isCancelled()) {
 891                 Thread.sleep(20);
 892             }
 893 
 894             scope.join();
 895 
 896             // main thread will be interrupted while blocked in close
 897             try {
 898                 scope.close();
 899             } finally {
 900                 assertTrue(Thread.interrupted());   // clear interrupt status
 901                 assertTrue(done.get());
 902             }
 903         }
 904     }
 905 
 906     /**
 907      * Test that closing an enclosing scope closes the thread flock of a nested scope.
 908      */
 909     @Test
 910     void testCloseThrowsStructureViolation() throws Exception {
 911         try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll())) {
 912             try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll())) {
 913 
 914                 // close enclosing scope
 915                 try {
 916                     scope1.close();
 917                     fail("close did not throw");
 918                 } catch (StructureViolationException expected) { }
 919 
 920                 // underlying flock should be closed
 921                 var executed = new AtomicBoolean();
 922                 Subtask<?> subtask = scope2.fork(() -> executed.set(true));
 923                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 924                 scope2.join();
 925                 assertFalse(executed.get());
 926                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 927             }
 928         }
 929     }
 930 
 931 
 932     /**
 933      * Test cancel after task scope is cancelled.
 934      */
 935     @Test
 936     void testCancelAfterCancel() {
 937         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 938             scope.cancel();
 939         }
 940     }
 941 
 942     /**
 943      * Test cancel after task scope is closed.
 944      */
 945     @Test
 946     void testCancelAfterClose() {
 947         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 948             assertFalse(scope.isCancelled());
 949             scope.cancel();
 950             assertTrue(scope.isCancelled());
 951             scope.cancel();
 952             assertTrue(scope.isCancelled());
 953         }
 954     }
 955 
 956     /**
 957      * Test cancel method is owner confined.
 958      */
 959     @ParameterizedTest
 960     @MethodSource("factories")
 961     void testCancelConfined(ThreadFactory factory) throws Exception {
 962         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 963                 cf -> cf.withThreadFactory(factory))) {
 964 
 965             // random thread cannot cancel scope
 966             try (var pool = Executors.newCachedThreadPool(factory)) {
 967                 Future<Boolean> future = pool.submit(() -> {
 968                     assertThrows(WrongThreadException.class, scope::cancel);
 969                     return null;
 970                 });
 971                 future.get();
 972             }
 973 
 974             // subtask cannot cancel
 975             Subtask<Boolean> subtask = scope.fork(() -> {
 976                 assertThrows(WrongThreadException.class, scope::cancel);
 977                 return true;
 978             });
 979             scope.join();
 980             assertTrue(subtask.get());
 981         }
 982     }
 983 
 984     /**
 985      * Test that isCancelled returns true after close.
 986      */
 987     @Test
 988     void testIsCancelledAfterClose() throws Exception {
 989         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 990             assertFalse(scope.isCancelled());
 991             scope.close();
 992             assertTrue(scope.isCancelled());
 993         }
 994     }
 995 
 996     /**
 997      * Test Joiner.onFork throwing exception.
 998      */
 999     @Test
1000     void testOnForkThrows() throws Exception {
1001         var joiner = new Joiner<String, Void>() {
1002             @Override
1003             public boolean onFork(Subtask<? extends String> subtask) {
1004                 throw new FooException();
1005             }
1006             @Override
1007             public Void result() {
1008                 return null;
1009             }
1010         };
1011         try (var scope = StructuredTaskScope.open(joiner)) {
1012             assertThrows(FooException.class, () -> scope.fork(() -> "foo"));
1013         }
1014     }
1015 
1016     /**
1017      * Test Joiner.onFork returning true to cancel execution.
1018      */
1019     @Test
1020     void testOnForkCancelsExecution() throws Exception {
1021         var joiner = new Joiner<String, Void>() {
1022             @Override
1023             public boolean onFork(Subtask<? extends String> subtask) {
1024                 return true;
1025             }
1026             @Override
1027             public Void result() {
1028                 return null;
1029             }
1030         };
1031         try (var scope = StructuredTaskScope.open(joiner)) {
1032             assertFalse(scope.isCancelled());
1033             scope.fork(() -> "foo");
1034             assertTrue(scope.isCancelled());
1035             scope.join();
1036         }
1037     }
1038 
1039     /**
1040      * Test Joiner.onComplete returning true to cancel execution.
1041      */
1042     @Test
1043     void testOnCompleteCancelsExecution() throws Exception {
1044         var joiner = new Joiner<String, Void>() {
1045             @Override
1046             public boolean onComplete(Subtask<? extends String> subtask) {
1047                 return true;
1048             }
1049             @Override
1050             public Void result() {
1051                 return null;
1052             }
1053         };
1054         try (var scope = StructuredTaskScope.open(joiner)) {
1055             assertFalse(scope.isCancelled());
1056             scope.fork(() -> "foo");
1057             while (!scope.isCancelled()) {
1058                 Thread.sleep(10);
1059             }
1060             scope.join();
1061         }
1062     }
1063 
1064     /**
1065      * Test toString.
1066      */
1067     @Test
1068     void testToString() throws Exception {
1069         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1070                 cf -> cf.withName("duke"))) {
1071 
1072             // open
1073             assertTrue(scope.toString().contains("duke"));
1074 
1075             // closed
1076             scope.close();
1077             assertTrue(scope.toString().contains("duke"));
1078         }
1079     }
1080 
1081     /**
1082      * Test Subtask with task that completes successfully.
1083      */
1084     @ParameterizedTest
1085     @MethodSource("factories")
1086     void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
1087         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1088                 cf -> cf.withThreadFactory(factory))) {
1089 
1090             Subtask<String> subtask = scope.fork(() -> "foo");
1091 
1092             // before join
1093             assertThrows(IllegalStateException.class, subtask::get);
1094             assertThrows(IllegalStateException.class, subtask::exception);
1095 
1096             scope.join();
1097 
1098             // after join
1099             assertEquals(Subtask.State.SUCCESS, subtask.state());
1100             assertEquals("foo", subtask.get());
1101             assertThrows(IllegalStateException.class, subtask::exception);
1102         }
1103     }
1104 
1105     /**
1106      * Test Subtask with task that fails.
1107      */
1108     @ParameterizedTest
1109     @MethodSource("factories")
1110     void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
1111         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1112                 cf -> cf.withThreadFactory(factory))) {
1113 
1114             Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
1115 
1116             // before join
1117             assertThrows(IllegalStateException.class, subtask::get);
1118             assertThrows(IllegalStateException.class, subtask::exception);
1119 
1120             scope.join();
1121 
1122             // after join
1123             assertEquals(Subtask.State.FAILED, subtask.state());
1124             assertThrows(IllegalStateException.class, subtask::get);
1125             assertTrue(subtask.exception() instanceof FooException);
1126         }
1127     }
1128 
1129     /**
1130      * Test Subtask with a task that has not completed.
1131      */
1132     @ParameterizedTest
1133     @MethodSource("factories")
1134     void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
1135         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1136                 cf -> cf.withThreadFactory(factory))) {
1137             Subtask<Void> subtask = scope.fork(() -> {
1138                 Thread.sleep(Duration.ofDays(1));
1139                 return null;
1140             });
1141 
1142             // before join
1143             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1144             assertThrows(IllegalStateException.class, subtask::get);
1145             assertThrows(IllegalStateException.class, subtask::exception);
1146 
1147             // attempt join, join throws
1148             Thread.currentThread().interrupt();
1149             assertThrows(InterruptedException.class, scope::join);
1150 
1151             // after join
1152             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1153             assertThrows(IllegalStateException.class, subtask::get);
1154             assertThrows(IllegalStateException.class, subtask::exception);
1155         }
1156     }
1157 
1158     /**
1159      * Test Subtask forked after execution cancelled.
1160      */
1161     @ParameterizedTest
1162     @MethodSource("factories")
1163     void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception {
1164         try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1165             scope.fork(() -> "foo");
1166             while (!scope.isCancelled()) {
1167                 Thread.sleep(20);
1168             }
1169 
1170             var subtask = scope.fork(() -> "foo");
1171 
1172             // before join
1173             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1174             assertThrows(IllegalStateException.class, subtask::get);
1175             assertThrows(IllegalStateException.class, subtask::exception);
1176 
1177             scope.join();
1178 
1179             // after join
1180             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1181             assertThrows(IllegalStateException.class, subtask::get);
1182             assertThrows(IllegalStateException.class, subtask::exception);
1183         }
1184     }
1185 
1186     /**
1187      * Test Subtask::toString.
1188      */
1189     @Test
1190     void testSubtaskToString() throws Exception {
1191         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1192             var latch = new CountDownLatch(1);
1193             var subtask1 = scope.fork(() -> {
1194                 latch.await();
1195                 return "foo";
1196             });
1197             var subtask2 = scope.fork(() -> { throw new FooException(); });
1198 
1199             // subtask1 result is unavailable
1200             assertTrue(subtask1.toString().contains("Unavailable"));
1201             latch.countDown();
1202 
1203             scope.join();
1204 
1205             assertTrue(subtask1.toString().contains("Completed successfully"));
1206             assertTrue(subtask2.toString().contains("Failed"));
1207         }
1208     }
1209 
1210     /**
1211      * Test Joiner.allSuccessfulOrThrow() with no subtasks.
1212      */
1213     @Test
1214     void testAllSuccessfulOrThrow1() throws Throwable {
1215         try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
1216             var subtasks = scope.join().toList();
1217             assertTrue(subtasks.isEmpty());
1218         }
1219     }
1220 
1221     /**
1222      * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully.
1223      */
1224     @ParameterizedTest
1225     @MethodSource("factories")
1226     void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1227         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1228                 cf -> cf.withThreadFactory(factory))) {
1229             var subtask1 = scope.fork(() -> "foo");
1230             var subtask2 = scope.fork(() -> "bar");
1231             var subtasks = scope.join().toList();
1232             assertEquals(List.of(subtask1, subtask2), subtasks);
1233             assertEquals("foo", subtask1.get());
1234             assertEquals("bar", subtask2.get());
1235         }
1236     }
1237 
1238     /**
1239      * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
1240      * a subtask that fails.
1241      */
1242     @ParameterizedTest
1243     @MethodSource("factories")
1244     void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1245         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1246                 cf -> cf.withThreadFactory(factory))) {
1247             scope.fork(() -> "foo");
1248             scope.fork(() -> { throw new FooException(); });
1249             try {
1250                 scope.join();
1251             } catch (ExecutionException e) {
1252                 assertTrue(e.getCause() instanceof FooException);
1253             }
1254         }
1255     }
1256 
1257     /**
1258      * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
1259      */
1260     @Test
1261     void testAnySuccessfulResultOrThrow1() throws Exception {
1262         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
1263             try {
1264                 scope.join();
1265             } catch (ExecutionException e) {
1266                 assertTrue(e.getCause() instanceof NoSuchElementException);
1267             }
1268         }
1269     }
1270 
1271     /**
1272      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully.
1273      */
1274     @ParameterizedTest
1275     @MethodSource("factories")
1276     void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {
1277         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1278                 cf -> cf.withThreadFactory(factory))) {
1279             scope.fork(() -> "foo");
1280             String result = scope.join();
1281             assertEquals("foo", result);
1282         }
1283     }
1284 
1285     /**
1286      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully
1287      * with a null result.
1288      */
1289     @ParameterizedTest
1290     @MethodSource("factories")
1291     void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception {
1292         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1293                 cf -> cf.withThreadFactory(factory))) {
1294             scope.fork(() -> null);
1295             String result = scope.join();
1296             assertNull(result);
1297         }
1298     }
1299 
1300     /**
1301      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully
1302      * and a subtask that fails.
1303      */
1304     @ParameterizedTest
1305     @MethodSource("factories")
1306     void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception {
1307         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1308                 cf -> cf.withThreadFactory(factory))) {
1309             scope.fork(() -> "foo");
1310             scope.fork(() -> { throw new FooException(); });
1311             String first = scope.join();
1312             assertEquals("foo", first);
1313         }
1314     }
1315 
1316     /**
1317      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails.
1318      */
1319     @ParameterizedTest
1320     @MethodSource("factories")
1321     void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
1322         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
1323                 cf -> cf.withThreadFactory(factory))) {
1324             scope.fork(() -> { throw new FooException(); });
1325             Throwable ex = assertThrows(ExecutionException.class, scope::join);
1326             assertTrue(ex.getCause() instanceof FooException);
1327         }
1328     }
1329 
1330     /**
1331      * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
1332      */
1333     @Test
1334     void testIgnoreSuccessfulOrThrow1() throws Throwable {
1335         try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
1336             var result = scope.join();
1337             assertNull(result);
1338         }
1339     }
1340 
1341     /**
1342      * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
1343      */
1344     @ParameterizedTest
1345     @MethodSource("factories")
1346     void testIgnoreSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1347         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1348                 cf -> cf.withThreadFactory(factory))) {
1349             var subtask1 = scope.fork(() -> "foo");
1350             var subtask2 = scope.fork(() -> "bar");
1351             var result = scope.join();
1352             assertNull(result);
1353             assertEquals("foo", subtask1.get());
1354             assertEquals("bar", subtask2.get());
1355         }
1356     }
1357 
1358     /**
1359      * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
1360      * a subtask that fails.
1361      */
1362     @ParameterizedTest
1363     @MethodSource("factories")
1364     void testIgnoreSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1365         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1366                 cf -> cf.withThreadFactory(factory))) {
1367             scope.fork(() -> "foo");
1368             scope.fork(() -> { throw new FooException(); });
1369             try {
1370                 scope.join();
1371             } catch (ExecutionException e) {
1372                 assertTrue(e.getCause() instanceof FooException);
1373             }
1374         }
1375     }
1376 
1377     /**
1378      * Test Joiner.awaitAll() with no subtasks.
1379      */
1380     @Test
1381     void testIgnoreAll1() throws Throwable {
1382         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1383             var result = scope.join();
1384             assertNull(result);
1385         }
1386     }
1387 
1388     /**
1389      * Test Joiner.awaitAll() with subtasks that complete successfully.
1390      */
1391     @ParameterizedTest
1392     @MethodSource("factories")
1393     void testIgnoreAll2(ThreadFactory factory) throws Throwable {
1394         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1395                 cf -> cf.withThreadFactory(factory))) {
1396             var subtask1 = scope.fork(() -> "foo");
1397             var subtask2 = scope.fork(() -> "bar");
1398             var result = scope.join();
1399             assertNull(result);
1400             assertEquals("foo", subtask1.get());
1401             assertEquals("bar", subtask2.get());
1402         }
1403     }
1404 
1405     /**
1406      * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
1407      * that fails.
1408      */
1409     @ParameterizedTest
1410     @MethodSource("factories")
1411     void testIgnoreAll3(ThreadFactory factory) throws Throwable {
1412         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1413                 cf -> cf.withThreadFactory(factory))) {
1414             var subtask1 = scope.fork(() -> "foo");
1415             var subtask2 = scope.fork(() -> { throw new FooException(); });
1416             var result = scope.join();
1417             assertNull(result);
1418             assertEquals("foo", subtask1.get());
1419             assertTrue(subtask2.exception() instanceof FooException);
1420         }
1421     }
1422 
1423     /**
1424      * Test Joiner.all(Predicate) with no subtasks.
1425      */
1426     @Test
1427     void testAllWithPredicate1() throws Throwable {
1428         try (var scope = StructuredTaskScope.open(Joiner.all(s -> false))) {
1429             var subtasks = scope.join();
1430             assertEquals(0, subtasks.count());
1431         }
1432     }
1433 
1434     /**
1435      * Test Joiner.all(Predicate) with no cancellation.
1436      */
1437     @ParameterizedTest
1438     @MethodSource("factories")
1439     void testAllWithPredicate2(ThreadFactory factory) throws Exception {
1440         try (var scope = StructuredTaskScope.open(Joiner.<String>all(s -> false),
1441                 cf -> cf.withThreadFactory(factory))) {
1442 
1443             var subtask1 = scope.fork(() -> "foo");
1444             var subtask2 = scope.fork(() -> { throw new FooException(); });
1445 
1446             var subtasks = scope.join().toList();
1447             assertEquals(2, subtasks.size());
1448 
1449             assertTrue(subtasks.get(0) == subtask1);
1450             assertTrue(subtasks.get(1) == subtask2);
1451             assertEquals("foo", subtask1.get());
1452             assertTrue(subtask2.exception() instanceof FooException);
1453         }
1454     }
1455 
1456     /**
1457      * Test Joiner.all(Predicate) with cancellation after one subtask completes.
1458      */
1459     @ParameterizedTest
1460     @MethodSource("factories")
1461     void testAllWithPredicate3(ThreadFactory factory) throws Exception {
1462         try (var scope = StructuredTaskScope.open(Joiner.<String>all(s -> true),
1463                 cf -> cf.withThreadFactory(factory))) {
1464 
1465             var subtask1 = scope.fork(() -> "foo");
1466             var subtask2 = scope.fork(() -> {
1467                 Thread.sleep(Duration.ofDays(1));
1468                 return "bar";
1469             });
1470 
1471             var subtasks = scope.join().toList();
1472 
1473             assertEquals(2, subtasks.size());
1474             assertTrue(subtasks.get(0) == subtask1);
1475             assertTrue(subtasks.get(1) == subtask2);
1476             assertEquals("foo", subtask1.get());
1477             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1478         }
1479     }
1480 
1481     /**
1482      * Test Joiner.all(Predicate) with cancellation after serveral subtasks complete.
1483      */
1484     @ParameterizedTest
1485     @MethodSource("factories")
1486     void testAllWithPredicate4(ThreadFactory factory) throws Exception {
1487 
1488         // cancel execution after two or more failures
1489         class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
1490             final AtomicInteger failedCount = new AtomicInteger();
1491             @Override
1492             public boolean test(Subtask<? extends T> subtask) {
1493                 return subtask.state() == Subtask.State.FAILED
1494                         && failedCount.incrementAndGet() >= 2;
1495             }
1496         }
1497         var joiner = Joiner.all(new CancelAfterTwoFailures<String>());
1498 
1499         try (var scope = StructuredTaskScope.open(joiner)) {
1500             int forkCount = 0;
1501 
1502             // fork subtasks until execution cancelled
1503             while (!scope.isCancelled()) {
1504                 scope.fork(() -> "foo");
1505                 scope.fork(() -> { throw new FooException(); });
1506                 forkCount += 2;
1507                 Thread.sleep(Duration.ofMillis(10));
1508             }
1509 
1510             var subtasks = scope.join().toList();
1511             assertEquals(forkCount, subtasks.size());
1512 
1513             long failedCount = subtasks.stream()
1514                     .filter(s -> s.state() == Subtask.State.FAILED)
1515                     .count();
1516             assertTrue(failedCount >= 2);
1517         }
1518     }
1519 
1520     /**
1521      * Test Config equals/hashCode/toString
1522      */
1523     @Test
1524     void testConfigMethods() throws Exception {
1525         Function<Config, Config> testConfig = cf -> {
1526             var name = "duke";
1527             var threadFactory = Thread.ofPlatform().factory();
1528             var timeout = Duration.ofSeconds(10);
1529 
1530             assertEquals(cf, cf);
1531             assertEquals(cf.withName(name), cf.withName(name));
1532             assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1533             assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1534 
1535             assertNotEquals(cf, cf.withName(name));
1536             assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1537             assertNotEquals(cf, cf.withTimeout(timeout));
1538 
1539             assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1540             assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1541                     cf.withThreadFactory(threadFactory).hashCode());
1542             assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1543 
1544             assertTrue(cf.withName(name).toString().contains(name));
1545             assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1546             assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1547 
1548             return cf;
1549         };
1550         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
1551             // do nothing
1552         }
1553     }
1554 
1555     /**
1556      * Test Joiner default methods.
1557      */
1558     @Test
1559     void testJoinerDefaultMethods() throws Exception {
1560         try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1561 
1562             // need subtasks to test default methods
1563             var subtask1 = scope.fork(() -> "foo");
1564             while (!scope.isCancelled()) {
1565                 Thread.sleep(20);
1566             }
1567             var subtask2 = scope.fork(() -> "bar");
1568             scope.join();
1569 
1570             assertEquals(Subtask.State.SUCCESS, subtask1.state());
1571             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1572 
1573             // Joiner that does not override default methods
1574             Joiner<Object, Void> joiner = () -> null;
1575             assertThrows(NullPointerException.class, () -> joiner.onFork(null));
1576             assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
1577             assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
1578             assertFalse(joiner.onFork(subtask2));
1579             assertFalse(joiner.onComplete(subtask1));
1580             assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
1581         }
1582     }
1583 
1584     /**
1585      * Test for NullPointerException.
1586      */
1587     @Test
1588     void testNulls() throws Exception {
1589         assertThrows(NullPointerException.class,
1590                 () -> StructuredTaskScope.open(null));
1591         assertThrows(NullPointerException.class,
1592                 () -> StructuredTaskScope.open(null, cf -> cf));
1593         assertThrows(NullPointerException.class,
1594                 () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
1595         assertThrows(NullPointerException.class,
1596                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> null));
1597 
1598         assertThrows(NullPointerException.class, () -> Joiner.all(null));
1599 
1600         // fork
1601         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1602             assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null));
1603             assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null));
1604         }
1605 
1606         // withXXX
1607         assertThrows(NullPointerException.class,
1608                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null)));
1609         assertThrows(NullPointerException.class,
1610                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null)));
1611         assertThrows(NullPointerException.class,
1612                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null)));
1613     }
1614 
1615     /**
1616      * ThreadFactory that counts usage.
1617      */
1618     private static class CountingThreadFactory implements ThreadFactory {
1619         final ThreadFactory delegate;
1620         final AtomicInteger threadCount = new AtomicInteger();
1621         CountingThreadFactory(ThreadFactory delegate) {
1622             this.delegate = delegate;
1623         }
1624         @Override
1625         public Thread newThread(Runnable task) {
1626             threadCount.incrementAndGet();
1627             return delegate.newThread(task);
1628         }
1629         int threadCount() {
1630             return threadCount.get();
1631         }
1632     }
1633 
1634     /**
1635      * A joiner that counts that counts the number of subtasks that are forked and the
1636      * number of subtasks that complete.
1637      */
1638     private static class CountingJoiner<T> implements Joiner<T, Void> {
1639         final AtomicInteger onForkCount = new AtomicInteger();
1640         final AtomicInteger onCompleteCount = new AtomicInteger();
1641         @Override
1642         public boolean onFork(Subtask<? extends T> subtask) {
1643             onForkCount.incrementAndGet();
1644             return false;
1645         }
1646         @Override
1647         public boolean onComplete(Subtask<? extends T> subtask) {
1648             onCompleteCount.incrementAndGet();
1649             return false;
1650         }
1651         @Override
1652         public Void result() {
1653             return null;
1654         }
1655         int onForkCount() {
1656             return onForkCount.get();
1657         }
1658         int onCompleteCount() {
1659             return onCompleteCount.get();
1660         }
1661     }
1662 
1663     /**
1664      * A joiner that cancels execution when a subtask completes. It also keeps a count
1665      * of the number of subtasks that are forked and the number of subtasks that complete.
1666      */
1667     private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> {
1668         final AtomicInteger onForkCount = new AtomicInteger();
1669         final AtomicInteger onCompleteCount = new AtomicInteger();
1670         @Override
1671         public boolean onFork(Subtask<? extends T> subtask) {
1672             onForkCount.incrementAndGet();
1673             return false;
1674         }
1675         @Override
1676         public boolean onComplete(Subtask<? extends T> subtask) {
1677             onCompleteCount.incrementAndGet();
1678             return true;
1679         }
1680         @Override
1681         public Void result() {
1682             return null;
1683         }
1684         int onForkCount() {
1685             return onForkCount.get();
1686         }
1687         int onCompleteCount() {
1688             return onCompleteCount.get();
1689         }
1690     }
1691 
1692     /**
1693      * A runtime exception for tests.
1694      */
1695     private static class FooException extends RuntimeException {
1696         FooException() { }
1697         FooException(Throwable cause) { super(cause); }
1698     }
1699 
1700     /**
1701      * Returns the current time in milliseconds.
1702      */
1703     private long millisTime() {
1704         long now = System.nanoTime();
1705         return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
1706     }
1707 
1708     /**
1709      * Check the duration of a task
1710      * @param start start time, in milliseconds
1711      * @param min minimum expected duration, in milliseconds
1712      * @param max maximum expected duration, in milliseconds
1713      * @return the duration (now - start), in milliseconds
1714      */
1715     private long expectDuration(long start, long min, long max) {
1716         long duration = millisTime() - start;
1717         assertTrue(duration >= min,
1718                 "Duration " + duration + "ms, expected >= " + min + "ms");
1719         assertTrue(duration <= max,
1720                 "Duration " + duration + "ms, expected <= " + max + "ms");
1721         return duration;
1722     }
1723 
1724     /**
1725      * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}".
1726      * {@code c} is the fully qualified class name and {@code m} is the method name.
1727      */
1728     private void interruptThreadAt(Thread target, String location) throws InterruptedException {
1729         int index = location.lastIndexOf('.');
1730         String className = location.substring(0, index);
1731         String methodName = location.substring(index + 1);
1732 
1733         boolean found = false;
1734         while (!found) {
1735             Thread.State state = target.getState();
1736             assertTrue(state != TERMINATED);
1737             if ((state == WAITING || state == TIMED_WAITING)
1738                     && contains(target.getStackTrace(), className, methodName)) {
1739                 found = true;
1740             } else {
1741                 Thread.sleep(20);
1742             }
1743         }
1744         target.interrupt();
1745     }
1746 
1747     /**
1748      * Schedules the current thread to be interrupted when it waits (timed or untimed)
1749      * at the given location.
1750      */
1751     private void scheduleInterruptAt(String location) {
1752         Thread target = Thread.currentThread();
1753         scheduler.submit(() -> {
1754             interruptThreadAt(target, location);
1755             return null;
1756         });
1757     }
1758 
1759     /**
1760      * Returns true if the given stack trace contains an element for the given class
1761      * and method name.
1762      */
1763     private boolean contains(StackTraceElement[] stack, String className, String methodName) {
1764         return Arrays.stream(stack)
1765                 .anyMatch(e -> className.equals(e.getClassName())
1766                         && methodName.equals(e.getMethodName()));
1767     }
1768 }