1 /*
   2  * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=platform
  26  * @bug 8284199 8296779 8306647
  27  * @summary Basic tests for StructuredTaskScope
  28  * @enablePreview
  29  * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest
  30  */
  31 
  32 /*
  33  * @test id=virtual
  34  * @enablePreview
  35  * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest
  36  */
  37 
  38 import java.time.Duration;
  39 import java.util.Arrays;
  40 import java.util.ArrayList;
  41 import java.util.List;
  42 import java.util.Set;
  43 import java.util.NoSuchElementException;
  44 import java.util.concurrent.Callable;
  45 import java.util.concurrent.ConcurrentHashMap;
  46 import java.util.concurrent.CountDownLatch;
  47 import java.util.concurrent.Executors;
  48 import java.util.concurrent.ExecutionException;
  49 import java.util.concurrent.Future;
  50 import java.util.concurrent.LinkedTransferQueue;
  51 import java.util.concurrent.ThreadFactory;
  52 import java.util.concurrent.TimeoutException;
  53 import java.util.concurrent.TimeUnit;
  54 import java.util.concurrent.RejectedExecutionException;
  55 import java.util.concurrent.ScheduledExecutorService;
  56 import java.util.concurrent.StructuredTaskScope;
  57 import java.util.concurrent.StructuredTaskScope.*;
  58 import java.util.concurrent.StructureViolationException;
  59 import java.util.concurrent.atomic.AtomicBoolean;
  60 import java.util.concurrent.atomic.AtomicInteger;
  61 import java.util.function.Function;
  62 import java.util.function.Predicate;
  63 import java.util.stream.Stream;
  64 import static java.lang.Thread.State.*;
  65 
  66 import org.junit.jupiter.api.Test;
  67 import org.junit.jupiter.api.BeforeAll;
  68 import org.junit.jupiter.api.AfterAll;
  69 import org.junit.jupiter.params.ParameterizedTest;
  70 import org.junit.jupiter.params.provider.MethodSource;
  71 import static org.junit.jupiter.api.Assertions.*;
  72 
  73 class StructuredTaskScopeTest {
  74     private static ScheduledExecutorService scheduler;
  75     private static List<ThreadFactory> threadFactories;
  76 
  77     @BeforeAll
  78     static void setup() throws Exception {
  79         scheduler = Executors.newSingleThreadScheduledExecutor();
  80 
  81         // thread factories
  82         String value = System.getProperty("threadFactory");
  83         List<ThreadFactory> list = new ArrayList<>();
  84         if (value == null || value.equals("platform"))
  85             list.add(Thread.ofPlatform().factory());
  86         if (value == null || value.equals("virtual"))
  87             list.add(Thread.ofVirtual().factory());
  88         assertTrue(list.size() > 0, "No thread factories for tests");
  89         threadFactories = list;
  90     }
  91 
  92     @AfterAll
  93     static void shutdown() {
  94         scheduler.shutdown();
  95     }
  96 
  97     private static Stream<ThreadFactory> factories() {
  98         return threadFactories.stream();
  99     }
 100 
 101     /**
 102      * Test that fork creates virtual threads when no ThreadFactory is configured.
 103      */
 104     @Test
 105     void testForkCreateVirtualThread() throws Exception {
 106         Set<Thread> threads = ConcurrentHashMap.newKeySet();
 107         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
 108             for (int i = 0; i < 50; i++) {
 109                 // runnable
 110                 scope.fork(() -> {
 111                     threads.add(Thread.currentThread());
 112                 });
 113 
 114                 // callable
 115                 scope.fork(() -> {
 116                     threads.add(Thread.currentThread());
 117                     return null;
 118                 });
 119             }
 120             scope.join();
 121         }
 122         assertEquals(100, threads.size());
 123         threads.forEach(t -> assertTrue(t.isVirtual()));
 124     }
 125 
 126     /**
 127      * Test that fork create threads with the configured ThreadFactory.
 128      */
 129     @ParameterizedTest
 130     @MethodSource("factories")
 131     void testForkUsesThreadFactory(ThreadFactory factory) throws Exception {
 132         // TheadFactory that keeps reference to all threads it creates
 133         class RecordingThreadFactory implements ThreadFactory {
 134             final ThreadFactory delegate;
 135             final Set<Thread> threads = ConcurrentHashMap.newKeySet();
 136             RecordingThreadFactory(ThreadFactory delegate) {
 137                 this.delegate = delegate;
 138             }
 139             @Override
 140             public Thread newThread(Runnable task) {
 141                 Thread thread = delegate.newThread(task);
 142                 threads.add(thread);
 143                 return thread;
 144             }
 145             Set<Thread> threads() {
 146                 return threads;
 147             }
 148         }
 149         var recordingThreadFactory = new RecordingThreadFactory(factory);
 150         Set<Thread> threads = ConcurrentHashMap.newKeySet();
 151         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 152                 cf -> cf.withThreadFactory(recordingThreadFactory))) {
 153 
 154             for (int i = 0; i < 50; i++) {
 155                 // runnable
 156                 scope.fork(() -> {
 157                     threads.add(Thread.currentThread());
 158                 });
 159 
 160                 // callable
 161                 scope.fork(() -> {
 162                     threads.add(Thread.currentThread());
 163                     return null;
 164                 });
 165             }
 166             scope.join();
 167         }
 168         assertEquals(100, threads.size());
 169         assertEquals(recordingThreadFactory.threads(), threads);
 170     }
 171 
 172     /**
 173      * Test fork is owner confined.
 174      */
 175     @ParameterizedTest
 176     @MethodSource("factories")
 177     void testForkConfined(ThreadFactory factory) throws Exception {
 178         try (var scope = StructuredTaskScope.open(Policy.<Boolean>ignoreAll(),
 179                 cf -> cf.withThreadFactory(factory))) {
 180 
 181             // thread in scope cannot fork
 182             Subtask<Boolean> subtask = scope.fork(() -> {
 183                 assertThrows(WrongThreadException.class, () -> {
 184                     scope.fork(() -> null);
 185                 });
 186                 return true;
 187             });
 188             scope.join();
 189             assertTrue(subtask.get());
 190 
 191             // random thread cannot fork
 192             try (var pool = Executors.newSingleThreadExecutor()) {
 193                 Future<Void> future = pool.submit(() -> {
 194                     assertThrows(WrongThreadException.class, () -> {
 195                         scope.fork(() -> null);
 196                     });
 197                     return null;
 198                 });
 199                 future.get();
 200             }
 201         }
 202     }
 203 
 204     /**
 205      * Test fork after join, no subtasks forked before join.
 206      */
 207     @ParameterizedTest
 208     @MethodSource("factories")
 209     void testForkAfterJoin1(ThreadFactory factory) throws Exception {
 210         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 211                 cf -> cf.withThreadFactory(factory))) {
 212             scope.join();
 213             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 214         }
 215     }
 216 
 217     /**
 218      * Test fork after join, subtasks forked before join.
 219      */
 220     @ParameterizedTest
 221     @MethodSource("factories")
 222     void testForkAfterJoin2(ThreadFactory factory) throws Exception {
 223         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 224                 cf -> cf.withThreadFactory(factory))) {
 225             scope.fork(() -> "foo");
 226             scope.join();
 227             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 228         }
 229     }
 230 
 231     /**
 232      * Test fork after join throws.
 233      */
 234     @ParameterizedTest
 235     @MethodSource("factories")
 236     void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
 237         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 238                 cf -> cf.withThreadFactory(factory))) {
 239             var latch = new CountDownLatch(1);
 240             var subtask1 = scope.fork(() -> {
 241                 latch.await();
 242                 return "foo";
 243             });
 244 
 245             // join throws
 246             Thread.currentThread().interrupt();
 247             assertThrows(InterruptedException.class, scope::join);
 248 
 249             // allow subtask1 to finish
 250             latch.countDown();
 251 
 252             // continue to fork
 253             var subtask2 = scope.fork(() -> "bar");
 254             scope.join();
 255             assertEquals("foo", subtask1.get());
 256             assertEquals("bar", subtask2.get());
 257         }
 258     }
 259 
 260     /**
 261      * Test fork after task scope is cancelled.
 262      */
 263     @ParameterizedTest
 264     @MethodSource("factories")
 265     void testForkAfterCancel(ThreadFactory factory) throws Exception {
 266         var countingThreadFactory = new CountingThreadFactory(factory);
 267         var testPolicy = new CancelAfterOnePolicy<String>();
 268 
 269         try (var scope = StructuredTaskScope.open(testPolicy,
 270                 cf -> cf.withThreadFactory(countingThreadFactory))) {
 271 
 272             // fork subtask, the scope should be cancelled when the subtask completes
 273             var subtask1 = scope.fork(() -> "foo");
 274             while (!scope.isCancelled()) {
 275                 Thread.sleep(20);
 276             }
 277 
 278             assertEquals(1, countingThreadFactory.threadCount());
 279             assertEquals(1, testPolicy.onForkCount());
 280             assertEquals(1, testPolicy.onCompleteCount());
 281 
 282             // fork second subtask, it should not run
 283             var subtask2 = scope.fork(() -> "bar");
 284 
 285             // onFork should be invoked, newThread and onComplete should not be invoked
 286             assertEquals(1, countingThreadFactory.threadCount());
 287             assertEquals(2, testPolicy.onForkCount());
 288             assertEquals(1, testPolicy.onCompleteCount());
 289 
 290             scope.join();
 291 
 292             assertEquals(1, countingThreadFactory.threadCount());
 293             assertEquals(2, testPolicy.onForkCount());
 294             assertEquals(1, testPolicy.onCompleteCount());
 295             assertEquals("foo", subtask1.get());
 296             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 297         }
 298     }
 299 
 300     /**
 301      * Test fork after task scope is closed.
 302      */
 303     @ParameterizedTest
 304     @MethodSource("factories")
 305     void testForkAfterClose(ThreadFactory factory) throws Exception {
 306         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 307                 cf -> cf.withThreadFactory(factory))) {
 308             scope.close();
 309             assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
 310         }
 311     }
 312 
 313     /**
 314      * Test fork when the ThreadFactory rejects creating a thread.
 315      */
 316     @Test
 317     void testForkRejectedExecutionException() throws Exception {
 318         ThreadFactory factory = task -> null;
 319         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 320                 cf -> cf.withThreadFactory(factory))) {
 321             assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
 322         }
 323     }
 324 
 325     /**
 326      * Test join with no subtasks.
 327      */
 328     @Test
 329     void testJoinWithNoSubtasks() throws Exception {
 330         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
 331             scope.join();
 332         }
 333     }
 334 
 335     /**
 336      * Test join with a running subtask.
 337      */
 338     @ParameterizedTest
 339     @MethodSource("factories")
 340     void testJoinWithSubtasks(ThreadFactory factory) throws Exception {
 341         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 342                 cf -> cf.withThreadFactory(factory))) {
 343             Subtask<String> subtask = scope.fork(() -> {
 344                 Thread.sleep(Duration.ofMillis(100));
 345                 return "foo";
 346             });
 347             scope.join();
 348             assertEquals("foo", subtask.get());
 349         }
 350     }
 351 
 352     /**
 353      * Test repeated calls to join.
 354      */
 355     @Test
 356     void testJoinAfterJoin() throws Exception {
 357         var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz"));
 358         Policy<Object, String> policy = results::take;
 359         try (var scope = StructuredTaskScope.open(policy)) {
 360             scope.fork(() -> "foo");
 361 
 362             // each call to join should invoke Policy::result
 363             assertEquals("foo", scope.join());
 364             assertEquals("bar", scope.join());
 365             assertEquals("baz", scope.join());
 366         }
 367     }
 368 
 369     /**
 370      * Test join is owner confined.
 371      */
 372     @ParameterizedTest
 373     @MethodSource("factories")
 374     void testJoinConfined(ThreadFactory factory) throws Exception {
 375         try (var scope = StructuredTaskScope.open(Policy.<Boolean>ignoreAll(),
 376                 cf -> cf.withThreadFactory(factory))) {
 377 
 378             // thread in scope cannot join
 379             Subtask<Boolean> subtask = scope.fork(() -> {
 380                 assertThrows(WrongThreadException.class, () -> { scope.join(); });
 381                 return true;
 382             });
 383 
 384             scope.join();
 385 
 386             assertTrue(subtask.get());
 387 
 388             // random thread cannot join
 389             try (var pool = Executors.newSingleThreadExecutor()) {
 390                 Future<Void> future = pool.submit(() -> {
 391                     assertThrows(WrongThreadException.class, scope::join);
 392                     return null;
 393                 });
 394                 future.get();
 395             }
 396         }
 397     }
 398 
 399     /**
 400      * Test join with interrupt status set.
 401      */
 402     @ParameterizedTest
 403     @MethodSource("factories")
 404     void testInterruptJoin1(ThreadFactory factory) throws Exception {
 405         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 406                 cf -> cf.withThreadFactory(factory))) {
 407 
 408             var latch = new CountDownLatch(1);
 409 
 410             Subtask<String> subtask = scope.fork(() -> {
 411                 latch.await();
 412                 return "foo";
 413             });
 414 
 415             // join should throw
 416             Thread.currentThread().interrupt();
 417             try {
 418                 scope.join();
 419                 fail("join did not throw");
 420             } catch (InterruptedException expected) {
 421                 assertFalse(Thread.interrupted());   // interrupt status should be cleared
 422             } finally {
 423                 // let task continue
 424                 latch.countDown();
 425             }
 426 
 427             // join should complete
 428             scope.join();
 429             assertEquals("foo", subtask.get());
 430         }
 431     }
 432 
 433     /**
 434      * Test interrupt of thread blocked in join.
 435      */
 436     @ParameterizedTest
 437     @MethodSource("factories")
 438     void testInterruptJoin2(ThreadFactory factory) throws Exception {
 439         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 440                 cf -> cf.withThreadFactory(factory))) {
 441 
 442             var latch = new CountDownLatch(1);
 443             Subtask<String> subtask = scope.fork(() -> {
 444                 latch.await();
 445                 return "foo";
 446             });
 447 
 448             // join should throw
 449             scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.join");
 450             try {
 451                 scope.join();
 452                 fail("join did not throw");
 453             } catch (InterruptedException expected) {
 454                 assertFalse(Thread.interrupted());   // interrupt status should be clear
 455             } finally {
 456                 // let task continue
 457                 latch.countDown();
 458             }
 459 
 460             // join should complete
 461             scope.join();
 462             assertEquals("foo", subtask.get());
 463         }
 464     }
 465 
 466     /**
 467      * Test join when scope is cancelled.
 468      */
 469     @ParameterizedTest
 470     @MethodSource("factories")
 471     void testJoinWhenCancelled(ThreadFactory factory) throws Exception {
 472         var countingThreadFactory = new CountingThreadFactory(factory);
 473         var testPolicy = new CancelAfterOnePolicy<String>();
 474 
 475         try (var scope = StructuredTaskScope.open(testPolicy,
 476                     cf -> cf.withThreadFactory(countingThreadFactory))) {
 477 
 478             // fork subtask, the scope should be cancelled when the subtask completes
 479             var subtask1 = scope.fork(() -> "foo");
 480             while (!scope.isCancelled()) {
 481                 Thread.sleep(20);
 482             }
 483 
 484             // fork second subtask, it should not run
 485             var subtask2 = scope.fork(() -> {
 486                 Thread.sleep(Duration.ofDays(1));
 487                 return "bar";
 488             });
 489 
 490             scope.join();
 491 
 492             assertEquals("foo", subtask1.get());
 493             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 494         }
 495     }
 496 
 497     /**
 498      * Test join after scope is closed.
 499      */
 500     @Test
 501     void testJoinAfterClose() throws Exception {
 502         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
 503             scope.close();
 504             assertThrows(IllegalStateException.class, () -> scope.join());
 505         }
 506     }
 507 
 508     /**
 509      * Test join with timeout, subtasks finish before timeout expires.
 510      */
 511     @ParameterizedTest
 512     @MethodSource("factories")
 513     void testJoinWithTimeout1(ThreadFactory factory) throws Exception {
 514         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 515                 cf -> cf.withThreadFactory(factory)
 516                         .withTimeout(Duration.ofDays(1)))) {
 517 
 518             Subtask<String> subtask = scope.fork(() -> {
 519                 Thread.sleep(Duration.ofSeconds(1));
 520                 return "foo";
 521             });
 522 
 523             scope.join();
 524 
 525             assertFalse(scope.isCancelled());
 526             assertEquals("foo", subtask.get());
 527         }
 528     }
 529 
 530     /**
 531      * Test join with timeout, timeout expires before subtasks finish.
 532      */
 533     @ParameterizedTest
 534     @MethodSource("factories")
 535     void testJoinWithTimeout2(ThreadFactory factory) throws Exception {
 536         long startMillis = millisTime();
 537         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 538                 cf -> cf.withThreadFactory(factory)
 539                         .withTimeout(Duration.ofSeconds(2)))) {
 540 
 541             Subtask<Void> subtask = scope.fork(() -> {
 542                 Thread.sleep(Duration.ofDays(1));
 543                 return null;
 544             });
 545 
 546             try {
 547                 scope.join();
 548                 fail();
 549             } catch (ExecutionException e) {
 550                 assertTrue(e.getCause() instanceof TimeoutException);
 551             }
 552             expectDuration(startMillis, /*min*/1900, /*max*/20_000);
 553 
 554             assertTrue(scope.isCancelled());
 555             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 556         }
 557     }
 558 
 559     /**
 560      * Test join with timeout that has already expired.
 561      */
 562     @ParameterizedTest
 563     @MethodSource("factories")
 564     void testJoinWithTimeout3(ThreadFactory factory) throws Exception {
 565         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 566                 cf -> cf.withThreadFactory(factory)
 567                         .withTimeout(Duration.ofSeconds(-1)))) {
 568 
 569             Subtask<Void> subtask = scope.fork(() -> {
 570                 Thread.sleep(Duration.ofDays(1));
 571                 return null;
 572             });
 573 
 574             try {
 575                 scope.join();
 576                 fail();
 577             } catch (ExecutionException e) {
 578                 assertTrue(e.getCause() instanceof TimeoutException);
 579             }
 580             assertTrue(scope.isCancelled());
 581             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 582         }
 583     }
 584 
 585     /**
 586      * Test that cancelling exceutions interrupts unfinished threads.
 587      */
 588     @ParameterizedTest
 589     @MethodSource("factories")
 590     void testCancelInterruptsThreads(ThreadFactory factory) throws Exception {
 591         var testPolicy = new CancelAfterOnePolicy<String>();
 592 
 593         try (var scope = StructuredTaskScope.open(testPolicy,
 594                 cf -> cf.withThreadFactory(factory))) {
 595 
 596             // fork subtask1 that runs for a long time
 597             var started = new CountDownLatch(1);
 598             var interrupted = new CountDownLatch(1);
 599             var subtask1 = scope.fork(() -> {
 600                 started.countDown();
 601                 try {
 602                     Thread.sleep(Duration.ofDays(1));
 603                 } catch (InterruptedException e) {
 604                     interrupted.countDown();
 605                 }
 606             });
 607             started.await();
 608 
 609             // fork subtask2, the scope should be cancelled when the subtask completes
 610             var subtask2 = scope.fork(() -> "bar");
 611             while (!scope.isCancelled()) {
 612                 Thread.sleep(20);
 613             }
 614 
 615             // subtask1 should be interrupted
 616             interrupted.await();
 617 
 618             scope.join();
 619 
 620             assertEquals(Subtask.State.UNAVAILABLE, subtask1.state());
 621             assertEquals("bar", subtask2.get());
 622         }
 623     }
 624 
 625     /**
 626      * Test that timeout interrupts unfinished threads.
 627      */
 628     @ParameterizedTest
 629     @MethodSource("factories")
 630     void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception {
 631         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 632                 cf -> cf.withThreadFactory(factory)
 633                         .withTimeout(Duration.ofSeconds(2)))) {
 634 
 635             var started = new AtomicBoolean();
 636             var interrupted = new CountDownLatch(1);
 637             Subtask<Void> subtask = scope.fork(() -> {
 638                 started.set(true);
 639                 try {
 640                     Thread.sleep(Duration.ofDays(1));
 641                 } catch (InterruptedException e) {
 642                     interrupted.countDown();
 643                 }
 644                 return null;
 645             });
 646 
 647             while (!scope.isCancelled()) {
 648                 Thread.sleep(50);
 649             }
 650 
 651             // if subtask started then it should be interrupted
 652             if (started.get()) {
 653                 interrupted.await();
 654             }
 655 
 656             try {
 657                 scope.join();
 658                 fail();
 659             } catch (ExecutionException e) {
 660                 assertTrue(e.getCause() instanceof TimeoutException);
 661             }
 662             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 663         }
 664     }
 665 
 666     /**
 667      * Test close without join, no subtasks forked.
 668      */
 669     @Test
 670     void testCloseWithoutJoin1() {
 671         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
 672             // do nothing
 673         }
 674     }
 675 
 676     /**
 677      * Test close without join, unfinished subtasks.
 678      */
 679     @ParameterizedTest
 680     @MethodSource("factories")
 681     void testCloseWithoutJoin2(ThreadFactory factory) {
 682         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 683                 cf -> cf.withThreadFactory(factory))) {
 684             Subtask<String> subtask = scope.fork(() -> {
 685                 Thread.sleep(Duration.ofDays(1));
 686                 return null;
 687             });
 688             assertThrows(IllegalStateException.class, scope::close);
 689 
 690             // subtask result/exception not available
 691             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 692             assertThrows(IllegalStateException.class, subtask::get);
 693             assertThrows(IllegalStateException.class, subtask::exception);
 694         }
 695     }
 696 
 697     /**
 698      * Test close after join throws. Close should not throw as join attempted.
 699      */
 700     @ParameterizedTest
 701     @MethodSource("factories")
 702     void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception {
 703         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 704                 cf -> cf.withThreadFactory(factory))) {
 705             var subtask = scope.fork(() -> {
 706                 Thread.sleep(Duration.ofDays(1));
 707                 return null;
 708             });
 709 
 710             // join throws
 711             Thread.currentThread().interrupt();
 712             assertThrows(InterruptedException.class, scope::join);
 713             assertThrows(IllegalStateException.class, subtask::get);
 714 
 715         }  // close should not throw
 716     }
 717 
 718     /**
 719      * Test close is owner confined.
 720      */
 721     @ParameterizedTest
 722     @MethodSource("factories")
 723     void testCloseConfined(ThreadFactory factory) throws Exception {
 724         try (var scope = StructuredTaskScope.open(Policy.<Boolean>ignoreAll(),
 725                 cf -> cf.withThreadFactory(factory))) {
 726 
 727             // attempt to close from thread in scope
 728             Subtask<Boolean> subtask = scope.fork(() -> {
 729                 assertThrows(WrongThreadException.class, scope::close);
 730                 return true;
 731             });
 732 
 733             scope.join();
 734             assertTrue(subtask.get());
 735 
 736             // random thread cannot close scope
 737             try (var pool = Executors.newCachedThreadPool(factory)) {
 738                 Future<Boolean> future = pool.submit(() -> {
 739                     assertThrows(WrongThreadException.class, scope::close);
 740                     return null;
 741                 });
 742                 future.get();
 743             }
 744         }
 745     }
 746 
 747     /**
 748      * Test close with interrupt status set.
 749      */
 750     @ParameterizedTest
 751     @MethodSource("factories")
 752     void testInterruptClose1(ThreadFactory factory) throws Exception {
 753         var testPolicy = new CancelAfterOnePolicy<String>();
 754         try (var scope = StructuredTaskScope.open(testPolicy,
 755                 cf -> cf.withThreadFactory(factory))) {
 756 
 757             // fork first subtask, a straggler as it continues after being interrupted
 758             var started = new CountDownLatch(1);
 759             var done = new AtomicBoolean();
 760             scope.fork(() -> {
 761                 started.countDown();
 762                 try {
 763                     Thread.sleep(Duration.ofDays(1));
 764                 } catch (InterruptedException e) {
 765                     // interrupted by cancel, expected
 766                 }
 767                 Thread.sleep(Duration.ofMillis(100)); // force close to wait
 768                 done.set(true);
 769                 return null;
 770             });
 771             started.await();
 772 
 773             // fork second subtask, the scope should be cancelled when this subtask completes
 774             scope.fork(() -> "bar");
 775             while (!scope.isCancelled()) {
 776                 Thread.sleep(20);
 777             }
 778 
 779             scope.join();
 780 
 781             // invoke close with interrupt status set
 782             Thread.currentThread().interrupt();
 783             try {
 784                 scope.close();
 785             } finally {
 786                 assertTrue(Thread.interrupted());   // clear interrupt status
 787                 assertTrue(done.get());
 788             }
 789         }
 790     }
 791 
 792     /**
 793      * Test interrupting thread waiting in close.
 794      */
 795     @ParameterizedTest
 796     @MethodSource("factories")
 797     void testInterruptClose2(ThreadFactory factory) throws Exception {
 798         var testPolicy = new CancelAfterOnePolicy<String>();
 799         try (var scope = StructuredTaskScope.open(testPolicy,
 800                 cf -> cf.withThreadFactory(factory))) {
 801 
 802             Thread mainThread = Thread.currentThread();
 803 
 804             // fork first subtask, a straggler as it continues after being interrupted
 805             var started = new CountDownLatch(1);
 806             var done = new AtomicBoolean();
 807             scope.fork(() -> {
 808                 started.countDown();
 809                 try {
 810                     Thread.sleep(Duration.ofDays(1));
 811                 } catch (InterruptedException e) {
 812                     // interrupted by cancel, expected
 813                 }
 814 
 815                 // interrupt main thread when it blocks in close
 816                 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScope.close");
 817 
 818                 Thread.sleep(Duration.ofMillis(100)); // force close to wait
 819                 done.set(true);
 820                 return null;
 821             });
 822             started.await();
 823 
 824             // fork second subtask, the scope should be cancelled when this subtask completes
 825             scope.fork(() -> "bar");
 826             while (!scope.isCancelled()) {
 827                 Thread.sleep(20);
 828             }
 829 
 830             scope.join();
 831 
 832             // main thread will be interrupted while blocked in close
 833             try {
 834                 scope.close();
 835             } finally {
 836                 assertTrue(Thread.interrupted());   // clear interrupt status
 837                 assertTrue(done.get());
 838             }
 839         }
 840     }
 841 
 842     /**
 843      * Test that closing an enclosing scope closes the thread flock of a nested scope.
 844      */
 845     @Test
 846     void testCloseThrowsStructureViolation() throws Exception {
 847         try (var scope1 = StructuredTaskScope.open(Policy.ignoreAll())) {
 848             try (var scope2 = StructuredTaskScope.open(Policy.ignoreAll())) {
 849 
 850                 // close enclosing scope
 851                 try {
 852                     scope1.close();
 853                     fail("close did not throw");
 854                 } catch (StructureViolationException expected) { }
 855 
 856                 // underlying flock should be closed
 857                 var executed = new AtomicBoolean();
 858                 Subtask<?> subtask = scope2.fork(() -> executed.set(true));
 859                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 860                 scope2.join();
 861                 assertFalse(executed.get());
 862                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 863             }
 864         }
 865     }
 866 
 867     /**
 868      * Test that isCancelled returns true after close.
 869      */
 870     @Test
 871     void testIsCancelledAfterClose() throws Exception {
 872         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
 873             assertFalse(scope.isCancelled());
 874             scope.close();
 875             assertTrue(scope.isCancelled());
 876         }
 877     }
 878 
 879     /**
 880      * Test Policy.onFork throwing exception.
 881      */
 882     @Test
 883     void testOnForkThrows() throws Exception {
 884         var policy = new Policy<String, Void>() {
 885             @Override
 886             public boolean onFork(Subtask<? extends String> subtask) {
 887                 throw new FooException();
 888             }
 889             @Override
 890             public Void result() {
 891                 return null;
 892             }
 893         };
 894         try (var scope = StructuredTaskScope.open(policy)) {
 895             assertThrows(FooException.class, () -> scope.fork(() -> "foo"));
 896         }
 897     }
 898 
 899     /**
 900      * Test Policy.onFork returning true to cancel execution.
 901      */
 902     @Test
 903     void testOnForkCancelsExecution() throws Exception {
 904         var policy = new Policy<String, Void>() {
 905             @Override
 906             public boolean onFork(Subtask<? extends String> subtask) {
 907                 return true;
 908             }
 909             @Override
 910             public Void result() {
 911                 return null;
 912             }
 913         };
 914         try (var scope = StructuredTaskScope.open(policy)) {
 915             assertFalse(scope.isCancelled());
 916             scope.fork(() -> "foo");
 917             assertTrue(scope.isCancelled());
 918             scope.join();
 919         }
 920     }
 921 
 922     /**
 923      * Test Policy.onComplete returning true to cancel execution.
 924      */
 925     @Test
 926     void testOnCompleteCancelsExecution() throws Exception {
 927         var policy = new Policy<String, Void>() {
 928             @Override
 929             public boolean onComplete(Subtask<? extends String> subtask) {
 930                 return true;
 931             }
 932             @Override
 933             public Void result() {
 934                 return null;
 935             }
 936         };
 937         try (var scope = StructuredTaskScope.open(policy)) {
 938             assertFalse(scope.isCancelled());
 939             scope.fork(() -> "foo");
 940             while (!scope.isCancelled()) {
 941                 Thread.sleep(10);
 942             }
 943             scope.join();
 944         }
 945     }
 946 
 947     /**
 948      * Test toString.
 949      */
 950     @Test
 951     void testToString() throws Exception {
 952         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 953                 cf -> cf.withName("duke"))) {
 954 
 955             // open
 956             assertTrue(scope.toString().contains("duke"));
 957 
 958             // closed
 959             scope.close();
 960             assertTrue(scope.toString().contains("duke"));
 961         }
 962     }
 963 
 964     /**
 965      * Test Subtask with task that completes successfully.
 966      */
 967     @ParameterizedTest
 968     @MethodSource("factories")
 969     void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
 970         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreAll(),
 971                 cf -> cf.withThreadFactory(factory))) {
 972 
 973             Subtask<String> subtask = scope.fork(() -> "foo");
 974 
 975             // before join
 976             assertThrows(IllegalStateException.class, subtask::get);
 977             assertThrows(IllegalStateException.class, subtask::exception);
 978 
 979             scope.join();
 980 
 981             // after join
 982             assertEquals(Subtask.State.SUCCESS, subtask.state());
 983             assertEquals("foo", subtask.get());
 984             assertThrows(IllegalStateException.class, subtask::exception);
 985         }
 986     }
 987 
 988     /**
 989      * Test Subtask with task that fails.
 990      */
 991     @ParameterizedTest
 992     @MethodSource("factories")
 993     void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
 994         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreAll(),
 995                 cf -> cf.withThreadFactory(factory))) {
 996 
 997             Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
 998 
 999             // before join
1000             assertThrows(IllegalStateException.class, subtask::get);
1001             assertThrows(IllegalStateException.class, subtask::exception);
1002 
1003             scope.join();
1004 
1005             // after join
1006             assertEquals(Subtask.State.FAILED, subtask.state());
1007             assertThrows(IllegalStateException.class, subtask::get);
1008             assertTrue(subtask.exception() instanceof FooException);
1009         }
1010     }
1011 
1012     /**
1013      * Test Subtask with a task that has not completed.
1014      */
1015     @ParameterizedTest
1016     @MethodSource("factories")
1017     void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
1018         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
1019                 cf -> cf.withThreadFactory(factory))) {
1020             Subtask<Void> subtask = scope.fork(() -> {
1021                 Thread.sleep(Duration.ofDays(1));
1022                 return null;
1023             });
1024 
1025             // before join
1026             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1027             assertThrows(IllegalStateException.class, subtask::get);
1028             assertThrows(IllegalStateException.class, subtask::exception);
1029 
1030             // attempt join, join throws
1031             Thread.currentThread().interrupt();
1032             assertThrows(InterruptedException.class, scope::join);
1033 
1034             // after join
1035             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1036             assertThrows(IllegalStateException.class, subtask::get);
1037             assertThrows(IllegalStateException.class, subtask::exception);
1038         }
1039     }
1040 
1041     /**
1042      * Test Subtask forked after execution cancelled.
1043      */
1044     @ParameterizedTest
1045     @MethodSource("factories")
1046     void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception {
1047         try (var scope = StructuredTaskScope.open(new CancelAfterOnePolicy<String>())) {
1048             scope.fork(() -> "foo");
1049             while (!scope.isCancelled()) {
1050                 Thread.sleep(20);
1051             }
1052 
1053             var subtask = scope.fork(() -> "foo");
1054 
1055             // before join
1056             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1057             assertThrows(IllegalStateException.class, subtask::get);
1058             assertThrows(IllegalStateException.class, subtask::exception);
1059 
1060             scope.join();
1061 
1062             // after join
1063             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1064             assertThrows(IllegalStateException.class, subtask::get);
1065             assertThrows(IllegalStateException.class, subtask::exception);
1066         }
1067     }
1068 
1069     /**
1070      * Test Subtask::toString.
1071      */
1072     @Test
1073     void testSubtaskToString() throws Exception {
1074         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
1075             var latch = new CountDownLatch(1);
1076             var subtask1 = scope.fork(() -> {
1077                 latch.await();
1078                 return "foo";
1079             });
1080             var subtask2 = scope.fork(() -> { throw new FooException(); });
1081 
1082             // subtask1 result is unavailable
1083             assertTrue(subtask1.toString().contains("Unavailable"));
1084             latch.countDown();
1085 
1086             scope.join();
1087 
1088             assertTrue(subtask1.toString().contains("Completed successfully"));
1089             assertTrue(subtask2.toString().contains("Failed"));
1090         }
1091     }
1092 
1093     /**
1094      * Test Policy.ignoreSuccessfulOrThrow() with no subtasks.
1095      */
1096     @Test
1097     void testThrowIfFailed1() throws Throwable {
1098         try (var scope = StructuredTaskScope.open(Policy.ignoreSuccessfulOrThrow())) {
1099             var result = scope.join();
1100             assertNull(result);
1101         }
1102     }
1103 
1104     /**
1105      * Test Policy.ignoreSuccessfulOrThrow() with subtasks that complete successfully.
1106      */
1107     @ParameterizedTest
1108     @MethodSource("factories")
1109     void testThrowIfFailed2(ThreadFactory factory) throws Throwable {
1110         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreSuccessfulOrThrow(),
1111                 cf -> cf.withThreadFactory(factory))) {
1112             var subtask1 = scope.fork(() -> "foo");
1113             var subtask2 = scope.fork(() -> "bar");
1114             var result = scope.join();
1115             assertNull(result);
1116             assertEquals("foo", subtask1.get());
1117             assertEquals("bar", subtask2.get());
1118         }
1119     }
1120 
1121     /**
1122      * Test Policy.ignoreSuccessfulOrThrow() with a subtask that complete successfully and
1123      * a subtask that fails.
1124      */
1125     @ParameterizedTest
1126     @MethodSource("factories")
1127     void testThrowIfFailed3(ThreadFactory factory) throws Throwable {
1128         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreSuccessfulOrThrow(),
1129                 cf -> cf.withThreadFactory(factory))) {
1130             scope.fork(() -> "foo");
1131             scope.fork(() -> { throw new FooException(); });
1132             try {
1133                 scope.join();
1134             } catch (ExecutionException e) {
1135                 assertTrue(e.getCause() instanceof FooException);
1136             }
1137         }
1138     }
1139 
1140     /**
1141      * Test Policy.allSuccessfulOrThrow() with no subtasks.
1142      */
1143     @Test
1144     void testAllSuccessful1() throws Throwable {
1145         try (var scope = StructuredTaskScope.open(Policy.allSuccessfulOrThrow())) {
1146             var subtasks = scope.join().toList();
1147             assertTrue(subtasks.isEmpty());
1148         }
1149     }
1150 
1151     /**
1152      * Test Policy.allSuccessfulOrThrow() with subtasks that complete successfully.
1153      */
1154     @ParameterizedTest
1155     @MethodSource("factories")
1156     void testAllSuccessful2(ThreadFactory factory) throws Throwable {
1157         try (var scope = StructuredTaskScope.open(Policy.<String>allSuccessfulOrThrow(),
1158                 cf -> cf.withThreadFactory(factory))) {
1159             var subtask1 = scope.fork(() -> "foo");
1160             var subtask2 = scope.fork(() -> "bar");
1161             var subtasks = scope.join().toList();
1162             assertEquals(List.of(subtask1, subtask2), subtasks);
1163             assertEquals("foo", subtask1.get());
1164             assertEquals("bar", subtask2.get());
1165         }
1166     }
1167 
1168     /**
1169      * Test Policy.allSuccessfulOrThrow() with a subtask that complete successfully and
1170      * a subtask that fails.
1171      */
1172     @ParameterizedTest
1173     @MethodSource("factories")
1174     void testAllSuccessful3(ThreadFactory factory) throws Throwable {
1175         try (var scope = StructuredTaskScope.open(Policy.<String>allSuccessfulOrThrow(),
1176                 cf -> cf.withThreadFactory(factory))) {
1177             scope.fork(() -> "foo");
1178             scope.fork(() -> { throw new FooException(); });
1179             try {
1180                 scope.join();
1181             } catch (ExecutionException e) {
1182                 assertTrue(e.getCause() instanceof FooException);
1183             }
1184         }
1185     }
1186 
1187     /**
1188      * Test Policy.anySuccessfulResultOrThrow() with no subtasks.
1189      */
1190     @Test
1191     void testAnySuccessful1() throws Exception {
1192         try (var scope = StructuredTaskScope.open(Policy.anySuccessfulResultOrThrow())) {
1193             try {
1194                 scope.join();
1195             } catch (ExecutionException e) {
1196                 assertTrue(e.getCause() instanceof NoSuchElementException);
1197             }
1198         }
1199     }
1200 
1201     /**
1202      * Test Policy.anySuccessfulResultOrThrow() with a subtask that completes successfully.
1203      */
1204     @ParameterizedTest
1205     @MethodSource("factories")
1206     void testAnySuccessful2(ThreadFactory factory) throws Exception {
1207         try (var scope = StructuredTaskScope.open(Policy.<String>anySuccessfulResultOrThrow(),
1208                 cf -> cf.withThreadFactory(factory))) {
1209             scope.fork(() -> "foo");
1210             String result = scope.join();
1211             assertEquals("foo", result);
1212         }
1213     }
1214 
1215     /**
1216      * Test Policy.anySuccessfulResultOrThrow() with a subtask that completes successfully
1217      * with a null result.
1218      */
1219     @ParameterizedTest
1220     @MethodSource("factories")
1221     void testAnySuccessful3(ThreadFactory factory) throws Exception {
1222         try (var scope = StructuredTaskScope.open(Policy.<String>anySuccessfulResultOrThrow(),
1223                 cf -> cf.withThreadFactory(factory))) {
1224             scope.fork(() -> null);
1225             String result = scope.join();
1226             assertNull(result);
1227         }
1228     }
1229 
1230     /**
1231      * Test Policy.anySuccessfulResultOrThrow() with a subtask that complete succcessfully and
1232      * a subtask that fails.
1233      */
1234     @ParameterizedTest
1235     @MethodSource("factories")
1236     void testAnySuccessful4(ThreadFactory factory) throws Exception {
1237         try (var scope = StructuredTaskScope.open(Policy.<String>anySuccessfulResultOrThrow(),
1238                 cf -> cf.withThreadFactory(factory))) {
1239             scope.fork(() -> "foo");
1240             scope.fork(() -> { throw new FooException(); });
1241             String first = scope.join();
1242             assertEquals("foo", first);
1243         }
1244     }
1245 
1246     /**
1247      * Test Policy.anySuccessfulResultOrThrow() with a subtask that fails.
1248      */
1249     @ParameterizedTest
1250     @MethodSource("factories")
1251     void testAnySuccessful5(ThreadFactory factory) throws Exception {
1252         try (var scope = StructuredTaskScope.open(Policy.anySuccessfulResultOrThrow(),
1253                 cf -> cf.withThreadFactory(factory))) {
1254             scope.fork(() -> { throw new FooException(); });
1255             Throwable ex = assertThrows(ExecutionException.class, scope::join);
1256             assertTrue(ex.getCause() instanceof FooException);
1257         }
1258     }
1259 
1260     /**
1261      * Test Policy.ignoreAll() with no subtasks.
1262      */
1263     @Test
1264     void testIgnoreFailures1() throws Throwable {
1265         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
1266             var result = scope.join();
1267             assertNull(result);
1268         }
1269     }
1270 
1271     /**
1272      * Test Policy.ignoreAll() with subtasks that complete successfully.
1273      */
1274     @ParameterizedTest
1275     @MethodSource("factories")
1276     void testIgnoreFailures2(ThreadFactory factory) throws Throwable {
1277         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreAll(),
1278                 cf -> cf.withThreadFactory(factory))) {
1279             var subtask1 = scope.fork(() -> "foo");
1280             var subtask2 = scope.fork(() -> "bar");
1281             var result = scope.join();
1282             assertNull(result);
1283             assertEquals("foo", subtask1.get());
1284             assertEquals("bar", subtask2.get());
1285         }
1286     }
1287 
1288     /**
1289      * Test Policy.ignoreAll() with a subtask that complete successfully and
1290      * a subtask that fails.
1291      */
1292     @ParameterizedTest
1293     @MethodSource("factories")
1294     void testIgnoreFailures3(ThreadFactory factory) throws Throwable {
1295         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreAll(),
1296                 cf -> cf.withThreadFactory(factory))) {
1297             var subtask1 = scope.fork(() -> "foo");
1298             var subtask2 = scope.fork(() -> { throw new FooException(); });
1299             var result = scope.join();
1300             assertNull(result);
1301             assertEquals("foo", subtask1.get());
1302             assertTrue(subtask2.exception() instanceof FooException);
1303         }
1304     }
1305 
1306     /**
1307      * Test Policy.all(Predicate) with no subtasks.
1308      */
1309     @Test
1310     void testAllForked1() throws Throwable {
1311         try (var scope = StructuredTaskScope.open(Policy.all(s -> false))) {
1312             var subtasks = scope.join();
1313             assertEquals(0, subtasks.count());
1314         }
1315     }
1316 
1317     /**
1318      * Test Policy.all(Predicate) with no cancellation.
1319      */
1320     @ParameterizedTest
1321     @MethodSource("factories")
1322     void testAllForked2(ThreadFactory factory) throws Exception {
1323         try (var scope = StructuredTaskScope.open(Policy.<String>all(s -> false),
1324                 cf -> cf.withThreadFactory(factory))) {
1325 
1326             var subtask1 = scope.fork(() -> "foo");
1327             var subtask2 = scope.fork(() -> { throw new FooException(); });
1328 
1329             var subtasks = scope.join().toList();
1330             assertEquals(2, subtasks.size());
1331 
1332             assertTrue(subtasks.get(0) == subtask1);
1333             assertTrue(subtasks.get(1) == subtask2);
1334             assertEquals("foo", subtask1.get());
1335             assertTrue(subtask2.exception() instanceof FooException);
1336         }
1337     }
1338 
1339     /**
1340      * Test Policy.all(Predicate) with cancellation after one subtask completes.
1341      */
1342     @ParameterizedTest
1343     @MethodSource("factories")
1344     void testAllForked3(ThreadFactory factory) throws Exception {
1345         try (var scope = StructuredTaskScope.open(Policy.<String>all(s -> true),
1346                 cf -> cf.withThreadFactory(factory))) {
1347 
1348             var subtask1 = scope.fork(() -> "foo");
1349             var subtask2 = scope.fork(() -> {
1350                 Thread.sleep(Duration.ofDays(1));
1351                 return "bar";
1352             });
1353 
1354             var subtasks = scope.join().toList();
1355 
1356             assertEquals(2, subtasks.size());
1357             assertTrue(subtasks.get(0) == subtask1);
1358             assertTrue(subtasks.get(1) == subtask2);
1359             assertEquals("foo", subtask1.get());
1360             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1361         }
1362     }
1363 
1364     /**
1365      * Test Policy.all(Predicate) with cancellation after serveral subtasks completes.
1366      */
1367     @ParameterizedTest
1368     @MethodSource("factories")
1369     void testAllForked4(ThreadFactory factory) throws Exception {
1370 
1371         // cancel execution if 2 or more failures
1372         class AtMostTwoFailures<T> implements Predicate<Subtask<? extends T>> {
1373             final AtomicInteger failedCount = new AtomicInteger();
1374             @Override
1375             public boolean test(Subtask<? extends T> subtask) {
1376                 return subtask.state() == Subtask.State.FAILED
1377                         && failedCount.incrementAndGet() >= 2;
1378             }
1379         }
1380         var policy = Policy.all(new AtMostTwoFailures<String>());
1381 
1382         try (var scope = StructuredTaskScope.open(policy)) {
1383             int forkCount = 0;
1384 
1385             // fork subtasks until execution cancelled
1386             while (!scope.isCancelled()) {
1387                 scope.fork(() -> "foo");
1388                 scope.fork(() -> { throw new FooException(); });
1389                 forkCount += 2;
1390                 Thread.sleep(Duration.ofMillis(10));
1391             }
1392 
1393             var subtasks = scope.join().toList();
1394             assertEquals(forkCount, subtasks.size());
1395 
1396             long failedCount = subtasks.stream()
1397                     .filter(s -> s.state() == Subtask.State.FAILED)
1398                     .count();
1399             assertTrue(failedCount >= 2);
1400         }
1401     }
1402 
1403     /**
1404      * Test Config equals/hashCode/toString
1405      */
1406     @Test
1407     void testConfigMethods() throws Exception {
1408         Function<Config, Config> testConfig = cf -> {
1409             var name = "duke";
1410             var threadFactory = Thread.ofPlatform().factory();
1411             var timeout = Duration.ofSeconds(10);
1412 
1413             assertEquals(cf, cf);
1414             assertEquals(cf.withName(name), cf.withName(name));
1415             assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1416             assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1417 
1418             assertNotEquals(cf, cf.withName(name));
1419             assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1420             assertNotEquals(cf, cf.withTimeout(timeout));
1421 
1422             assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1423             assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1424                     cf.withThreadFactory(threadFactory).hashCode());
1425             assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1426 
1427             assertTrue(cf.withName(name).toString().contains(name));
1428             assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1429             assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1430 
1431             return cf;
1432         };
1433         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(), testConfig)) {
1434             // do nothing
1435         }
1436     }
1437 
1438     /**
1439      * Test Policy's default methods.
1440      */
1441     @Test
1442     void testPolicyDefaultMethods() throws Exception {
1443         try (var scope = StructuredTaskScope.open(new CancelAfterOnePolicy<String>())) {
1444 
1445             // need subtasks to test default methods
1446             var subtask1 = scope.fork(() -> "foo");
1447             while (!scope.isCancelled()) {
1448                 Thread.sleep(20);
1449             }
1450             var subtask2 = scope.fork(() -> "bar");
1451             scope.join();
1452 
1453             assertEquals(Subtask.State.SUCCESS, subtask1.state());
1454             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1455 
1456             // Policy that does not override default methods
1457             Policy<Object, Void> policy = () -> null;
1458             assertThrows(NullPointerException.class, () -> policy.onFork(null));
1459             assertThrows(NullPointerException.class, () -> policy.onComplete(null));
1460             assertThrows(IllegalArgumentException.class, () -> policy.onFork(subtask1));
1461             assertFalse(policy.onFork(subtask2));
1462             assertFalse(policy.onComplete(subtask1));
1463             assertThrows(IllegalArgumentException.class, () -> policy.onComplete(subtask2));
1464         }
1465     }
1466 
1467     /**
1468      * Test for NullPointerException.
1469      */
1470     @Test
1471     void testNulls() throws Exception {
1472         assertThrows(NullPointerException.class,
1473                 () -> StructuredTaskScope.open(null));
1474         assertThrows(NullPointerException.class,
1475                 () -> StructuredTaskScope.open(null, cf -> cf));
1476         assertThrows(NullPointerException.class,
1477                 () -> StructuredTaskScope.open(Policy.ignoreAll(), null));
1478         assertThrows(NullPointerException.class,
1479                 () -> StructuredTaskScope.open(Policy.ignoreAll(), cf -> null));
1480 
1481         assertThrows(NullPointerException.class, () -> Policy.all(null));
1482 
1483         // fork
1484         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
1485             assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null));
1486             assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null));
1487         }
1488 
1489         // withXXX
1490         assertThrows(NullPointerException.class,
1491                 () -> StructuredTaskScope.open(Policy.ignoreAll(), cf -> cf.withName(null)));
1492         assertThrows(NullPointerException.class,
1493                 () -> StructuredTaskScope.open(Policy.ignoreAll(), cf -> cf.withThreadFactory(null)));
1494         assertThrows(NullPointerException.class,
1495                 () -> StructuredTaskScope.open(Policy.ignoreAll(), cf -> cf.withTimeout(null)));
1496     }
1497 
1498     /**
1499      * ThreadFactory that counts usage.
1500      */
1501     private static class CountingThreadFactory implements ThreadFactory {
1502         final ThreadFactory delegate;
1503         final AtomicInteger threadCount = new AtomicInteger();
1504         CountingThreadFactory(ThreadFactory delegate) {
1505             this.delegate = delegate;
1506         }
1507         @Override
1508         public Thread newThread(Runnable task) {
1509             threadCount.incrementAndGet();
1510             return delegate.newThread(task);
1511         }
1512         int threadCount() {
1513             return threadCount.get();
1514         }
1515     }
1516 
1517     /**
1518      * Policy that cancels execution when a subtask completes.
1519      */
1520     private static class CancelAfterOnePolicy<T> implements Policy<T, Void> {
1521         final AtomicInteger onForkCount = new AtomicInteger();
1522         final AtomicInteger onCompleteCount = new AtomicInteger();
1523         @Override
1524         public boolean onFork(Subtask<? extends T> subtask) {
1525             onForkCount.incrementAndGet();
1526             return false;
1527         }
1528         @Override
1529         public boolean onComplete(Subtask<? extends T> subtask) {
1530             onCompleteCount.incrementAndGet();
1531             return true;
1532         }
1533         @Override
1534         public Void result() {
1535             return null;
1536         }
1537         int onForkCount() {
1538             return onForkCount.get();
1539         }
1540         int onCompleteCount() {
1541             return onCompleteCount.get();
1542         }
1543     };
1544 
1545     /**
1546      * A runtime exception for tests.
1547      */
1548     private static class FooException extends RuntimeException {
1549         FooException() { }
1550         FooException(Throwable cause) { super(cause); }
1551     }
1552 
1553     /**
1554      * Returns the current time in milliseconds.
1555      */
1556     private long millisTime() {
1557         long now = System.nanoTime();
1558         return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
1559     }
1560 
1561     /**
1562      * Check the duration of a task
1563      * @param start start time, in milliseconds
1564      * @param min minimum expected duration, in milliseconds
1565      * @param max maximum expected duration, in milliseconds
1566      * @return the duration (now - start), in milliseconds
1567      */
1568     private long expectDuration(long start, long min, long max) {
1569         long duration = millisTime() - start;
1570         assertTrue(duration >= min,
1571                 "Duration " + duration + "ms, expected >= " + min + "ms");
1572         assertTrue(duration <= max,
1573                 "Duration " + duration + "ms, expected <= " + max + "ms");
1574         return duration;
1575     }
1576 
1577     /**
1578      * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}".
1579      * {@code c} is the fully qualified class name and {@code m} is the method name.
1580      */
1581     private void interruptThreadAt(Thread target, String location) throws InterruptedException {
1582         int index = location.lastIndexOf('.');
1583         String className = location.substring(0, index);
1584         String methodName = location.substring(index + 1);
1585 
1586         boolean found = false;
1587         while (!found) {
1588             Thread.State state = target.getState();
1589             assertTrue(state != TERMINATED);
1590             if ((state == WAITING || state == TIMED_WAITING)
1591                     && contains(target.getStackTrace(), className, methodName)) {
1592                 found = true;
1593             } else {
1594                 Thread.sleep(20);
1595             }
1596         }
1597         target.interrupt();
1598     }
1599 
1600     /**
1601      * Schedules the current thread to be interrupted when it waits (timed or untimed)
1602      * at the given location.
1603      */
1604     private void scheduleInterruptAt(String location) {
1605         Thread target = Thread.currentThread();
1606         scheduler.submit(() -> {
1607             interruptThreadAt(target, location);
1608             return null;
1609         });
1610     }
1611 
1612     /**
1613      * Returns true if the given stack trace contains an element for the given class
1614      * and method name.
1615      */
1616     private boolean contains(StackTraceElement[] stack, String className, String methodName) {
1617         return Arrays.stream(stack)
1618                 .anyMatch(e -> className.equals(e.getClassName())
1619                         && methodName.equals(e.getMethodName()));
1620     }
1621 }