1 /*
   2  * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=platform
  26  * @bug 8284199 8296779 8306647
  27  * @summary Basic tests for StructuredTaskScope
  28  * @enablePreview
  29  * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest
  30  */
  31 
  32 /*
  33  * @test id=virtual
  34  * @enablePreview
  35  * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest
  36  */
  37 
  38 import java.time.Duration;
  39 import java.util.Arrays;
  40 import java.util.ArrayList;
  41 import java.util.List;
  42 import java.util.Set;
  43 import java.util.NoSuchElementException;
  44 import java.util.concurrent.Callable;
  45 import java.util.concurrent.ConcurrentHashMap;
  46 import java.util.concurrent.CountDownLatch;
  47 import java.util.concurrent.Executors;
  48 import java.util.concurrent.Future;
  49 import java.util.concurrent.LinkedTransferQueue;
  50 import java.util.concurrent.ThreadFactory;
  51 import java.util.concurrent.TimeUnit;
  52 import java.util.concurrent.RejectedExecutionException;
  53 import java.util.concurrent.ScheduledExecutorService;
  54 import java.util.concurrent.StructuredTaskScope;
  55 import java.util.concurrent.StructuredTaskScope.TimeoutException;
  56 import java.util.concurrent.StructuredTaskScope.Config;
  57 import java.util.concurrent.StructuredTaskScope.FailedException;
  58 import java.util.concurrent.StructuredTaskScope.Joiner;
  59 import java.util.concurrent.StructuredTaskScope.Subtask;
  60 import java.util.concurrent.StructureViolationException;
  61 import java.util.concurrent.atomic.AtomicBoolean;
  62 import java.util.concurrent.atomic.AtomicInteger;
  63 import java.util.concurrent.atomic.AtomicReference;
  64 import java.util.function.Function;
  65 import java.util.function.Predicate;
  66 import java.util.stream.Stream;
  67 import static java.lang.Thread.State.*;
  68 
  69 import org.junit.jupiter.api.Test;
  70 import org.junit.jupiter.api.BeforeAll;
  71 import org.junit.jupiter.api.AfterAll;
  72 import org.junit.jupiter.params.ParameterizedTest;
  73 import org.junit.jupiter.params.provider.MethodSource;
  74 import static org.junit.jupiter.api.Assertions.*;
  75 
  76 class StructuredTaskScopeTest {
  77     private static ScheduledExecutorService scheduler;
  78     private static List<ThreadFactory> threadFactories;
  79 
  80     @BeforeAll
  81     static void setup() throws Exception {
  82         scheduler = Executors.newSingleThreadScheduledExecutor();
  83 
  84         // thread factories
  85         String value = System.getProperty("threadFactory");
  86         List<ThreadFactory> list = new ArrayList<>();
  87         if (value == null || value.equals("platform"))
  88             list.add(Thread.ofPlatform().factory());
  89         if (value == null || value.equals("virtual"))
  90             list.add(Thread.ofVirtual().factory());
  91         assertTrue(list.size() > 0, "No thread factories for tests");
  92         threadFactories = list;
  93     }
  94 
  95     @AfterAll
  96     static void shutdown() {
  97         scheduler.shutdown();
  98     }
  99 
 100     private static Stream<ThreadFactory> factories() {
 101         return threadFactories.stream();
 102     }
 103 
 104     /**
 105      * Test that fork creates virtual threads when no ThreadFactory is configured.
 106      */
 107     @Test
 108     void testForkCreatesVirtualThread() throws Exception {
 109         Set<Thread> threads = ConcurrentHashMap.newKeySet();
 110         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 111             for (int i = 0; i < 50; i++) {
 112                 // runnable
 113                 scope.fork(() -> {
 114                     threads.add(Thread.currentThread());
 115                 });
 116 
 117                 // callable
 118                 scope.fork(() -> {
 119                     threads.add(Thread.currentThread());
 120                     return null;
 121                 });
 122             }
 123             scope.join();
 124         }
 125         assertEquals(100, threads.size());
 126         threads.forEach(t -> assertTrue(t.isVirtual()));
 127     }
 128 
 129     /**
 130      * Test that fork create threads with the configured ThreadFactory.
 131      */
 132     @ParameterizedTest
 133     @MethodSource("factories")
 134     void testForkUsesThreadFactory(ThreadFactory factory) throws Exception {
 135         // TheadFactory that keeps reference to all threads it creates
 136         class RecordingThreadFactory implements ThreadFactory {
 137             final ThreadFactory delegate;
 138             final Set<Thread> threads = ConcurrentHashMap.newKeySet();
 139             RecordingThreadFactory(ThreadFactory delegate) {
 140                 this.delegate = delegate;
 141             }
 142             @Override
 143             public Thread newThread(Runnable task) {
 144                 Thread thread = delegate.newThread(task);
 145                 threads.add(thread);
 146                 return thread;
 147             }
 148             Set<Thread> threads() {
 149                 return threads;
 150             }
 151         }
 152         var recordingThreadFactory = new RecordingThreadFactory(factory);
 153         Set<Thread> threads = ConcurrentHashMap.newKeySet();
 154         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 155                 cf -> cf.withThreadFactory(recordingThreadFactory))) {
 156 
 157             for (int i = 0; i < 50; i++) {
 158                 // runnable
 159                 scope.fork(() -> {
 160                     threads.add(Thread.currentThread());
 161                 });
 162 
 163                 // callable
 164                 scope.fork(() -> {
 165                     threads.add(Thread.currentThread());
 166                     return null;
 167                 });
 168             }
 169             scope.join();
 170         }
 171         assertEquals(100, threads.size());
 172         assertEquals(recordingThreadFactory.threads(), threads);
 173     }
 174 
 175     /**
 176      * Test fork method is owner confined.
 177      */
 178     @ParameterizedTest
 179     @MethodSource("factories")
 180     void testForkConfined(ThreadFactory factory) throws Exception {
 181         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 182                 cf -> cf.withThreadFactory(factory))) {
 183 
 184             // random thread cannot fork
 185             try (var pool = Executors.newSingleThreadExecutor()) {
 186                 Future<Void> future = pool.submit(() -> {
 187                     assertThrows(WrongThreadException.class, () -> {
 188                         scope.fork(() -> null);
 189                     });
 190                     return null;
 191                 });
 192                 future.get();
 193             }
 194 
 195             // subtask cannot fork
 196             Subtask<Boolean> subtask = scope.fork(() -> {
 197                 assertThrows(WrongThreadException.class, () -> {
 198                     scope.fork(() -> null);
 199                 });
 200                 return true;
 201             });
 202             scope.join();
 203             assertTrue(subtask.get());
 204         }
 205     }
 206 
 207     /**
 208      * Test fork after join, no subtasks forked before join.
 209      */
 210     @ParameterizedTest
 211     @MethodSource("factories")
 212     void testForkAfterJoin1(ThreadFactory factory) throws Exception {
 213         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 214                 cf -> cf.withThreadFactory(factory))) {
 215             scope.join();
 216             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 217         }
 218     }
 219 
 220     /**
 221      * Test fork after join, subtasks forked before join.
 222      */
 223     @ParameterizedTest
 224     @MethodSource("factories")
 225     void testForkAfterJoin2(ThreadFactory factory) throws Exception {
 226         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 227                 cf -> cf.withThreadFactory(factory))) {
 228             scope.fork(() -> "foo");
 229             scope.join();
 230             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 231         }
 232     }
 233 
 234     /**
 235      * Test fork after join throws.
 236      */
 237     @ParameterizedTest
 238     @MethodSource("factories")
 239     void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
 240         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 241                 cf -> cf.withThreadFactory(factory))) {
 242             var latch = new CountDownLatch(1);
 243             var subtask1 = scope.fork(() -> {
 244                 latch.await();
 245                 return "foo";
 246             });
 247 
 248             // join throws
 249             Thread.currentThread().interrupt();
 250             assertThrows(InterruptedException.class, scope::join);
 251 
 252             // fork should throw
 253             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 254         }
 255     }
 256 
 257     /**
 258      * Test fork after task scope is cancelled. This test uses a custom Joiner to
 259      * cancel execution.
 260      */
 261     @ParameterizedTest
 262     @MethodSource("factories")
 263     void testForkAfterCancel2(ThreadFactory factory) throws Exception {
 264         var countingThreadFactory = new CountingThreadFactory(factory);
 265         var testJoiner = new CancelAfterOneJoiner<String>();
 266 
 267         try (var scope = StructuredTaskScope.open(testJoiner,
 268                 cf -> cf.withThreadFactory(countingThreadFactory))) {
 269 
 270             // fork subtask, the scope should be cancelled when the subtask completes
 271             var subtask1 = scope.fork(() -> "foo");
 272             while (!scope.isCancelled()) {
 273                 Thread.sleep(20);
 274             }
 275 
 276             assertEquals(1, countingThreadFactory.threadCount());
 277             assertEquals(1, testJoiner.onForkCount());
 278             assertEquals(1, testJoiner.onCompleteCount());
 279 
 280             // fork second subtask, it should not run
 281             var subtask2 = scope.fork(() -> "bar");
 282 
 283             // onFork should be invoked, newThread and onComplete should not be invoked
 284             assertEquals(1, countingThreadFactory.threadCount());
 285             assertEquals(2, testJoiner.onForkCount());
 286             assertEquals(1, testJoiner.onCompleteCount());
 287 
 288             scope.join();
 289 
 290             assertEquals(1, countingThreadFactory.threadCount());
 291             assertEquals(2, testJoiner.onForkCount());
 292             assertEquals(1, testJoiner.onCompleteCount());
 293             assertEquals("foo", subtask1.get());
 294             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 295         }
 296     }
 297 
 298     /**
 299      * Test fork after task scope is closed.
 300      */
 301     @Test
 302     void testForkAfterClose() {
 303         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 304             scope.close();
 305             assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
 306         }
 307     }
 308 
 309     /**
 310      * Test fork with a ThreadFactory that rejects creating a thread.
 311      */
 312     @Test
 313     void testForkRejectedExecutionException() {
 314         ThreadFactory factory = task -> null;
 315         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 316                 cf -> cf.withThreadFactory(factory))) {
 317             assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
 318         }
 319     }
 320 
 321     /**
 322      * Test join with no subtasks.
 323      */
 324     @Test
 325     void testJoinWithNoSubtasks() throws Exception {
 326         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 327             scope.join();
 328         }
 329     }
 330 
 331     /**
 332      * Test join with a remaining subtask.
 333      */
 334     @ParameterizedTest
 335     @MethodSource("factories")
 336     void testJoinWithRemainingSubtasks(ThreadFactory factory) throws Exception {
 337         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 338                 cf -> cf.withThreadFactory(factory))) {
 339             Subtask<String> subtask = scope.fork(() -> {
 340                 Thread.sleep(Duration.ofMillis(100));
 341                 return "foo";
 342             });
 343             scope.join();
 344             assertEquals("foo", subtask.get());
 345         }
 346     }
 347 
 348     /**
 349      * Test join after join completed with a result.
 350      */
 351     @Test
 352     void testJoinAfterJoin1() throws Exception {
 353         var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz"));
 354         Joiner<Object, String> joiner = results::take;
 355         try (var scope = StructuredTaskScope.open(joiner)) {
 356             scope.fork(() -> "foo");
 357             assertEquals("foo", scope.join());
 358 
 359             // join already called
 360             for (int i = 0 ; i < 3; i++) {
 361                 assertThrows(IllegalStateException.class, scope::join);
 362             }
 363         }
 364     }
 365 
 366     /**
 367      * Test join after join completed with an exception.
 368      */
 369     @Test
 370     void testJoinAfterJoin2() throws Exception {
 371         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
 372             scope.fork(() -> { throw new FooException(); });
 373             Throwable ex = assertThrows(FailedException.class, scope::join);
 374             assertTrue(ex.getCause() instanceof FooException);
 375 
 376             // join already called
 377             for (int i = 0 ; i < 3; i++) {
 378                 assertThrows(IllegalStateException.class, scope::join);
 379             }
 380         }
 381     }
 382 
 383     /**
 384      * Test join after join completed with a timeout.
 385      */
 386     @Test
 387     void testJoinAfterJoin3() throws Exception {
 388         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
 389                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
 390             // wait for scope to be cancelled by timeout
 391             while (!scope.isCancelled()) {
 392                 Thread.sleep(20);
 393             }
 394             assertThrows(TimeoutException.class, scope::join);
 395 
 396             // join already called
 397             for (int i = 0 ; i < 3; i++) {
 398                 assertThrows(IllegalStateException.class, scope::join);
 399             }
 400         }
 401     }
 402 
 403     /**
 404      * Test join method is owner confined.
 405      */
 406     @ParameterizedTest
 407     @MethodSource("factories")
 408     void testJoinConfined(ThreadFactory factory) throws Exception {
 409         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 410                 cf -> cf.withThreadFactory(factory))) {
 411 
 412             // random thread cannot join
 413             try (var pool = Executors.newSingleThreadExecutor()) {
 414                 Future<Void> future = pool.submit(() -> {
 415                     assertThrows(WrongThreadException.class, scope::join);
 416                     return null;
 417                 });
 418                 future.get();
 419             }
 420 
 421             // subtask cannot join
 422             Subtask<Boolean> subtask = scope.fork(() -> {
 423                 assertThrows(WrongThreadException.class, () -> { scope.join(); });
 424                 return true;
 425             });
 426             scope.join();
 427             assertTrue(subtask.get());
 428         }
 429     }
 430 
 431     /**
 432      * Test join with interrupt status set.
 433      */
 434     @ParameterizedTest
 435     @MethodSource("factories")
 436     void testInterruptJoin1(ThreadFactory factory) throws Exception {
 437         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 438                 cf -> cf.withThreadFactory(factory))) {
 439 
 440             Subtask<String> subtask = scope.fork(() -> {
 441                 Thread.sleep(60_000);
 442                 return "foo";
 443             });
 444 
 445             // join should throw
 446             Thread.currentThread().interrupt();
 447             try {
 448                 scope.join();
 449                 fail("join did not throw");
 450             } catch (InterruptedException expected) {
 451                 assertFalse(Thread.interrupted());   // interrupt status should be cleared
 452             }
 453         }
 454     }
 455 
 456     /**
 457      * Test interrupt of thread blocked in join.
 458      */
 459     @ParameterizedTest
 460     @MethodSource("factories")
 461     void testInterruptJoin2(ThreadFactory factory) throws Exception {
 462         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 463                 cf -> cf.withThreadFactory(factory))) {
 464 
 465             var latch = new CountDownLatch(1);
 466             Subtask<String> subtask = scope.fork(() -> {
 467                 Thread.sleep(60_000);
 468                 return "foo";
 469             });
 470 
 471             // interrupt main thread when it blocks in join
 472             scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
 473             try {
 474                 scope.join();
 475                 fail("join did not throw");
 476             } catch (InterruptedException expected) {
 477                 assertFalse(Thread.interrupted());   // interrupt status should be clear
 478             }
 479         }
 480     }
 481 
 482     /**
 483      * Test join when scope is cancelled.
 484      */
 485     @ParameterizedTest
 486     @MethodSource("factories")
 487     void testJoinWhenCancelled(ThreadFactory factory) throws Exception {
 488         var countingThreadFactory = new CountingThreadFactory(factory);
 489         var testJoiner = new CancelAfterOneJoiner<String>();
 490 
 491         try (var scope = StructuredTaskScope.open(testJoiner,
 492                     cf -> cf.withThreadFactory(countingThreadFactory))) {
 493 
 494             // fork subtask, the scope should be cancelled when the subtask completes
 495             var subtask1 = scope.fork(() -> "foo");
 496             while (!scope.isCancelled()) {
 497                 Thread.sleep(20);
 498             }
 499 
 500             // fork second subtask, it should not run
 501             var subtask2 = scope.fork(() -> {
 502                 Thread.sleep(Duration.ofDays(1));
 503                 return "bar";
 504             });
 505 
 506             scope.join();
 507 
 508             assertEquals("foo", subtask1.get());
 509             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 510         }
 511     }
 512 
 513     /**
 514      * Test join after scope is closed.
 515      */
 516     @Test
 517     void testJoinAfterClose() throws Exception {
 518         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 519             scope.close();
 520             assertThrows(IllegalStateException.class, () -> scope.join());
 521         }
 522     }
 523 
 524     /**
 525      * Test join with timeout, subtasks finish before timeout expires.
 526      */
 527     @ParameterizedTest
 528     @MethodSource("factories")
 529     void testJoinWithTimeout1(ThreadFactory factory) throws Exception {
 530         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 531                 cf -> cf.withThreadFactory(factory)
 532                         .withTimeout(Duration.ofDays(1)))) {
 533 
 534             Subtask<String> subtask = scope.fork(() -> {
 535                 Thread.sleep(Duration.ofSeconds(1));
 536                 return "foo";
 537             });
 538 
 539             scope.join();
 540 
 541             assertFalse(scope.isCancelled());
 542             assertEquals("foo", subtask.get());
 543         }
 544     }
 545 
 546     /**
 547      * Test join with timeout, timeout expires before subtasks finish.
 548      */
 549     @ParameterizedTest
 550     @MethodSource("factories")
 551     void testJoinWithTimeout2(ThreadFactory factory) throws Exception {
 552         long startMillis = millisTime();
 553         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 554                 cf -> cf.withThreadFactory(factory)
 555                         .withTimeout(Duration.ofSeconds(2)))) {
 556 
 557             Subtask<Void> subtask = scope.fork(() -> {
 558                 Thread.sleep(Duration.ofDays(1));
 559                 return null;
 560             });
 561 
 562             assertThrows(TimeoutException.class, scope::join);
 563             expectDuration(startMillis, /*min*/1900, /*max*/20_000);
 564 
 565             assertTrue(scope.isCancelled());
 566             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 567         }
 568     }
 569 
 570     /**
 571      * Test join with timeout that has already expired.
 572      */
 573     @ParameterizedTest
 574     @MethodSource("factories")
 575     void testJoinWithTimeout3(ThreadFactory factory) throws Exception {
 576         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 577                 cf -> cf.withThreadFactory(factory)
 578                         .withTimeout(Duration.ofSeconds(-1)))) {
 579 
 580             Subtask<Void> subtask = scope.fork(() -> {
 581                 Thread.sleep(Duration.ofDays(1));
 582                 return null;
 583             });
 584 
 585             assertThrows(TimeoutException.class, scope::join);
 586 
 587             assertTrue(scope.isCancelled());
 588             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 589         }
 590     }
 591 
 592     /**
 593      * Test that cancelling execution interrupts unfinished threads. This test uses
 594      * a custom Joiner to cancel execution.
 595      */
 596     @ParameterizedTest
 597     @MethodSource("factories")
 598     void testCancelInterruptsThreads2(ThreadFactory factory) throws Exception {
 599         var testJoiner = new CancelAfterOneJoiner<String>();
 600 
 601         try (var scope = StructuredTaskScope.open(testJoiner,
 602                 cf -> cf.withThreadFactory(factory))) {
 603 
 604             // fork subtask1 that runs for a long time
 605             var started = new CountDownLatch(1);
 606             var interrupted = new CountDownLatch(1);
 607             var subtask1 = scope.fork(() -> {
 608                 started.countDown();
 609                 try {
 610                     Thread.sleep(Duration.ofDays(1));
 611                 } catch (InterruptedException e) {
 612                     interrupted.countDown();
 613                 }
 614             });
 615             started.await();
 616 
 617             // fork subtask2, the scope should be cancelled when the subtask completes
 618             var subtask2 = scope.fork(() -> "bar");
 619             while (!scope.isCancelled()) {
 620                 Thread.sleep(20);
 621             }
 622 
 623             // subtask1 should be interrupted
 624             interrupted.await();
 625 
 626             scope.join();
 627             assertEquals(Subtask.State.UNAVAILABLE, subtask1.state());
 628             assertEquals("bar", subtask2.get());
 629         }
 630     }
 631 
 632     /**
 633      * Test that timeout interrupts unfinished threads.
 634      */
 635     @ParameterizedTest
 636     @MethodSource("factories")
 637     void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception {
 638         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 639                 cf -> cf.withThreadFactory(factory)
 640                         .withTimeout(Duration.ofSeconds(2)))) {
 641 
 642             var started = new AtomicBoolean();
 643             var interrupted = new CountDownLatch(1);
 644             Subtask<Void> subtask = scope.fork(() -> {
 645                 started.set(true);
 646                 try {
 647                     Thread.sleep(Duration.ofDays(1));
 648                 } catch (InterruptedException e) {
 649                     interrupted.countDown();
 650                 }
 651                 return null;
 652             });
 653 
 654             while (!scope.isCancelled()) {
 655                 Thread.sleep(50);
 656             }
 657 
 658             // if subtask started then it should be interrupted
 659             if (started.get()) {
 660                 interrupted.await();
 661             }
 662 
 663             assertThrows(TimeoutException.class, scope::join);
 664 
 665             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 666         }
 667     }
 668 
 669     /**
 670      * Test close without join, no subtasks forked.
 671      */
 672     @Test
 673     void testCloseWithoutJoin1() {
 674         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 675             // do nothing
 676         }
 677     }
 678 
 679     /**
 680      * Test close without join, subtasks forked.
 681      */
 682     @ParameterizedTest
 683     @MethodSource("factories")
 684     void testCloseWithoutJoin2(ThreadFactory factory) {
 685         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 686                 cf -> cf.withThreadFactory(factory))) {
 687             Subtask<String> subtask = scope.fork(() -> {
 688                 Thread.sleep(Duration.ofDays(1));
 689                 return null;
 690             });
 691 
 692             // first call to close should throw
 693             assertThrows(IllegalStateException.class, scope::close);
 694 
 695             // subsequent calls to close should not throw
 696             for (int i = 0; i < 3; i++) {
 697                 scope.close();
 698             }
 699 
 700             // subtask result/exception not available
 701             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 702             assertThrows(IllegalStateException.class, subtask::get);
 703             assertThrows(IllegalStateException.class, subtask::exception);
 704         }
 705     }
 706 
 707     /**
 708      * Test close after join throws. Close should not throw as join attempted.
 709      */
 710     @ParameterizedTest
 711     @MethodSource("factories")
 712     void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception {
 713         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 714                 cf -> cf.withThreadFactory(factory))) {
 715             var subtask = scope.fork(() -> {
 716                 Thread.sleep(Duration.ofDays(1));
 717                 return null;
 718             });
 719 
 720             // join throws
 721             Thread.currentThread().interrupt();
 722             assertThrows(InterruptedException.class, scope::join);
 723             assertThrows(IllegalStateException.class, subtask::get);
 724 
 725         }  // close should not throw
 726     }
 727 
 728     /**
 729      * Test close method is owner confined.
 730      */
 731     @ParameterizedTest
 732     @MethodSource("factories")
 733     void testCloseConfined(ThreadFactory factory) throws Exception {
 734         try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
 735                 cf -> cf.withThreadFactory(factory))) {
 736 
 737             // random thread cannot close scope
 738             try (var pool = Executors.newCachedThreadPool(factory)) {
 739                 Future<Boolean> future = pool.submit(() -> {
 740                     assertThrows(WrongThreadException.class, scope::close);
 741                     return null;
 742                 });
 743                 future.get();
 744             }
 745 
 746             // subtask cannot close
 747             Subtask<Boolean> subtask = scope.fork(() -> {
 748                 assertThrows(WrongThreadException.class, scope::close);
 749                 return true;
 750             });
 751             scope.join();
 752             assertTrue(subtask.get());
 753         }
 754     }
 755 
 756     /**
 757      * Test close with interrupt status set.
 758      */
 759     @ParameterizedTest
 760     @MethodSource("factories")
 761     void testInterruptClose1(ThreadFactory factory) throws Exception {
 762         var testJoiner = new CancelAfterOneJoiner<String>();
 763         try (var scope = StructuredTaskScope.open(testJoiner,
 764                 cf -> cf.withThreadFactory(factory))) {
 765 
 766             // fork first subtask, a straggler as it continues after being interrupted
 767             var started = new CountDownLatch(1);
 768             var done = new AtomicBoolean();
 769             scope.fork(() -> {
 770                 started.countDown();
 771                 try {
 772                     Thread.sleep(Duration.ofDays(1));
 773                 } catch (InterruptedException e) {
 774                     // interrupted by cancel, expected
 775                 }
 776                 Thread.sleep(Duration.ofMillis(100)); // force close to wait
 777                 done.set(true);
 778                 return null;
 779             });
 780             started.await();
 781 
 782             // fork second subtask, the scope should be cancelled when this subtask completes
 783             scope.fork(() -> "bar");
 784             while (!scope.isCancelled()) {
 785                 Thread.sleep(20);
 786             }
 787 
 788             scope.join();
 789 
 790             // invoke close with interrupt status set
 791             Thread.currentThread().interrupt();
 792             try {
 793                 scope.close();
 794             } finally {
 795                 assertTrue(Thread.interrupted());   // clear interrupt status
 796                 assertTrue(done.get());
 797             }
 798         }
 799     }
 800 
 801     /**
 802      * Test interrupting thread waiting in close.
 803      */
 804     @ParameterizedTest
 805     @MethodSource("factories")
 806     void testInterruptClose2(ThreadFactory factory) throws Exception {
 807         var testJoiner = new CancelAfterOneJoiner<String>();
 808         try (var scope = StructuredTaskScope.open(testJoiner,
 809                 cf -> cf.withThreadFactory(factory))) {
 810 
 811             Thread mainThread = Thread.currentThread();
 812 
 813             // fork first subtask, a straggler as it continues after being interrupted
 814             var started = new CountDownLatch(1);
 815             var done = new AtomicBoolean();
 816             scope.fork(() -> {
 817                 started.countDown();
 818                 try {
 819                     Thread.sleep(Duration.ofDays(1));
 820                 } catch (InterruptedException e) {
 821                     // interrupted by cancel, expected
 822                 }
 823 
 824                 // interrupt main thread when it blocks in close
 825                 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScopeImpl.close");
 826 
 827                 Thread.sleep(Duration.ofMillis(100)); // force close to wait
 828                 done.set(true);
 829                 return null;
 830             });
 831             started.await();
 832 
 833             // fork second subtask, the scope should be cancelled when this subtask completes
 834             scope.fork(() -> "bar");
 835             while (!scope.isCancelled()) {
 836                 Thread.sleep(20);
 837             }
 838 
 839             scope.join();
 840 
 841             // main thread will be interrupted while blocked in close
 842             try {
 843                 scope.close();
 844             } finally {
 845                 assertTrue(Thread.interrupted());   // clear interrupt status
 846                 assertTrue(done.get());
 847             }
 848         }
 849     }
 850 
 851     /**
 852      * Test that closing an enclosing scope closes the thread flock of a nested scope.
 853      */
 854     @Test
 855     void testCloseThrowsStructureViolation() throws Exception {
 856         try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll())) {
 857             try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll())) {
 858 
 859                 // close enclosing scope
 860                 try {
 861                     scope1.close();
 862                     fail("close did not throw");
 863                 } catch (StructureViolationException expected) { }
 864 
 865                 // underlying flock should be closed
 866                 var executed = new AtomicBoolean();
 867                 Subtask<?> subtask = scope2.fork(() -> executed.set(true));
 868                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 869                 scope2.join();
 870                 assertFalse(executed.get());
 871                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 872             }
 873         }
 874     }
 875 
 876     /**
 877      * Test that isCancelled returns true after close.
 878      */
 879     @Test
 880     void testIsCancelledAfterClose() throws Exception {
 881         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 882             assertFalse(scope.isCancelled());
 883             scope.close();
 884             assertTrue(scope.isCancelled());
 885         }
 886     }
 887 
 888     /**
 889      * Test Joiner.onFork throwing exception.
 890      */
 891     @Test
 892     void testOnForkThrows() throws Exception {
 893         var joiner = new Joiner<String, Void>() {
 894             @Override
 895             public boolean onFork(Subtask<? extends String> subtask) {
 896                 throw new FooException();
 897             }
 898             @Override
 899             public Void result() {
 900                 return null;
 901             }
 902         };
 903         try (var scope = StructuredTaskScope.open(joiner)) {
 904             assertThrows(FooException.class, () -> scope.fork(() -> "foo"));
 905         }
 906     }
 907 
 908     /**
 909      * Test Joiner.onFork returning true to cancel execution.
 910      */
 911     @Test
 912     void testOnForkCancelsExecution() throws Exception {
 913         var joiner = new Joiner<String, Void>() {
 914             @Override
 915             public boolean onFork(Subtask<? extends String> subtask) {
 916                 return true;
 917             }
 918             @Override
 919             public Void result() {
 920                 return null;
 921             }
 922         };
 923         try (var scope = StructuredTaskScope.open(joiner)) {
 924             assertFalse(scope.isCancelled());
 925             scope.fork(() -> "foo");
 926             assertTrue(scope.isCancelled());
 927             scope.join();
 928         }
 929     }
 930 
 931     /**
 932      * Test Joiner.onComplete throwing exception causes UHE to be invoked.
 933      */
 934     @Test
 935     void testOnCompleteThrows() throws Exception {
 936         var joiner = new Joiner<String, Void>() {
 937             @Override
 938             public boolean onComplete(Subtask<? extends String> subtask) {
 939                 throw new FooException();
 940             }
 941             @Override
 942             public Void result() {
 943                 return null;
 944             }
 945         };
 946         var excRef = new AtomicReference<Throwable>();
 947         Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
 948         ThreadFactory factory = Thread.ofVirtual()
 949                 .uncaughtExceptionHandler(uhe)
 950                 .factory();
 951         try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
 952             scope.fork(() -> "foo");
 953             scope.join();
 954             assertInstanceOf(FooException.class, excRef.get());
 955         }
 956     }
 957 
 958     /**
 959      * Test Joiner.onComplete returning true to cancel execution.
 960      */
 961     @Test
 962     void testOnCompleteCancelsExecution() throws Exception {
 963         var joiner = new Joiner<String, Void>() {
 964             @Override
 965             public boolean onComplete(Subtask<? extends String> subtask) {
 966                 return true;
 967             }
 968             @Override
 969             public Void result() {
 970                 return null;
 971             }
 972         };
 973         try (var scope = StructuredTaskScope.open(joiner)) {
 974             assertFalse(scope.isCancelled());
 975             scope.fork(() -> "foo");
 976             while (!scope.isCancelled()) {
 977                 Thread.sleep(10);
 978             }
 979             scope.join();
 980         }
 981     }
 982 
 983     /**
 984      * Test toString.
 985      */
 986     @Test
 987     void testToString() throws Exception {
 988         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
 989                 cf -> cf.withName("duke"))) {
 990 
 991             // open
 992             assertTrue(scope.toString().contains("duke"));
 993 
 994             // closed
 995             scope.close();
 996             assertTrue(scope.toString().contains("duke"));
 997         }
 998     }
 999 
1000     /**
1001      * Test Subtask with task that completes successfully.
1002      */
1003     @ParameterizedTest
1004     @MethodSource("factories")
1005     void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
1006         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1007                 cf -> cf.withThreadFactory(factory))) {
1008 
1009             Subtask<String> subtask = scope.fork(() -> "foo");
1010 
1011             // before join
1012             assertThrows(IllegalStateException.class, subtask::get);
1013             assertThrows(IllegalStateException.class, subtask::exception);
1014 
1015             scope.join();
1016 
1017             // after join
1018             assertEquals(Subtask.State.SUCCESS, subtask.state());
1019             assertEquals("foo", subtask.get());
1020             assertThrows(IllegalStateException.class, subtask::exception);
1021         }
1022     }
1023 
1024     /**
1025      * Test Subtask with task that fails.
1026      */
1027     @ParameterizedTest
1028     @MethodSource("factories")
1029     void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
1030         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1031                 cf -> cf.withThreadFactory(factory))) {
1032 
1033             Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
1034 
1035             // before join
1036             assertThrows(IllegalStateException.class, subtask::get);
1037             assertThrows(IllegalStateException.class, subtask::exception);
1038 
1039             scope.join();
1040 
1041             // after join
1042             assertEquals(Subtask.State.FAILED, subtask.state());
1043             assertThrows(IllegalStateException.class, subtask::get);
1044             assertTrue(subtask.exception() instanceof FooException);
1045         }
1046     }
1047 
1048     /**
1049      * Test Subtask with a task that has not completed.
1050      */
1051     @ParameterizedTest
1052     @MethodSource("factories")
1053     void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
1054         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
1055                 cf -> cf.withThreadFactory(factory))) {
1056             Subtask<Void> subtask = scope.fork(() -> {
1057                 Thread.sleep(Duration.ofDays(1));
1058                 return null;
1059             });
1060 
1061             // before join
1062             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1063             assertThrows(IllegalStateException.class, subtask::get);
1064             assertThrows(IllegalStateException.class, subtask::exception);
1065 
1066             // attempt join, join throws
1067             Thread.currentThread().interrupt();
1068             assertThrows(InterruptedException.class, scope::join);
1069 
1070             // after join
1071             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1072             assertThrows(IllegalStateException.class, subtask::get);
1073             assertThrows(IllegalStateException.class, subtask::exception);
1074         }
1075     }
1076 
1077     /**
1078      * Test Subtask forked after execution cancelled.
1079      */
1080     @ParameterizedTest
1081     @MethodSource("factories")
1082     void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception {
1083         try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1084             scope.fork(() -> "foo");
1085             while (!scope.isCancelled()) {
1086                 Thread.sleep(20);
1087             }
1088 
1089             var subtask = scope.fork(() -> "foo");
1090 
1091             // before join
1092             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1093             assertThrows(IllegalStateException.class, subtask::get);
1094             assertThrows(IllegalStateException.class, subtask::exception);
1095 
1096             scope.join();
1097 
1098             // after join
1099             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1100             assertThrows(IllegalStateException.class, subtask::get);
1101             assertThrows(IllegalStateException.class, subtask::exception);
1102         }
1103     }
1104 
1105     /**
1106      * Test Subtask::toString.
1107      */
1108     @Test
1109     void testSubtaskToString() throws Exception {
1110         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1111             var latch = new CountDownLatch(1);
1112             var subtask1 = scope.fork(() -> {
1113                 latch.await();
1114                 return "foo";
1115             });
1116             var subtask2 = scope.fork(() -> { throw new FooException(); });
1117 
1118             // subtask1 result is unavailable
1119             assertTrue(subtask1.toString().contains("Unavailable"));
1120             latch.countDown();
1121 
1122             scope.join();
1123 
1124             assertTrue(subtask1.toString().contains("Completed successfully"));
1125             assertTrue(subtask2.toString().contains("Failed"));
1126         }
1127     }
1128 
1129     /**
1130      * Test Joiner.allSuccessfulOrThrow() with no subtasks.
1131      */
1132     @Test
1133     void testAllSuccessfulOrThrow1() throws Throwable {
1134         try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
1135             var subtasks = scope.join().toList();
1136             assertTrue(subtasks.isEmpty());
1137         }
1138     }
1139 
1140     /**
1141      * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully.
1142      */
1143     @ParameterizedTest
1144     @MethodSource("factories")
1145     void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1146         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1147                 cf -> cf.withThreadFactory(factory))) {
1148             var subtask1 = scope.fork(() -> "foo");
1149             var subtask2 = scope.fork(() -> "bar");
1150             var subtasks = scope.join().toList();
1151             assertEquals(List.of(subtask1, subtask2), subtasks);
1152             assertEquals("foo", subtask1.get());
1153             assertEquals("bar", subtask2.get());
1154         }
1155     }
1156 
1157     /**
1158      * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
1159      * a subtask that fails.
1160      */
1161     @ParameterizedTest
1162     @MethodSource("factories")
1163     void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1164         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
1165                 cf -> cf.withThreadFactory(factory))) {
1166             scope.fork(() -> "foo");
1167             scope.fork(() -> { throw new FooException(); });
1168             try {
1169                 scope.join();
1170             } catch (FailedException e) {
1171                 assertTrue(e.getCause() instanceof FooException);
1172             }
1173         }
1174     }
1175 
1176     /**
1177      * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
1178      */
1179     @Test
1180     void testAnySuccessfulResultOrThrow1() throws Exception {
1181         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
1182             try {
1183                 scope.join();
1184             } catch (FailedException e) {
1185                 assertTrue(e.getCause() instanceof NoSuchElementException);
1186             }
1187         }
1188     }
1189 
1190     /**
1191      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully.
1192      */
1193     @ParameterizedTest
1194     @MethodSource("factories")
1195     void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {
1196         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1197                 cf -> cf.withThreadFactory(factory))) {
1198             scope.fork(() -> "foo");
1199             String result = scope.join();
1200             assertEquals("foo", result);
1201         }
1202     }
1203 
1204     /**
1205      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully
1206      * with a null result.
1207      */
1208     @ParameterizedTest
1209     @MethodSource("factories")
1210     void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception {
1211         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1212                 cf -> cf.withThreadFactory(factory))) {
1213             scope.fork(() -> null);
1214             String result = scope.join();
1215             assertNull(result);
1216         }
1217     }
1218 
1219     /**
1220      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully
1221      * and a subtask that fails.
1222      */
1223     @ParameterizedTest
1224     @MethodSource("factories")
1225     void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception {
1226         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
1227                 cf -> cf.withThreadFactory(factory))) {
1228             scope.fork(() -> "foo");
1229             scope.fork(() -> { throw new FooException(); });
1230             String first = scope.join();
1231             assertEquals("foo", first);
1232         }
1233     }
1234 
1235     /**
1236      * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails.
1237      */
1238     @ParameterizedTest
1239     @MethodSource("factories")
1240     void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
1241         try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
1242                 cf -> cf.withThreadFactory(factory))) {
1243             scope.fork(() -> { throw new FooException(); });
1244             Throwable ex = assertThrows(FailedException.class, scope::join);
1245             assertTrue(ex.getCause() instanceof FooException);
1246         }
1247     }
1248 
1249     /**
1250      * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
1251      */
1252     @Test
1253     void testAwaitSuccessfulOrThrow1() throws Throwable {
1254         try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
1255             var result = scope.join();
1256             assertNull(result);
1257         }
1258     }
1259 
1260     /**
1261      * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
1262      */
1263     @ParameterizedTest
1264     @MethodSource("factories")
1265     void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
1266         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1267                 cf -> cf.withThreadFactory(factory))) {
1268             var subtask1 = scope.fork(() -> "foo");
1269             var subtask2 = scope.fork(() -> "bar");
1270             var result = scope.join();
1271             assertNull(result);
1272             assertEquals("foo", subtask1.get());
1273             assertEquals("bar", subtask2.get());
1274         }
1275     }
1276 
1277     /**
1278      * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
1279      * a subtask that fails.
1280      */
1281     @ParameterizedTest
1282     @MethodSource("factories")
1283     void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
1284         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
1285                 cf -> cf.withThreadFactory(factory))) {
1286             scope.fork(() -> "foo");
1287             scope.fork(() -> { throw new FooException(); });
1288             try {
1289                 scope.join();
1290             } catch (FailedException e) {
1291                 assertTrue(e.getCause() instanceof FooException);
1292             }
1293         }
1294     }
1295 
1296     /**
1297      * Test Joiner.awaitAll() with no subtasks.
1298      */
1299     @Test
1300     void testAwaitAll1() throws Throwable {
1301         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1302             var result = scope.join();
1303             assertNull(result);
1304         }
1305     }
1306 
1307     /**
1308      * Test Joiner.awaitAll() with subtasks that complete successfully.
1309      */
1310     @ParameterizedTest
1311     @MethodSource("factories")
1312     void testAwaitAll2(ThreadFactory factory) throws Throwable {
1313         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1314                 cf -> cf.withThreadFactory(factory))) {
1315             var subtask1 = scope.fork(() -> "foo");
1316             var subtask2 = scope.fork(() -> "bar");
1317             var result = scope.join();
1318             assertNull(result);
1319             assertEquals("foo", subtask1.get());
1320             assertEquals("bar", subtask2.get());
1321         }
1322     }
1323 
1324     /**
1325      * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
1326      * that fails.
1327      */
1328     @ParameterizedTest
1329     @MethodSource("factories")
1330     void testAwaitAll3(ThreadFactory factory) throws Throwable {
1331         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
1332                 cf -> cf.withThreadFactory(factory))) {
1333             var subtask1 = scope.fork(() -> "foo");
1334             var subtask2 = scope.fork(() -> { throw new FooException(); });
1335             var result = scope.join();
1336             assertNull(result);
1337             assertEquals("foo", subtask1.get());
1338             assertTrue(subtask2.exception() instanceof FooException);
1339         }
1340     }
1341 
1342     /**
1343      * Test Joiner.allUntil(Predicate) with no subtasks.
1344      */
1345     @Test
1346     void testAllUntil1() throws Throwable {
1347         try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
1348             var subtasks = scope.join();
1349             assertEquals(0, subtasks.count());
1350         }
1351     }
1352 
1353     /**
1354      * Test Joiner.allUntil(Predicate) with no cancellation.
1355      */
1356     @ParameterizedTest
1357     @MethodSource("factories")
1358     void testAllUntil2(ThreadFactory factory) throws Exception {
1359         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
1360                 cf -> cf.withThreadFactory(factory))) {
1361 
1362             var subtask1 = scope.fork(() -> "foo");
1363             var subtask2 = scope.fork(() -> { throw new FooException(); });
1364 
1365             var subtasks = scope.join().toList();
1366             assertEquals(2, subtasks.size());
1367 
1368             assertTrue(subtasks.get(0) == subtask1);
1369             assertTrue(subtasks.get(1) == subtask2);
1370             assertEquals("foo", subtask1.get());
1371             assertTrue(subtask2.exception() instanceof FooException);
1372         }
1373     }
1374 
1375     /**
1376      * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes.
1377      */
1378     @ParameterizedTest
1379     @MethodSource("factories")
1380     void testAllUntil3(ThreadFactory factory) throws Exception {
1381         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true),
1382                 cf -> cf.withThreadFactory(factory))) {
1383 
1384             var subtask1 = scope.fork(() -> "foo");
1385             var subtask2 = scope.fork(() -> {
1386                 Thread.sleep(Duration.ofDays(1));
1387                 return "bar";
1388             });
1389 
1390             var subtasks = scope.join().toList();
1391 
1392             assertEquals(2, subtasks.size());
1393             assertTrue(subtasks.get(0) == subtask1);
1394             assertTrue(subtasks.get(1) == subtask2);
1395             assertEquals("foo", subtask1.get());
1396             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1397         }
1398     }
1399 
1400     /**
1401      * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete.
1402      */
1403     @ParameterizedTest
1404     @MethodSource("factories")
1405     void testAllUntil4(ThreadFactory factory) throws Exception {
1406 
1407         // cancel execution after two or more failures
1408         class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
1409             final AtomicInteger failedCount = new AtomicInteger();
1410             @Override
1411             public boolean test(Subtask<? extends T> subtask) {
1412                 return subtask.state() == Subtask.State.FAILED
1413                         && failedCount.incrementAndGet() >= 2;
1414             }
1415         }
1416         var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
1417 
1418         try (var scope = StructuredTaskScope.open(joiner)) {
1419             int forkCount = 0;
1420 
1421             // fork subtasks until execution cancelled
1422             while (!scope.isCancelled()) {
1423                 scope.fork(() -> "foo");
1424                 scope.fork(() -> { throw new FooException(); });
1425                 forkCount += 2;
1426                 Thread.sleep(Duration.ofMillis(10));
1427             }
1428 
1429             var subtasks = scope.join().toList();
1430             assertEquals(forkCount, subtasks.size());
1431 
1432             long failedCount = subtasks.stream()
1433                     .filter(s -> s.state() == Subtask.State.FAILED)
1434                     .count();
1435             assertTrue(failedCount >= 2);
1436         }
1437     }
1438 
1439     /**
1440      * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws.
1441      */
1442     @Test
1443     void testAllUntil5() throws Exception {
1444         var joiner = Joiner.allUntil(_ -> { throw new FooException(); });
1445         var excRef = new AtomicReference<Throwable>();
1446         Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
1447         ThreadFactory factory = Thread.ofVirtual()
1448                 .uncaughtExceptionHandler(uhe)
1449                 .factory();
1450         try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
1451             scope.fork(() -> "foo");
1452             scope.join();
1453             assertInstanceOf(FooException.class, excRef.get());
1454         }
1455     }
1456 
1457     /**
1458      * Test Joiner default methods.
1459      */
1460     @Test
1461     void testJoinerDefaultMethods() throws Exception {
1462         try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
1463 
1464             // need subtasks to test default methods
1465             var subtask1 = scope.fork(() -> "foo");
1466             while (!scope.isCancelled()) {
1467                 Thread.sleep(20);
1468             }
1469             var subtask2 = scope.fork(() -> "bar");
1470             scope.join();
1471 
1472             assertEquals(Subtask.State.SUCCESS, subtask1.state());
1473             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1474 
1475             // Joiner that does not override default methods
1476             Joiner<Object, Void> joiner = () -> null;
1477             assertThrows(NullPointerException.class, () -> joiner.onFork(null));
1478             assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
1479             assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
1480             assertFalse(joiner.onFork(subtask2));
1481             assertFalse(joiner.onComplete(subtask1));
1482             assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
1483         }
1484     }
1485 
1486     /**
1487      * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
1488      */
1489     @Test
1490     void testJoinersWithUnavailableResukt() throws Exception {
1491         try (var scope = StructuredTaskScope.open()) {
1492             var done = new CountDownLatch(1);
1493             var subtask = scope.fork(() -> {
1494                 done.await();
1495                 return null;
1496             });
1497 
1498             // onComplete with uncompleted task should throw IAE
1499             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1500             assertThrows(IllegalArgumentException.class,
1501                     () -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
1502             assertThrows(IllegalArgumentException.class,
1503                     () -> Joiner.anySuccessfulResultOrThrow().onComplete(subtask));
1504             assertThrows(IllegalArgumentException.class,
1505                     () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask));
1506             assertThrows(IllegalArgumentException.class,
1507                     () -> Joiner.awaitAll().onComplete(subtask));
1508             assertThrows(IllegalArgumentException.class,
1509                     () -> Joiner.allUntil(_ -> false).onComplete(subtask));
1510 
1511             done.countDown();
1512             scope.join();
1513 
1514             // onFork with completed task should throw IAE
1515             assertEquals(Subtask.State.SUCCESS, subtask.state());
1516             assertThrows(IllegalArgumentException.class,
1517                     () -> Joiner.allSuccessfulOrThrow().onFork(subtask));
1518             assertThrows(IllegalArgumentException.class,
1519                     () -> Joiner.anySuccessfulResultOrThrow().onFork(subtask));
1520             assertThrows(IllegalArgumentException.class,
1521                     () -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask));
1522             assertThrows(IllegalArgumentException.class,
1523                     () -> Joiner.awaitAll().onFork(subtask));
1524             assertThrows(IllegalArgumentException.class,
1525                     () -> Joiner.allUntil(_ -> false).onFork(subtask));
1526         }
1527 
1528     }
1529 
1530     /**
1531      * Test the Config function apply method throwing an exception.
1532      */
1533     @Test
1534     void testConfigFunctionThrows() throws Exception {
1535         assertThrows(FooException.class,
1536                 () -> StructuredTaskScope.open(Joiner.awaitAll(),
1537                                                cf -> { throw new FooException(); }));
1538     }
1539 
1540     /**
1541      * Test Config equals/hashCode/toString
1542      */
1543     @Test
1544     void testConfigMethods() throws Exception {
1545         Function<Config, Config> testConfig = cf -> {
1546             var name = "duke";
1547             var threadFactory = Thread.ofPlatform().factory();
1548             var timeout = Duration.ofSeconds(10);
1549 
1550             assertEquals(cf, cf);
1551             assertEquals(cf.withName(name), cf.withName(name));
1552             assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1553             assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1554 
1555             assertNotEquals(cf, cf.withName(name));
1556             assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1557             assertNotEquals(cf, cf.withTimeout(timeout));
1558 
1559             assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1560             assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1561                     cf.withThreadFactory(threadFactory).hashCode());
1562             assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1563 
1564             assertTrue(cf.withName(name).toString().contains(name));
1565             assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1566             assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1567 
1568             return cf;
1569         };
1570         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
1571             // do nothing
1572         }
1573     }
1574 
1575     /**
1576      * Test for NullPointerException.
1577      */
1578     @Test
1579     void testNulls() throws Exception {
1580         assertThrows(NullPointerException.class,
1581                 () -> StructuredTaskScope.open(null));
1582         assertThrows(NullPointerException.class,
1583                 () -> StructuredTaskScope.open(null, cf -> cf));
1584         assertThrows(NullPointerException.class,
1585                 () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
1586 
1587         assertThrows(NullPointerException.class, () -> Joiner.allUntil(null));
1588 
1589         // fork
1590         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1591             assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null));
1592             assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null));
1593         }
1594 
1595         // Config and withXXX methods
1596         assertThrows(NullPointerException.class,
1597                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> null));
1598         assertThrows(NullPointerException.class,
1599                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null)));
1600         assertThrows(NullPointerException.class,
1601                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null)));
1602         assertThrows(NullPointerException.class,
1603                 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null)));
1604 
1605         // Joiner.onFork/onComplete
1606         assertThrows(NullPointerException.class,
1607                 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null));
1608         assertThrows(NullPointerException.class,
1609                 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null));
1610         assertThrows(NullPointerException.class,
1611                 () -> Joiner.awaitAll().onFork(null));
1612         assertThrows(NullPointerException.class,
1613                 () -> Joiner.awaitAll().onComplete(null));
1614         assertThrows(NullPointerException.class,
1615                 () -> Joiner.allSuccessfulOrThrow().onFork(null));
1616         assertThrows(NullPointerException.class,
1617                 () -> Joiner.allSuccessfulOrThrow().onComplete(null));
1618         assertThrows(NullPointerException.class,
1619                 () -> Joiner.anySuccessfulResultOrThrow().onFork(null));
1620         assertThrows(NullPointerException.class,
1621                 () -> Joiner.anySuccessfulResultOrThrow().onComplete(null));
1622     }
1623 
1624     /**
1625      * ThreadFactory that counts usage.
1626      */
1627     private static class CountingThreadFactory implements ThreadFactory {
1628         final ThreadFactory delegate;
1629         final AtomicInteger threadCount = new AtomicInteger();
1630         CountingThreadFactory(ThreadFactory delegate) {
1631             this.delegate = delegate;
1632         }
1633         @Override
1634         public Thread newThread(Runnable task) {
1635             threadCount.incrementAndGet();
1636             return delegate.newThread(task);
1637         }
1638         int threadCount() {
1639             return threadCount.get();
1640         }
1641     }
1642 
1643     /**
1644      * A joiner that counts that counts the number of subtasks that are forked and the
1645      * number of subtasks that complete.
1646      */
1647     private static class CountingJoiner<T> implements Joiner<T, Void> {
1648         final AtomicInteger onForkCount = new AtomicInteger();
1649         final AtomicInteger onCompleteCount = new AtomicInteger();
1650         @Override
1651         public boolean onFork(Subtask<? extends T> subtask) {
1652             onForkCount.incrementAndGet();
1653             return false;
1654         }
1655         @Override
1656         public boolean onComplete(Subtask<? extends T> subtask) {
1657             onCompleteCount.incrementAndGet();
1658             return false;
1659         }
1660         @Override
1661         public Void result() {
1662             return null;
1663         }
1664         int onForkCount() {
1665             return onForkCount.get();
1666         }
1667         int onCompleteCount() {
1668             return onCompleteCount.get();
1669         }
1670     }
1671 
1672     /**
1673      * A joiner that cancels execution when a subtask completes. It also keeps a count
1674      * of the number of subtasks that are forked and the number of subtasks that complete.
1675      */
1676     private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> {
1677         final AtomicInteger onForkCount = new AtomicInteger();
1678         final AtomicInteger onCompleteCount = new AtomicInteger();
1679         @Override
1680         public boolean onFork(Subtask<? extends T> subtask) {
1681             onForkCount.incrementAndGet();
1682             return false;
1683         }
1684         @Override
1685         public boolean onComplete(Subtask<? extends T> subtask) {
1686             onCompleteCount.incrementAndGet();
1687             return true;
1688         }
1689         @Override
1690         public Void result() {
1691             return null;
1692         }
1693         int onForkCount() {
1694             return onForkCount.get();
1695         }
1696         int onCompleteCount() {
1697             return onCompleteCount.get();
1698         }
1699     }
1700 
1701     /**
1702      * A runtime exception for tests.
1703      */
1704     private static class FooException extends RuntimeException {
1705         FooException() { }
1706         FooException(Throwable cause) { super(cause); }
1707     }
1708 
1709     /**
1710      * Returns the current time in milliseconds.
1711      */
1712     private long millisTime() {
1713         long now = System.nanoTime();
1714         return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
1715     }
1716 
1717     /**
1718      * Check the duration of a task
1719      * @param start start time, in milliseconds
1720      * @param min minimum expected duration, in milliseconds
1721      * @param max maximum expected duration, in milliseconds
1722      * @return the duration (now - start), in milliseconds
1723      */
1724     private long expectDuration(long start, long min, long max) {
1725         long duration = millisTime() - start;
1726         assertTrue(duration >= min,
1727                 "Duration " + duration + "ms, expected >= " + min + "ms");
1728         assertTrue(duration <= max,
1729                 "Duration " + duration + "ms, expected <= " + max + "ms");
1730         return duration;
1731     }
1732 
1733     /**
1734      * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}".
1735      * {@code c} is the fully qualified class name and {@code m} is the method name.
1736      */
1737     private void interruptThreadAt(Thread target, String location) throws InterruptedException {
1738         int index = location.lastIndexOf('.');
1739         String className = location.substring(0, index);
1740         String methodName = location.substring(index + 1);
1741 
1742         boolean found = false;
1743         while (!found) {
1744             Thread.State state = target.getState();
1745             assertTrue(state != TERMINATED);
1746             if ((state == WAITING || state == TIMED_WAITING)
1747                     && contains(target.getStackTrace(), className, methodName)) {
1748                 found = true;
1749             } else {
1750                 Thread.sleep(20);
1751             }
1752         }
1753         target.interrupt();
1754     }
1755 
1756     /**
1757      * Schedules the current thread to be interrupted when it waits (timed or untimed)
1758      * at the given location.
1759      */
1760     private void scheduleInterruptAt(String location) {
1761         Thread target = Thread.currentThread();
1762         scheduler.submit(() -> {
1763             interruptThreadAt(target, location);
1764             return null;
1765         });
1766     }
1767 
1768     /**
1769      * Returns true if the given stack trace contains an element for the given class
1770      * and method name.
1771      */
1772     private boolean contains(StackTraceElement[] stack, String className, String methodName) {
1773         return Arrays.stream(stack)
1774                 .anyMatch(e -> className.equals(e.getClassName())
1775                         && methodName.equals(e.getMethodName()));
1776     }
1777 }