< prev index next >

test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java

Print this page

   1 /*
   2  * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=platform
  26  * @bug 8284199 8296779 8306647
  27  * @summary Basic tests for StructuredTaskScope
  28  * @enablePreview
  29  * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest
  30  */
  31 
  32 /*
  33  * @test id=virtual
  34  * @enablePreview
  35  * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest
  36  */
  37 
  38 import java.time.Duration;
  39 import java.io.IOException;
  40 import java.time.Instant;
  41 import java.util.Arrays;
  42 import java.util.ArrayList;
  43 import java.util.List;
  44 import java.util.Set;

  45 import java.util.concurrent.Callable;
  46 import java.util.concurrent.ConcurrentHashMap;
  47 import java.util.concurrent.CountDownLatch;
  48 import java.util.concurrent.Executors;
  49 import java.util.concurrent.ExecutionException;
  50 import java.util.concurrent.Future;

  51 import java.util.concurrent.ThreadFactory;
  52 import java.util.concurrent.TimeoutException;
  53 import java.util.concurrent.TimeUnit;
  54 import java.util.concurrent.RejectedExecutionException;
  55 import java.util.concurrent.ScheduledExecutorService;
  56 import java.util.concurrent.StructuredTaskScope;
  57 import java.util.concurrent.StructuredTaskScope.Subtask;
  58 import java.util.concurrent.StructuredTaskScope.ShutdownOnSuccess;
  59 import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure;
  60 import java.util.concurrent.StructureViolationException;
  61 import java.util.concurrent.atomic.AtomicBoolean;
  62 import java.util.concurrent.atomic.AtomicInteger;
  63 import java.util.concurrent.atomic.AtomicReference;
  64 import java.util.function.Supplier;
  65 import java.util.stream.Stream;
  66 import static java.lang.Thread.State.*;
  67 
  68 import org.junit.jupiter.api.Test;
  69 import org.junit.jupiter.api.BeforeAll;
  70 import org.junit.jupiter.api.AfterAll;
  71 import org.junit.jupiter.params.ParameterizedTest;
  72 import org.junit.jupiter.params.provider.MethodSource;
  73 import static org.junit.jupiter.api.Assertions.*;
  74 
  75 class StructuredTaskScopeTest {
  76     private static ScheduledExecutorService scheduler;
  77     private static List<ThreadFactory> threadFactories;
  78 
  79     @BeforeAll
  80     static void setup() throws Exception {
  81         scheduler = Executors.newSingleThreadScheduledExecutor();
  82 
  83         // thread factories
  84         String value = System.getProperty("threadFactory");
  85         List<ThreadFactory> list = new ArrayList<>();
  86         if (value == null || value.equals("platform"))
  87             list.add(Thread.ofPlatform().factory());
  88         if (value == null || value.equals("virtual"))
  89             list.add(Thread.ofVirtual().factory());
  90         assertTrue(list.size() > 0, "No thread factories for tests");
  91         threadFactories = list;
  92     }
  93 
  94     @AfterAll
  95     static void shutdown() {
  96         scheduler.shutdown();
  97     }
  98 
  99     private static Stream<ThreadFactory> factories() {
 100         return threadFactories.stream();
 101     }
 102 
 103     /**
 104      * Test that fork creates a new thread for each task.
 105      */
 106     @ParameterizedTest
 107     @MethodSource("factories")
 108     void testForkCreatesThread(ThreadFactory factory) throws Exception {
 109         Set<Long> tids = ConcurrentHashMap.newKeySet();
 110         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
 111             for (int i = 0; i < 100; i++) {
 112                 scope.fork(() -> {
 113                     tids.add(Thread.currentThread().threadId());
 114                     return null;
 115                 });
 116             }
 117             scope.join();
 118         }
 119         assertEquals(100, tids.size());
 120     }
 121 
 122     /**
 123      * Test that fork creates a new virtual thread for each task.
 124      */
 125     @Test
 126     void testForkCreateVirtualThread() throws Exception {
 127         Set<Thread> threads = ConcurrentHashMap.newKeySet();
 128         try (var scope = new StructuredTaskScope<Object>()) {
 129             for (int i = 0; i < 100; i++) {






 130                 scope.fork(() -> {
 131                     threads.add(Thread.currentThread());
 132                     return null;
 133                 });
 134             }
 135             scope.join();
 136         }
 137         assertEquals(100, threads.size());
 138         threads.forEach(t -> assertTrue(t.isVirtual()));
 139     }
 140 
 141     /**
 142      * Test that fork creates a new thread with the given thread factory.
 143      */
 144     @ParameterizedTest
 145     @MethodSource("factories")
 146     void testForkUsesFactory(ThreadFactory factory) throws Exception {
 147         var count = new AtomicInteger();
 148         ThreadFactory countingFactory = task -> {
 149             count.incrementAndGet();
 150             return factory.newThread(task);
 151         };
 152         try (var scope = new StructuredTaskScope<Object>(null, countingFactory)) {
 153             for (int i = 0; i < 100; i++) {
 154                 scope.fork(() -> null);

























 155             }
 156             scope.join();
 157         }
 158         assertEquals(100, count.get());

 159     }
 160 
 161     /**
 162      * Test fork is confined to threads in the scope "tree".
 163      */
 164     @ParameterizedTest
 165     @MethodSource("factories")
 166     void testForkConfined(ThreadFactory factory) throws Exception {
 167         try (var scope1 = new StructuredTaskScope<Boolean>();
 168              var scope2 = new StructuredTaskScope<Boolean>()) {
 169 
 170             // thread in scope1 cannot fork thread in scope2
 171             Subtask<Boolean> subtask1 = scope1.fork(() -> {
 172                 assertThrows(WrongThreadException.class, () -> {
 173                     scope2.fork(() -> null);
 174                 });
 175                 return true;
 176             });
 177 
 178             // thread in scope2 can fork thread in scope1
 179             Subtask<Boolean> subtask2 = scope2.fork(() -> {
 180                 scope1.fork(() -> null);
 181                 return true;
 182             });
 183 
 184             scope2.join();
 185             scope1.join();
 186 
 187             assertTrue(subtask1.get());
 188             assertTrue(subtask2.get());
 189 
 190             // random thread cannot fork
 191             try (var pool = Executors.newSingleThreadExecutor()) {
 192                 Future<Void> future = pool.submit(() -> {
 193                     assertThrows(WrongThreadException.class, () -> {
 194                         scope1.fork(() -> null);
 195                     });
 196                     assertThrows(WrongThreadException.class, () -> {
 197                         scope2.fork(() -> null);
 198                     });
 199                     return null;
 200                 });
 201                 future.get();
 202             }
 203         }
 204     }
 205 
 206     /**
 207      * Test fork after join completes.
 208      */
 209     @ParameterizedTest
 210     @MethodSource("factories")
 211     void testForkAfterJoin(ThreadFactory factory) throws Exception {
 212         try (var scope = new StructuredTaskScope<String>(null, factory)) {
 213             // round 1
 214             var subtask1 = scope.fork(() -> "foo");
 215             assertThrows(IllegalStateException.class, subtask1::get);
 216             scope.join();
 217             assertEquals("foo", subtask1.get());
 218 
 219             // round 2
 220             var subtask2 = scope.fork(() -> "bar");
 221             assertEquals("foo", subtask1.get());
 222             assertThrows(IllegalStateException.class, subtask2::get);
 223             scope.join();
 224             assertEquals("foo", subtask1.get());
 225             assertEquals("bar", subtask2.get());

 226 
 227             // round 3
 228             var subtask3 = scope.fork(() -> "baz");
 229             assertEquals("foo", subtask1.get());
 230             assertEquals("bar", subtask2.get());
 231             assertThrows(IllegalStateException.class, subtask3::get);




 232             scope.join();
 233             assertEquals("foo", subtask1.get());
 234             assertEquals("bar", subtask2.get());
 235             assertEquals("baz", subtask3.get());
 236         }
 237     }
 238 
 239     /**
 240      * Test fork after join throws.
 241      */
 242     @ParameterizedTest
 243     @MethodSource("factories")
 244     void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
 245         try (var scope = new StructuredTaskScope<String>(null, factory)) {

 246             var latch = new CountDownLatch(1);
 247             var subtask1 = scope.fork(() -> {
 248                 latch.await();
 249                 return "foo";
 250             });
 251 
 252             // join throws
 253             Thread.currentThread().interrupt();
 254             assertThrows(InterruptedException.class, scope::join);
 255 
 256             // allow subtask1 to finish
 257             latch.countDown();
 258 
 259             // continue to fork
 260             var subtask2 = scope.fork(() -> "bar");
 261             assertThrows(IllegalStateException.class, subtask1::get);
 262             assertThrows(IllegalStateException.class, subtask2::get);
 263             scope.join();
 264             assertEquals("foo", subtask1.get());
 265             assertEquals("bar", subtask2.get());
 266         }
 267     }
 268 
 269     /**
 270      * Test fork after scope is shutdown.
 271      */
 272     @ParameterizedTest
 273     @MethodSource("factories")
 274     void testForkAfterShutdown(ThreadFactory factory) throws Exception {
 275         var executed = new AtomicBoolean();
 276         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
 277             scope.shutdown();
 278             Subtask<String> subtask = scope.fork(() -> {
 279                 executed.set(true);
 280                 return null;
 281             });

















 282             scope.join();
 283             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 284             assertThrows(IllegalStateException.class, subtask::get);
 285             assertThrows(IllegalStateException.class, subtask::exception);



 286         }
 287         assertFalse(executed.get());
 288     }
 289 
 290     /**
 291      * Test fork after scope is closed.
 292      */
 293     @ParameterizedTest
 294     @MethodSource("factories")
 295     void testForkAfterClose(ThreadFactory factory) throws Exception {
 296         try (var scope = new StructuredTaskScope<Object>(null, factory)) {

 297             scope.close();
 298             assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
 299         }
 300     }
 301 
 302     /**
 303      * Test fork when the thread factory rejects creating a thread.
 304      */
 305     @Test
 306     void testForkRejectedExecutionException() throws Exception {
 307         ThreadFactory factory = task -> null;
 308         try (var scope = new StructuredTaskScope(null, factory)) {

 309             assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
 310             scope.join();
 311         }
 312     }
 313 
 314     /**
 315      * Test join with no subtasks.
 316      */
 317     @Test
 318     void testJoinWithNoSubtasks() throws Exception {
 319         try (var scope = new StructuredTaskScope()) {
 320             scope.join();
 321         }
 322     }
 323 
 324     /**
 325      * Test join with unfinished subtasks.
 326      */
 327     @ParameterizedTest
 328     @MethodSource("factories")
 329     void testJoinWithSubtasks(ThreadFactory factory) throws Exception {
 330         try (var scope = new StructuredTaskScope(null, factory)) {

 331             Subtask<String> subtask = scope.fork(() -> {
 332                 Thread.sleep(Duration.ofMillis(50));
 333                 return "foo";
 334             });
 335             scope.join();
 336             assertEquals("foo", subtask.get());
 337         }
 338     }
 339 

















 340     /**
 341      * Test join is owner confined.
 342      */
 343     @ParameterizedTest
 344     @MethodSource("factories")
 345     void testJoinConfined(ThreadFactory factory) throws Exception {
 346         try (var scope = new StructuredTaskScope<Boolean>()) {

 347 
 348             // thread in scope cannot join
 349             Subtask<Boolean> subtask = scope.fork(() -> {
 350                 assertThrows(WrongThreadException.class, () -> { scope.join(); });
 351                 return true;
 352             });
 353 
 354             scope.join();
 355 
 356             assertTrue(subtask.get());
 357 
 358             // random thread cannot join
 359             try (var pool = Executors.newSingleThreadExecutor()) {
 360                 Future<Void> future = pool.submit(() -> {
 361                     assertThrows(WrongThreadException.class, scope::join);
 362                     return null;
 363                 });
 364                 future.get();
 365             }
 366         }
 367     }
 368 
 369     /**
 370      * Test join with interrupt status set.
 371      */
 372     @ParameterizedTest
 373     @MethodSource("factories")
 374     void testInterruptJoin1(ThreadFactory factory) throws Exception {
 375         try (var scope = new StructuredTaskScope(null, factory)) {


 376             var latch = new CountDownLatch(1);
 377 
 378             Subtask<String> subtask = scope.fork(() -> {
 379                 latch.await();
 380                 return "foo";
 381             });
 382 
 383             // join should throw
 384             Thread.currentThread().interrupt();
 385             try {
 386                 scope.join();
 387                 fail("join did not throw");
 388             } catch (InterruptedException expected) {
 389                 assertFalse(Thread.interrupted());   // interrupt status should be clear
 390             } finally {
 391                 // let task continue
 392                 latch.countDown();
 393             }
 394 
 395             // join should complete
 396             scope.join();
 397             assertEquals("foo", subtask.get());
 398         }
 399     }
 400 
 401     /**
 402      * Test interrupt of thread blocked in join.
 403      */
 404     @ParameterizedTest
 405     @MethodSource("factories")
 406     void testInterruptJoin2(ThreadFactory factory) throws Exception {
 407         try (var scope = new StructuredTaskScope(null, factory)) {


 408             var latch = new CountDownLatch(1);
 409             Subtask<String> subtask = scope.fork(() -> {
 410                 latch.await();
 411                 return "foo";
 412             });
 413 
 414             // join should throw
 415             scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.join");
 416             try {
 417                 scope.join();
 418                 fail("join did not throw");
 419             } catch (InterruptedException expected) {
 420                 assertFalse(Thread.interrupted());   // interrupt status should be clear
 421             } finally {
 422                 // let task continue
 423                 latch.countDown();
 424             }
 425 
 426             // join should complete
 427             scope.join();
 428             assertEquals("foo", subtask.get());
 429         }
 430     }
 431 
 432     /**
 433      * Test join when scope is shutdown.
 434      */
 435     @ParameterizedTest
 436     @MethodSource("factories")
 437     void testJoinWithShutdown1(ThreadFactory factory) throws Exception {
 438         try (var scope = new StructuredTaskScope<String>(null, factory)) {
 439             var interrupted = new CountDownLatch(1);
 440             var finish = new CountDownLatch(1);
 441 
 442             Subtask<String> subtask = scope.fork(() -> {
 443                 try {
 444                     Thread.sleep(Duration.ofDays(1));
 445                 } catch (InterruptedException e) {
 446                     interrupted.countDown();
 447                 }
 448                 finish.await();
 449                 return "foo";
 450             });
 451 
 452             scope.shutdown();      // should interrupt task
 453 
 454             interrupted.await();
 455 
 456             scope.join();
 457 
 458             // signal task to finish
 459             finish.countDown();
 460         }
 461     }
 462 
 463     /**
 464      * Test shutdown when owner is blocked in join.
 465      */
 466     @ParameterizedTest
 467     @MethodSource("factories")
 468     void testJoinWithShutdown2(ThreadFactory factory) throws Exception {
 469         class MyScope<T> extends StructuredTaskScope<T> {
 470             MyScope(ThreadFactory factory) {
 471                 super(null, factory);
 472             }
 473             @Override
 474             protected void handleComplete(Subtask<? extends T> subtask) {
 475                 shutdown();
 476             }
 477         }
 478 
 479         try (var scope = new MyScope<String>(factory)) {
 480             Subtask<String> subtask1 = scope.fork(() -> {
 481                 Thread.sleep(Duration.ofMillis(50));
 482                 return "foo";
 483             });
 484             Subtask<String> subtask2 = scope.fork(() -> {
 485                 Thread.sleep(Duration.ofDays(1));
 486                 return "bar";
 487             });
 488 
 489             // join should wakeup when shutdown is called
 490             scope.join();
 491 
 492             // task1 should have completed successfully
 493             assertEquals(Subtask.State.SUCCESS, subtask1.state());
 494             assertEquals("foo", subtask1.get());
 495             assertThrows(IllegalStateException.class, subtask1::exception);
 496 
 497             // task2 result/exception not available
 498             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 499             assertThrows(IllegalStateException.class, subtask2::get);
 500             assertThrows(IllegalStateException.class, subtask2::exception);
 501         }
 502     }
 503 
 504     /**
 505      * Test join after scope is closed.
 506      */
 507     @Test
 508     void testJoinAfterClose() throws Exception {
 509         try (var scope = new StructuredTaskScope()) {
 510             scope.join();
 511             scope.close();
 512             assertThrows(IllegalStateException.class, () -> scope.join());
 513             assertThrows(IllegalStateException.class, () -> scope.joinUntil(Instant.now()));
 514         }
 515     }
 516 
 517     /**
 518      * Test joinUntil, subtasks finish before deadline expires.
 519      */
 520     @ParameterizedTest
 521     @MethodSource("factories")
 522     void testJoinUntil1(ThreadFactory factory) throws Exception {
 523         try (var scope = new StructuredTaskScope<String>(null, factory)) {



 524             Subtask<String> subtask = scope.fork(() -> {
 525                 try {
 526                     Thread.sleep(Duration.ofSeconds(2));
 527                 } catch (InterruptedException e) { }
 528                 return "foo";
 529             });
 530 
 531             long startMillis = millisTime();
 532             scope.joinUntil(Instant.now().plusSeconds(30));
 533             expectDuration(startMillis, /*min*/1900, /*max*/20_000);
 534             assertEquals("foo", subtask.get());
 535         }
 536     }
 537 
 538     /**
 539      * Test joinUntil, deadline expires before subtasks finish.
 540      */
 541     @ParameterizedTest
 542     @MethodSource("factories")
 543     void testJoinUntil2(ThreadFactory factory) throws Exception {
 544         try (var scope = new StructuredTaskScope<Object>(null, factory)) {




 545             Subtask<Void> subtask = scope.fork(() -> {
 546                 Thread.sleep(Duration.ofDays(1));
 547                 return null;
 548             });
 549 
 550             long startMillis = millisTime();
 551             try {
 552                 scope.joinUntil(Instant.now().plusSeconds(2));
 553             } catch (TimeoutException e) {
 554                 expectDuration(startMillis, /*min*/1900, /*max*/20_000);

 555             }



 556             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 557         }
 558     }
 559 
 560     /**
 561      * Test joinUntil many times.
 562      */
 563     @ParameterizedTest
 564     @MethodSource("factories")
 565     void testJoinUntil3(ThreadFactory factory) throws Exception {
 566         try (var scope = new StructuredTaskScope<String>(null, factory)) {
 567             Subtask<String> subtask = scope.fork(() -> {
 568                 Thread.sleep(Duration.ofDays(1));
 569                 return null;
 570             });
 571 
 572             for (int i = 0; i < 3; i++) {
 573                 try {
 574                     scope.joinUntil(Instant.now().plusMillis(50));
 575                     fail("joinUntil did not throw");
 576                 } catch (TimeoutException expected) {
 577                     assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 578                 }
 579             }
 580         }
 581     }
 582 
 583     /**
 584      * Test joinUntil with a deadline that has already expired.
 585      */
 586     @ParameterizedTest
 587     @MethodSource("factories")
 588     void testJoinUntil4(ThreadFactory factory) throws Exception {
 589         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
 590             Subtask<Void> subtask = scope.fork(() -> {
 591                 Thread.sleep(Duration.ofDays(1));
 592                 return null;
 593             });
 594 
 595             // now
 596             try {
 597                 scope.joinUntil(Instant.now());
 598                 fail("joinUntil did not throw");
 599             } catch (TimeoutException expected) {
 600                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 601             }
 602 
 603             // in the past
 604             try {
 605                 scope.joinUntil(Instant.now().minusSeconds(1));
 606                 fail("joinUntil did not throw");
 607             } catch (TimeoutException expected) {
 608                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 609             }


 610         }
 611     }
 612 
 613     /**
 614      * Test joinUntil with interrupt status set.
 615      */
 616     @ParameterizedTest
 617     @MethodSource("factories")
 618     void testInterruptJoinUntil1(ThreadFactory factory) throws Exception {
 619         try (var scope = new StructuredTaskScope<String>(null, factory)) {
 620             var latch = new CountDownLatch(1);
 621 
 622             Subtask<String> subtask = scope.fork(() -> {
 623                 latch.await();
 624                 return "foo";










 625             });

 626 
 627             // joinUntil should throw
 628             Thread.currentThread().interrupt();
 629             try {
 630                 scope.joinUntil(Instant.now().plusSeconds(30));
 631                 fail("joinUntil did not throw");
 632             } catch (InterruptedException expected) {
 633                 assertFalse(Thread.interrupted());   // interrupt status should be clear
 634             } finally {
 635                 // let task continue
 636                 latch.countDown();
 637             }
 638 
 639             // join should complete


 640             scope.join();
 641             assertEquals("foo", subtask.get());


 642         }
 643     }
 644 
 645     /**
 646      * Test interrupt of thread blocked in joinUntil.
 647      */
 648     @ParameterizedTest
 649     @MethodSource("factories")
 650     void testInterruptJoinUntil2(ThreadFactory factory) throws Exception {
 651         try (var scope = new StructuredTaskScope(null, factory)) {
 652             var latch = new CountDownLatch(1);

 653 
 654             Subtask<String> subtask = scope.fork(() -> {
 655                 latch.await();
 656                 return "foo";







 657             });
 658 
 659             // joinUntil should throw
 660             scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.joinUntil");







 661             try {
 662                 scope.joinUntil(Instant.now().plusSeconds(30));
 663                 fail("joinUntil did not throw");
 664             } catch (InterruptedException expected) {
 665                 assertFalse(Thread.interrupted());   // interrupt status should be clear
 666             } finally {
 667                 // let task continue
 668                 latch.countDown();
 669             }



 670 
 671             // join should complete
 672             scope.join();
 673             assertEquals("foo", subtask.get());




 674         }
 675     }
 676 
 677     /**
 678      * Test that shutdown interrupts unfinished subtasks.
 679      */
 680     @ParameterizedTest
 681     @MethodSource("factories")
 682     void testShutdownInterruptsThreads1(ThreadFactory factory) throws Exception {
 683         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
 684             var interrupted = new AtomicBoolean();
 685             var latch = new CountDownLatch(1);
 686             var subtask = scope.fork(() -> {
 687                 try {
 688                     Thread.sleep(Duration.ofDays(1));
 689                 } catch (InterruptedException e) {
 690                     interrupted.set(true);
 691                 } finally {
 692                     latch.countDown();
 693                 }
 694                 return null;
 695             });
 696 
 697             scope.shutdown();
 698 
 699             // wait for task to complete
 700             latch.await();
 701             assertTrue(interrupted.get());
 702 
 703             scope.join();
 704 
 705             // subtask result/exception not available
 706             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 707             assertThrows(IllegalStateException.class, subtask::get);
 708             assertThrows(IllegalStateException.class, subtask::exception);
 709         }
 710     }
 711 
 712     /**
 713      * Test that shutdown does not interrupt current thread.
 714      */
 715     @ParameterizedTest
 716     @MethodSource("factories")
 717     void testShutdownInterruptsThreads2(ThreadFactory factory) throws Exception {
 718         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
 719             var interrupted = new AtomicBoolean();
 720             var latch = new CountDownLatch(1);
 721             var subtask = scope.fork(() -> {
 722                 try {
 723                     scope.shutdown();
 724                     interrupted.set(Thread.currentThread().isInterrupted());
 725                 } finally {
 726                     latch.countDown();
 727                 }
 728                 return null;
 729             });
 730 
 731             // wait for task to complete
 732             latch.await();
 733             assertFalse(interrupted.get());

 734 
 735             scope.join();
 736         }
 737     }
 738 
 739     /**
 740      * Test shutdown wakes join.
 741      */
 742     @ParameterizedTest
 743     @MethodSource("factories")
 744     void testShutdownWakesJoin(ThreadFactory factory) throws Exception {
 745         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
 746             var latch = new CountDownLatch(1);
 747             scope.fork(() -> {
 748                 Thread.sleep(Duration.ofMillis(100));  // give time for join to block
 749                 scope.shutdown();
 750                 latch.await();
 751                 return null;
 752             });
 753 
 754             scope.join();
 755 
 756             // join woke up, allow task to complete
 757             latch.countDown();
 758         }
 759     }
 760 
 761     /**
 762      * Test shutdown after scope is closed.
 763      */
 764     @Test
 765     void testShutdownAfterClose() throws Exception {
 766         try (var scope = new StructuredTaskScope<Object>()) {
 767             scope.join();
 768             scope.close();
 769             assertThrows(IllegalStateException.class, scope::shutdown);
 770         }
 771     }
 772 
 773     /**
 774      * Test shutdown is confined to threads in the scope "tree".
 775      */
 776     @ParameterizedTest
 777     @MethodSource("factories")
 778     void testShutdownConfined(ThreadFactory factory) throws Exception {
 779         try (var scope1 = new StructuredTaskScope<Boolean>();
 780              var scope2 = new StructuredTaskScope<Boolean>()) {
 781 
 782             // thread in scope1 cannot shutdown scope2
 783             Subtask<Boolean> subtask1 = scope1.fork(() -> {
 784                 assertThrows(WrongThreadException.class, scope2::shutdown);
 785                 return true;
 786             });
 787 
 788             // wait for task in scope1 to complete to avoid racing with task in scope2
 789             while (subtask1.state() == Subtask.State.UNAVAILABLE) {
 790                 Thread.sleep(10);
 791             }
 792 
 793             // thread in scope2 shutdown scope1
 794             Subtask<Boolean> subtask2 = scope2.fork(() -> {
 795                 scope1.shutdown();
 796                 return true;
 797             });
 798 
 799             scope2.join();
 800             scope1.join();
 801 
 802             assertTrue(subtask1.get());
 803             assertTrue(subtask1.get());
 804 
 805             // random thread cannot shutdown
 806             try (var pool = Executors.newSingleThreadExecutor()) {
 807                 Future<Void> future = pool.submit(() -> {
 808                     assertThrows(WrongThreadException.class, scope1::shutdown);
 809                     assertThrows(WrongThreadException.class, scope2::shutdown);
 810                     return null;
 811                 });
 812                 future.get();
 813             }
 814         }
 815     }
 816 
 817     /**
 818      * Test isShutdown.
 819      */
 820     @Test
 821     void testIsShutdown() {
 822         try (var scope = new StructuredTaskScope<Object>()) {
 823             assertFalse(scope.isShutdown());   // before shutdown
 824             scope.shutdown();
 825             assertTrue(scope.isShutdown());    // after shutdown
 826             scope.close();
 827             assertTrue(scope.isShutdown());    // after cose
 828         }
 829     }
 830 
 831     /**
 832      * Test close without join, no subtasks forked.
 833      */
 834     @Test
 835     void testCloseWithoutJoin1() {
 836         try (var scope = new StructuredTaskScope<Object>()) {
 837             // do nothing
 838         }
 839     }
 840 
 841     /**
 842      * Test close without join, unfinished subtasks.
 843      */
 844     @ParameterizedTest
 845     @MethodSource("factories")
 846     void testCloseWithoutJoin2(ThreadFactory factory) {
 847         try (var scope = new StructuredTaskScope<String>(null, factory)) {
 848             Subtask<String> subtask = scope.fork(() -> {
 849                 Thread.sleep(Duration.ofDays(1));
 850                 return null;
 851             });
 852             assertThrows(IllegalStateException.class, scope::close);
 853 
 854             // subtask result/exception not available
 855             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 856             assertThrows(IllegalStateException.class, subtask::get);
 857             assertThrows(IllegalStateException.class, subtask::exception);
 858         }
 859     }
 860 
 861     /**
 862      * Test close without join, unfinished subtasks forked after join.
 863      */
 864     @ParameterizedTest
 865     @MethodSource("factories")
 866     void testCloseWithoutJoin3(ThreadFactory factory) throws Exception {
 867         try (var scope = new StructuredTaskScope(null, factory)) {
 868             scope.fork(() -> "foo");
 869             scope.join();
 870 
 871             Subtask<String> subtask = scope.fork(() -> {
 872                 Thread.sleep(Duration.ofDays(1));
 873                 return null;
 874             });
 875             assertThrows(IllegalStateException.class, scope::close);
 876 
 877             // subtask result/exception not available
 878             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 879             assertThrows(IllegalStateException.class, subtask::get);
 880             assertThrows(IllegalStateException.class, subtask::exception);
 881         }
 882     }
 883 
 884     /**
 885      * Test close after join throws. Close should not throw as join attempted.
 886      */
 887     @ParameterizedTest
 888     @MethodSource("factories")
 889     void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception {
 890         try (var scope = new StructuredTaskScope<Object>()) {
 891             var subtask = scope.fork(() -> {
 892                 Thread.sleep(Duration.ofDays(1));
 893                 return null;
 894             });
 895 
 896             // join throws
 897             Thread.currentThread().interrupt();
 898             assertThrows(InterruptedException.class, scope::join);
 899             assertThrows(IllegalStateException.class, subtask::get);
 900         }
 901     }
 902 
 903     /**
 904      * Test close after joinUntil throws. Close should not throw as join attempted.
 905      */
 906     @ParameterizedTest
 907     @MethodSource("factories")
 908     void testCloseAfterJoinUntilThrows(ThreadFactory factory) throws Exception {
 909         try (var scope = new StructuredTaskScope<Object>()) {
 910             var subtask = scope.fork(() -> {
 911                 Thread.sleep(Duration.ofDays(1));
 912                 return null;
 913             });
 914 
 915             // joinUntil throws
 916             assertThrows(TimeoutException.class, () -> scope.joinUntil(Instant.now()));
 917             assertThrows(IllegalStateException.class, subtask::get);
 918         }
 919     }
 920 
 921     /**
 922      * Test close is owner confined.
 923      */
 924     @ParameterizedTest
 925     @MethodSource("factories")
 926     void testCloseConfined(ThreadFactory factory) throws Exception {
 927         try (var scope = new StructuredTaskScope<Boolean>()) {
 928 
 929             // attempt to close from thread in scope
 930             Subtask<Boolean> subtask = scope.fork(() -> {
 931                 assertThrows(WrongThreadException.class, scope::close);
 932                 return true;
 933             });
 934 
 935             scope.join();
 936             assertTrue(subtask.get());
 937 
 938             // random thread cannot close scope
 939             try (var pool = Executors.newCachedThreadPool(factory)) {
 940                 Future<Boolean> future = pool.submit(() -> {
 941                     assertThrows(WrongThreadException.class, scope::close);
 942                     return null;
 943                 });
 944                 future.get();
 945             }
 946         }
 947     }
 948 
 949     /**
 950      * Test close with interrupt status set.
 951      */
 952     @ParameterizedTest
 953     @MethodSource("factories")
 954     void testInterruptClose1(ThreadFactory factory) throws Exception {
 955         try (var scope = new StructuredTaskScope<Object>(null, factory)) {





 956             var done = new AtomicBoolean();
 957             scope.fork(() -> {

 958                 try {
 959                     Thread.sleep(Duration.ofDays(1));
 960                 } catch (InterruptedException e) {
 961                     // interrupted by shutdown, expected
 962                 }
 963                 Thread.sleep(Duration.ofMillis(100)); // force close to wait
 964                 done.set(true);
 965                 return null;
 966             });







 967 
 968             scope.shutdown();
 969             scope.join();
 970 
 971             // invoke close with interrupt status set
 972             Thread.currentThread().interrupt();
 973             try {
 974                 scope.close();
 975             } finally {
 976                 assertTrue(Thread.interrupted());   // clear interrupt status
 977                 assertTrue(done.get());
 978             }
 979         }
 980     }
 981 
 982     /**
 983      * Test interrupting thread waiting in close.
 984      */
 985     @ParameterizedTest
 986     @MethodSource("factories")
 987     void testInterruptClose2(ThreadFactory factory) throws Exception {
 988         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
 989             var done = new AtomicBoolean();


 990             Thread mainThread = Thread.currentThread();




 991             scope.fork(() -> {

 992                 try {
 993                     Thread.sleep(Duration.ofDays(1));
 994                 } catch (InterruptedException e) {
 995                     // interrupted by shutdown, expected
 996                 }
 997 
 998                 // interrupt main thread when it blocks in close
 999                 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScope.close");
1000 
1001                 Thread.sleep(Duration.ofMillis(100)); // force close to wait
1002                 done.set(true);
1003                 return null;
1004             });







1005 
1006             scope.shutdown();   // interrupts task
1007             scope.join();


1008             try {
1009                 scope.close();
1010             } finally {
1011                 assertTrue(Thread.interrupted()); // clear interrupt status
1012                 assertTrue(done.get());
1013             }
1014         }
1015     }
1016 
1017     /**
1018      * Test that closing an enclosing scope closes the thread flock of a nested scope.
1019      */
1020     @Test
1021     void testCloseThrowsStructureViolation() throws Exception {
1022         try (var scope1 = new StructuredTaskScope<Object>()) {
1023             try (var scope2 = new StructuredTaskScope<Object>()) {
1024 
1025                 // join + close enclosing scope
1026                 scope1.join();
1027                 try {
1028                     scope1.close();
1029                     fail("close did not throw");
1030                 } catch (StructureViolationException expected) { }
1031 
1032                 // underlying flock should be closed, fork should return a cancelled task
1033                 var executed = new AtomicBoolean();
1034                 Subtask<Void> subtask = scope2.fork(() -> {
1035                     executed.set(true);
1036                     return null;
1037                 });
1038                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1039                 scope2.join();
1040                 assertFalse(executed.get());

1041             }
1042         }
1043     }
1044 
1045     /**
1046      * A StructuredTaskScope that collects the subtasks notified to the handleComplete method.
1047      */
1048     private static class CollectAll<T> extends StructuredTaskScope<T> {
1049         private final Set<Subtask<? extends T>> subtasks = ConcurrentHashMap.newKeySet();
1050 
1051         CollectAll(ThreadFactory factory) {
1052             super(null, factory);
1053         }
1054 
1055         @Override
1056         protected void handleComplete(Subtask<? extends T> subtask) {
1057             subtasks.add(subtask);
1058         }
1059 
1060         Set<Subtask<? extends T>> subtasks() {
1061             return subtasks;
1062         }
1063 
1064         Subtask<? extends T> find(Callable<T> task) {
1065             return subtasks.stream()
1066                     .filter(h -> task.equals(h.task()))
1067                     .findAny()
1068                     .orElseThrow();
1069         }
1070     }
1071 
1072     /**
1073      * Test that handleComplete method is invoked for tasks that complete before shutdown.
1074      */
1075     @ParameterizedTest
1076     @MethodSource("factories")
1077     void testHandleCompleteBeforeShutdown(ThreadFactory factory) throws Exception {
1078         try (var scope = new CollectAll<String>(factory)) {
1079             Callable<String> task1 = () -> "foo";
1080             Callable<String> task2 = () -> { throw new FooException(); };
1081             scope.fork(task1);
1082             scope.fork(task2);
1083             scope.join();
1084 
1085             var subtask1 = scope.find(task1);
1086             assertEquals("foo", subtask1.get());
1087 
1088             var subtask2 = scope.find(task2);
1089             assertTrue(subtask2.exception() instanceof FooException);
1090         }
1091     }
1092 
1093     /**
1094      * Test that handleComplete method is not invoked for tasks that finish after shutdown
1095      * or are forked after shutdown.
1096      */
1097     @ParameterizedTest
1098     @MethodSource("factories")
1099     void testHandleCompleteAfterShutdown(ThreadFactory factory) throws Exception {
1100         try (var scope = new CollectAll<String>(factory)) {
1101             Callable<String> task1 = () -> {
1102                 try {
1103                     Thread.sleep(Duration.ofDays(1));
1104                 } catch (InterruptedException ignore) { }
1105                 return "foo";
1106             };
1107             Callable<String> task2 = () -> {
1108                 Thread.sleep(Duration.ofDays(1));
1109                 return "bar";
1110             };
1111             Callable<String> task3 = () -> "baz";
1112 
1113             // forked before shutdown, will complete after shutdown
1114             var subtask1 = scope.fork(task1);
1115             var subtask2 = scope.fork(task2);
1116 
1117             scope.shutdown();
1118 
1119             // forked after shutdown
1120             var subtask3 = scope.fork(task3);
1121 
1122             scope.join();
1123 
1124             // handleComplete should not be called
1125             for (int i = 0; i < 3; i++) {
1126                 assertEquals(0, scope.subtasks().size());
1127                 Thread.sleep(20);
1128             }
1129 
1130             assertEquals(Subtask.State.UNAVAILABLE, subtask1.state());
1131             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1132             assertEquals(Subtask.State.UNAVAILABLE, subtask3.state());
1133         }
1134     }
1135 
1136     /**
1137      * Test that the default handleComplete throws IllegalArgumentException if called
1138      * with a running task.
1139      */
1140     @Test
1141     void testHandleCompleteThrows() throws Exception {
1142         class TestScope<T> extends StructuredTaskScope<T> {
1143             protected void handleComplete(Subtask<? extends T> subtask) {
1144                 super.handleComplete(subtask);

1145             }
1146         }
1147 
1148         try (var scope = new TestScope<String>()) {
1149             var subtask = scope.fork(() -> {
1150                 Thread.sleep(Duration.ofDays(1));
1151                 return "foo";
1152             });
1153 
1154             // running task
1155             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1156             assertThrows(IllegalArgumentException.class, () -> scope.handleComplete(subtask));
1157             scope.shutdown();
1158 
1159             // null task
1160             assertThrows(NullPointerException.class, () -> scope.handleComplete(null));
1161 
1162             scope.join();
1163         }
1164     }
1165 
1166     /**
1167      * Test ensureOwnerAndJoined.
1168      */
1169     @ParameterizedTest
1170     @MethodSource("factories")
1171     void testEnsureOwnerAndJoined(ThreadFactory factory) throws Exception {
1172         class MyScope<T> extends StructuredTaskScope<T> {
1173             MyScope(ThreadFactory factory) {
1174                 super(null, factory);
1175             }
1176             void invokeEnsureOwnerAndJoined() {
1177                 super.ensureOwnerAndJoined();

1178             }
1179         }
1180 
1181         try (var scope = new MyScope<Boolean>(factory)) {
1182             // owner thread, before join
1183             scope.fork(() -> true);
1184             assertThrows(IllegalStateException.class, () -> {
1185                 scope.invokeEnsureOwnerAndJoined();
1186             });
1187 
1188             // owner thread, after join
1189             scope.join();
1190             scope.invokeEnsureOwnerAndJoined();
1191 
1192             // thread in scope cannot invoke ensureOwnerAndJoined
1193             Subtask<Boolean> subtask = scope.fork(() -> {
1194                 assertThrows(WrongThreadException.class, () -> {
1195                     scope.invokeEnsureOwnerAndJoined();
1196                 });
1197                 return true;
1198             });
1199             scope.join();
1200             assertTrue(subtask.get());
1201 
1202             // random thread cannot invoke ensureOwnerAndJoined
1203             try (var pool = Executors.newSingleThreadExecutor()) {
1204                 Future<Void> future = pool.submit(() -> {
1205                     assertThrows(WrongThreadException.class, () -> {
1206                         scope.invokeEnsureOwnerAndJoined();
1207                     });
1208                     return null;
1209                 });
1210                 future.get();
1211             }
1212         }
1213     }
1214 
1215     /**
1216      * Test ensureOwnerAndJoined after the task scope has been closed.
1217      */
1218     @ParameterizedTest
1219     @MethodSource("factories")
1220     void testEnsureOwnerAndJoinedAfterClose(ThreadFactory factory) throws Exception {
1221         class MyScope<T> extends StructuredTaskScope<T> {
1222             MyScope(ThreadFactory factory) {
1223                 super(null, factory);
1224             }
1225             public void invokeEnsureOwnerAndJoined() {
1226                 super.ensureOwnerAndJoined();

1227             }
1228         }
1229 
1230         // ensureOwnerAndJoined after close, join invoked
1231         try (var scope = new MyScope<String>(factory)) {
1232             scope.fork(() -> "foo");



1233             scope.join();
1234             scope.close();
1235             scope.invokeEnsureOwnerAndJoined();  // should not throw
1236         }
1237 
1238         // ensureOwnerAndJoined after close, join not invoked
1239         try (var scope = new MyScope<String>(factory)) {
1240             scope.fork(() -> "foo");
1241             assertThrows(IllegalStateException.class, scope::close);
1242             scope.invokeEnsureOwnerAndJoined();  // should not throw
1243         }
1244     }
1245 
1246 
1247     /**
1248      * Test toString.
1249      */
1250     @Test
1251     void testToString() throws Exception {
1252         ThreadFactory factory = Thread.ofVirtual().factory();
1253         try (var scope = new StructuredTaskScope<Object>("duke", factory)) {
1254             // open
1255             assertTrue(scope.toString().contains("duke"));
1256 
1257             // shutdown
1258             scope.shutdown();
1259             assertTrue(scope.toString().contains("duke"));
1260 
1261             // closed
1262             scope.join();
1263             scope.close();
1264             assertTrue(scope.toString().contains("duke"));
1265         }
1266     }
1267 
1268     /**
1269      * Test Subtask with task that completes successfully.
1270      */
1271     @ParameterizedTest
1272     @MethodSource("factories")
1273     void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
1274         try (var scope = new StructuredTaskScope<String>(null, factory)) {
1275             Callable<String> task = () -> "foo";
1276             Subtask<String> subtask = scope.fork(task);
1277 
1278             // before join, owner thread
1279             assertEquals(task, subtask.task());

1280             assertThrows(IllegalStateException.class, subtask::get);
1281             assertThrows(IllegalStateException.class, subtask::exception);
1282 
1283             scope.join();
1284 
1285             // after join
1286             assertEquals(task, subtask.task());
1287             assertEquals(Subtask.State.SUCCESS, subtask.state());
1288             assertEquals("foo", subtask.get());
1289             assertThrows(IllegalStateException.class, subtask::exception);
1290         }
1291     }
1292 
1293     /**
1294      * Test Subtask with task that fails.
1295      */
1296     @ParameterizedTest
1297     @MethodSource("factories")
1298     void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
1299         try (var scope = new StructuredTaskScope<String>(null, factory)) {
1300             Callable<String> task = () -> { throw new FooException(); };
1301             Subtask<String> subtask = scope.fork(task);
1302 
1303             // before join, owner thread
1304             assertEquals(task, subtask.task());

1305             assertThrows(IllegalStateException.class, subtask::get);
1306             assertThrows(IllegalStateException.class, subtask::exception);
1307 
1308             scope.join();
1309 
1310             // after join
1311             assertEquals(task, subtask.task());
1312             assertEquals(Subtask.State.FAILED, subtask.state());
1313             assertThrows(IllegalStateException.class, subtask::get);
1314             assertTrue(subtask.exception() instanceof FooException);
1315         }
1316     }
1317 
1318     /**
1319      * Test Subtask with a task that has not completed.
1320      */
1321     @ParameterizedTest
1322     @MethodSource("factories")
1323     void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
1324         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
1325             Callable<Void> task = () -> {

1326                 Thread.sleep(Duration.ofDays(1));
1327                 return null;
1328             };
1329             Subtask<Void> subtask = scope.fork(task);
1330 
1331             // before join
1332             assertEquals(task, subtask.task());
1333             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1334             assertThrows(IllegalStateException.class, subtask::get);
1335             assertThrows(IllegalStateException.class, subtask::exception);
1336 
1337             // attempt join, join throws
1338             Thread.currentThread().interrupt();
1339             assertThrows(InterruptedException.class, scope::join);
1340 
1341             // after join
1342             assertEquals(task, subtask.task());
1343             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1344             assertThrows(IllegalStateException.class, subtask::get);
1345             assertThrows(IllegalStateException.class, subtask::exception);
1346         }
1347     }
1348 
1349     /**
1350      * Test Subtask when forked after shutdown.
1351      */
1352     @ParameterizedTest
1353     @MethodSource("factories")
1354     void testSubtaskWhenShutdown(ThreadFactory factory) throws Exception {
1355         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
1356             Callable<Void> task = () -> {
1357                 Thread.sleep(Duration.ofDays(1));
1358                 return null;
1359             };


1360 
1361             scope.shutdown();



1362 
1363             // fork after shutdown
1364             Subtask<Void> subtask = scope.fork(task);
1365             scope.join();
1366             assertEquals(task, subtask.task());

1367             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1368             assertThrows(IllegalStateException.class, subtask::get);
1369             assertThrows(IllegalStateException.class, subtask::exception);
1370         }
1371     }
1372 
1373     /**
1374      * Test Subtask::toString.
1375      */
1376     @Test
1377     void testSubtaskToString() throws Exception {
1378         try (var scope = new StructuredTaskScope<Object>()) {
1379             // success
1380             var subtask1 = scope.fork(() -> "foo");
1381             scope.join();
1382             assertTrue(subtask1.toString().contains("Completed successfully"));
1383 
1384             // failed
1385             var subtask2 = scope.fork(() -> { throw new FooException(); });





1386             scope.join();


1387             assertTrue(subtask2.toString().contains("Failed"));


1388 
1389             // not completed
1390             Callable<Void> sleepForDay = () -> {
1391                 Thread.sleep(Duration.ofDays(1));
1392                 return null;
1393             };
1394             var subtask3 = scope.fork(sleepForDay);
1395             assertTrue(subtask3.toString().contains("Unavailable"));



1396 
1397             scope.shutdown();















1398 
1399             // forked after shutdown
1400             var subtask4 = scope.fork(sleepForDay);
1401             assertTrue(subtask4.toString().contains("Unavailable"));















1402 
1403             scope.join();







1404         }
1405     }
1406 
1407     /**
1408      * Test ShutdownOnSuccess with no completed tasks.




































1409      */
1410     @Test
1411     void testShutdownOnSuccess1() throws Exception {
1412         try (var scope = new ShutdownOnSuccess<Object>()) {
1413             assertThrows(IllegalStateException.class, () -> scope.result());
1414             assertThrows(IllegalStateException.class, () -> scope.result(e -> null));



1415         }
1416     }
1417 
1418     /**
1419      * Test ShutdownOnSuccess with tasks that complete successfully.
1420      */
1421     @ParameterizedTest
1422     @MethodSource("factories")
1423     void testShutdownOnSuccess2(ThreadFactory factory) throws Exception {
1424         try (var scope = new ShutdownOnSuccess<String>(null, factory)) {

1425             scope.fork(() -> "foo");
1426             scope.join();  // ensures foo completes first
1427             scope.fork(() -> "bar");
1428             scope.join();
1429             assertEquals("foo", scope.result());
1430             assertEquals("foo", scope.result(e -> null));
1431         }
1432     }
1433 
1434     /**
1435      * Test ShutdownOnSuccess with a task that completes successfully with a null result.

1436      */
1437     @ParameterizedTest
1438     @MethodSource("factories")
1439     void testShutdownOnSuccess3(ThreadFactory factory) throws Exception {
1440         try (var scope = new ShutdownOnSuccess<Object>(null, factory)) {

1441             scope.fork(() -> null);
1442             scope.join();
1443             assertNull(scope.result());
1444             assertNull(scope.result(e -> null));
1445         }
1446     }
1447 
1448     /**
1449      * Test ShutdownOnSuccess with tasks that complete succcessfully and tasks that fail.

1450      */
1451     @ParameterizedTest
1452     @MethodSource("factories")
1453     void testShutdownOnSuccess4(ThreadFactory factory) throws Exception {
1454         try (var scope = new ShutdownOnSuccess<String>(null, factory)) {

1455             scope.fork(() -> "foo");
1456             scope.fork(() -> { throw new ArithmeticException(); });
1457             scope.join();
1458             assertEquals("foo", scope.result());
1459             assertEquals("foo", scope.result(e -> null));
1460         }
1461     }
1462 
1463     /**
1464      * Test ShutdownOnSuccess with a task that fails.
1465      */
1466     @ParameterizedTest
1467     @MethodSource("factories")
1468     void testShutdownOnSuccess5(ThreadFactory factory) throws Exception {
1469         try (var scope = new ShutdownOnSuccess<Object>(null, factory)) {
1470             scope.fork(() -> { throw new ArithmeticException(); });
1471             scope.join();
1472             Throwable ex = assertThrows(ExecutionException.class, () -> scope.result());
1473             assertTrue(ex.getCause() instanceof ArithmeticException);
1474             ex = assertThrows(FooException.class, () -> scope.result(e -> new FooException(e)));
1475             assertTrue(ex.getCause() instanceof ArithmeticException);









1476         }
1477     }
1478 
1479     /**
1480      * Test ShutdownOnSuccess methods are confined to the owner.
1481      */
1482     @ParameterizedTest
1483     @MethodSource("factories")
1484     void testShutdownOnSuccessConfined(ThreadFactory factory) throws Exception {
1485         // owner before join
1486         try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) {
1487             scope.fork(() -> { throw new FooException(); });
1488             assertThrows(IllegalStateException.class, scope::result);
1489             assertThrows(IllegalStateException.class, () -> {
1490                 scope.result(e -> new RuntimeException(e));
1491             });
1492             scope.join();
1493         }

1494 
1495         // non-owner
1496         try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) {
1497             Subtask<Boolean> subtask = scope.fork(() -> {
1498                 assertThrows(WrongThreadException.class, scope::result);
1499                 assertThrows(WrongThreadException.class, () -> {
1500                     scope.result(e -> new RuntimeException(e));
1501                 });
1502                 return true;
1503             });
1504             scope.join();
1505             assertTrue(subtask.get());




1506         }
1507     }
1508 
1509     /**
1510      * Test ShutdownOnFailure with no completed tasks.
1511      */
1512     @Test
1513     void testShutdownOnFailure1() throws Throwable {
1514         try (var scope = new ShutdownOnFailure()) {
1515             assertTrue(scope.exception().isEmpty());
1516             scope.throwIfFailed();
1517             scope.throwIfFailed(e -> new FooException(e));
1518         }
1519     }
1520 
1521     /**
1522      * Test ShutdownOnFailure with tasks that complete successfully.
1523      */
1524     @ParameterizedTest
1525     @MethodSource("factories")
1526     void testShutdownOnFailure2(ThreadFactory factory) throws Throwable {
1527         try (var scope = new ShutdownOnFailure(null, factory)) {
1528             scope.fork(() -> "foo");
1529             scope.fork(() -> "bar");
1530             scope.join();
1531 
1532             // no exception
1533             assertTrue(scope.exception().isEmpty());
1534             scope.throwIfFailed();
1535             scope.throwIfFailed(e -> new FooException(e));






1536         }
1537     }
1538 
1539     /**
1540      * Test ShutdownOnFailure with tasks that complete succcessfully and tasks that fail.
1541      */
1542     @ParameterizedTest
1543     @MethodSource("factories")
1544     void testShutdownOnFailure3(ThreadFactory factory) throws Throwable {
1545         try (var scope = new ShutdownOnFailure(null, factory)) {
1546 
1547             // one task completes successfully, the other fails
1548             scope.fork(() -> "foo");
1549             scope.fork(() -> { throw new ArithmeticException(); });
1550             scope.join();
1551 
1552             Throwable ex = scope.exception().orElse(null);
1553             assertTrue(ex instanceof ArithmeticException);



1554 
1555             ex = assertThrows(ExecutionException.class, () -> scope.throwIfFailed());
1556             assertTrue(ex.getCause() instanceof ArithmeticException);
1557 
1558             ex = assertThrows(FooException.class,
1559                               () -> scope.throwIfFailed(e -> new FooException(e)));
1560             assertTrue(ex.getCause() instanceof ArithmeticException);


1561         }
1562     }
1563 
1564     /**
1565      * Test ShutdownOnFailure methods are confined to the owner.
1566      */
1567     @ParameterizedTest
1568     @MethodSource("factories")
1569     void testShutdownOnFailureConfined(ThreadFactory factory) throws Exception {
1570         // owner before join
1571         try (var scope = new ShutdownOnFailure(null, factory)) {
1572             scope.fork(() -> "foo");
1573             assertThrows(IllegalStateException.class, scope::exception);
1574             assertThrows(IllegalStateException.class, scope::throwIfFailed);
1575             assertThrows(IllegalStateException.class, () -> {
1576                 scope.throwIfFailed(e -> new RuntimeException(e));
1577             });
1578             scope.join();
1579         }

1580 
1581         // non-owner
1582         try (var scope = new ShutdownOnFailure(null, factory)) {
1583             Subtask<Boolean> subtask = scope.fork(() -> {
1584                 assertThrows(WrongThreadException.class, scope::exception);
1585                 assertThrows(WrongThreadException.class, scope::throwIfFailed);
1586                 assertThrows(WrongThreadException.class, () -> {
1587                     scope.throwIfFailed(e -> new RuntimeException(e));
1588                 });
1589                 return true;
1590             });



























































1591             scope.join();
1592             assertTrue(subtask.get());











1593         }
1594     }
1595 
1596     /**
1597      * Test for NullPointerException.
1598      */
1599     @Test
1600     void testNulls() throws Exception {
1601         assertThrows(NullPointerException.class, () -> new StructuredTaskScope("", null));
1602         try (var scope = new StructuredTaskScope<Object>()) {
1603             assertThrows(NullPointerException.class, () -> scope.fork(null));
1604             assertThrows(NullPointerException.class, () -> scope.joinUntil(null));
1605         }



1606 
1607         assertThrows(NullPointerException.class, () -> new ShutdownOnSuccess<Object>("", null));
1608         try (var scope = new ShutdownOnSuccess<Object>()) {
1609             assertThrows(NullPointerException.class, () -> scope.fork(null));
1610             assertThrows(NullPointerException.class, () -> scope.joinUntil(null));
1611             assertThrows(NullPointerException.class, () -> scope.result(null));

1612         }
1613 
1614         assertThrows(NullPointerException.class, () -> new ShutdownOnFailure("", null));
1615         try (var scope = new ShutdownOnFailure()) {
1616             assertThrows(NullPointerException.class, () -> scope.fork(null));
1617             assertThrows(NullPointerException.class, () -> scope.joinUntil(null));
1618             assertThrows(NullPointerException.class, () -> scope.throwIfFailed(null));




















1619         }
1620     }
1621 




























1622     /**
1623      * A runtime exception for tests.
1624      */
1625     private static class FooException extends RuntimeException {
1626         FooException() { }
1627         FooException(Throwable cause) { super(cause); }
1628     }
1629 
1630     /**
1631      * Returns the current time in milliseconds.
1632      */
1633     private long millisTime() {
1634         long now = System.nanoTime();
1635         return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
1636     }
1637 
1638     /**
1639      * Check the duration of a task
1640      * @param start start time, in milliseconds
1641      * @param min minimum expected duration, in milliseconds

   1 /*
   2  * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test id=platform
  26  * @bug 8284199 8296779 8306647
  27  * @summary Basic tests for StructuredTaskScope
  28  * @enablePreview
  29  * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest
  30  */
  31 
  32 /*
  33  * @test id=virtual
  34  * @enablePreview
  35  * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest
  36  */
  37 
  38 import java.time.Duration;


  39 import java.util.Arrays;
  40 import java.util.ArrayList;
  41 import java.util.List;
  42 import java.util.Set;
  43 import java.util.NoSuchElementException;
  44 import java.util.concurrent.Callable;
  45 import java.util.concurrent.ConcurrentHashMap;
  46 import java.util.concurrent.CountDownLatch;
  47 import java.util.concurrent.Executors;
  48 import java.util.concurrent.ExecutionException;
  49 import java.util.concurrent.Future;
  50 import java.util.concurrent.LinkedTransferQueue;
  51 import java.util.concurrent.ThreadFactory;
  52 import java.util.concurrent.TimeoutException;
  53 import java.util.concurrent.TimeUnit;
  54 import java.util.concurrent.RejectedExecutionException;
  55 import java.util.concurrent.ScheduledExecutorService;
  56 import java.util.concurrent.StructuredTaskScope;
  57 import java.util.concurrent.StructuredTaskScope.*;


  58 import java.util.concurrent.StructureViolationException;
  59 import java.util.concurrent.atomic.AtomicBoolean;
  60 import java.util.concurrent.atomic.AtomicInteger;
  61 import java.util.function.Function;
  62 import java.util.function.Predicate;
  63 import java.util.stream.Stream;
  64 import static java.lang.Thread.State.*;
  65 
  66 import org.junit.jupiter.api.Test;
  67 import org.junit.jupiter.api.BeforeAll;
  68 import org.junit.jupiter.api.AfterAll;
  69 import org.junit.jupiter.params.ParameterizedTest;
  70 import org.junit.jupiter.params.provider.MethodSource;
  71 import static org.junit.jupiter.api.Assertions.*;
  72 
  73 class StructuredTaskScopeTest {
  74     private static ScheduledExecutorService scheduler;
  75     private static List<ThreadFactory> threadFactories;
  76 
  77     @BeforeAll
  78     static void setup() throws Exception {
  79         scheduler = Executors.newSingleThreadScheduledExecutor();
  80 
  81         // thread factories
  82         String value = System.getProperty("threadFactory");
  83         List<ThreadFactory> list = new ArrayList<>();
  84         if (value == null || value.equals("platform"))
  85             list.add(Thread.ofPlatform().factory());
  86         if (value == null || value.equals("virtual"))
  87             list.add(Thread.ofVirtual().factory());
  88         assertTrue(list.size() > 0, "No thread factories for tests");
  89         threadFactories = list;
  90     }
  91 
  92     @AfterAll
  93     static void shutdown() {
  94         scheduler.shutdown();
  95     }
  96 
  97     private static Stream<ThreadFactory> factories() {
  98         return threadFactories.stream();
  99     }
 100 
 101     /**
 102      * Test that fork creates virtual threads when no ThreadFactory is configured.



















 103      */
 104     @Test
 105     void testForkCreateVirtualThread() throws Exception {
 106         Set<Thread> threads = ConcurrentHashMap.newKeySet();
 107         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
 108             for (int i = 0; i < 50; i++) {
 109                 // runnable
 110                 scope.fork(() -> {
 111                     threads.add(Thread.currentThread());
 112                 });
 113 
 114                 // callable
 115                 scope.fork(() -> {
 116                     threads.add(Thread.currentThread());
 117                     return null;
 118                 });
 119             }
 120             scope.join();
 121         }
 122         assertEquals(100, threads.size());
 123         threads.forEach(t -> assertTrue(t.isVirtual()));
 124     }
 125 
 126     /**
 127      * Test that fork create threads with the configured ThreadFactory.
 128      */
 129     @ParameterizedTest
 130     @MethodSource("factories")
 131     void testForkUsesThreadFactory(ThreadFactory factory) throws Exception {
 132         // TheadFactory that keeps reference to all threads it creates
 133         class RecordingThreadFactory implements ThreadFactory {
 134             final ThreadFactory delegate;
 135             final Set<Thread> threads = ConcurrentHashMap.newKeySet();
 136             RecordingThreadFactory(ThreadFactory delegate) {
 137                 this.delegate = delegate;
 138             }
 139             @Override
 140             public Thread newThread(Runnable task) {
 141                 Thread thread = delegate.newThread(task);
 142                 threads.add(thread);
 143                 return thread;
 144             }
 145             Set<Thread> threads() {
 146                 return threads;
 147             }
 148         }
 149         var recordingThreadFactory = new RecordingThreadFactory(factory);
 150         Set<Thread> threads = ConcurrentHashMap.newKeySet();
 151         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 152                 cf -> cf.withThreadFactory(recordingThreadFactory))) {
 153 
 154             for (int i = 0; i < 50; i++) {
 155                 // runnable
 156                 scope.fork(() -> {
 157                     threads.add(Thread.currentThread());
 158                 });
 159 
 160                 // callable
 161                 scope.fork(() -> {
 162                     threads.add(Thread.currentThread());
 163                     return null;
 164                 });
 165             }
 166             scope.join();
 167         }
 168         assertEquals(100, threads.size());
 169         assertEquals(recordingThreadFactory.threads(), threads);
 170     }
 171 
 172     /**
 173      * Test fork is owner confined.
 174      */
 175     @ParameterizedTest
 176     @MethodSource("factories")
 177     void testForkConfined(ThreadFactory factory) throws Exception {
 178         try (var scope = StructuredTaskScope.open(Policy.<Boolean>ignoreAll(),
 179                 cf -> cf.withThreadFactory(factory))) {
 180 
 181             // thread in scope cannot fork
 182             Subtask<Boolean> subtask = scope.fork(() -> {
 183                 assertThrows(WrongThreadException.class, () -> {
 184                     scope.fork(() -> null);
 185                 });
 186                 return true;
 187             });
 188             scope.join();
 189             assertTrue(subtask.get());










 190 
 191             // random thread cannot fork
 192             try (var pool = Executors.newSingleThreadExecutor()) {
 193                 Future<Void> future = pool.submit(() -> {
 194                     assertThrows(WrongThreadException.class, () -> {
 195                         scope.fork(() -> null);



 196                     });
 197                     return null;
 198                 });
 199                 future.get();
 200             }
 201         }
 202     }
 203 
 204     /**
 205      * Test fork after join, no subtasks forked before join.
 206      */
 207     @ParameterizedTest
 208     @MethodSource("factories")
 209     void testForkAfterJoin1(ThreadFactory factory) throws Exception {
 210         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 211                 cf -> cf.withThreadFactory(factory))) {









 212             scope.join();
 213             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
 214         }
 215     }
 216 
 217     /**
 218      * Test fork after join, subtasks forked before join.
 219      */
 220     @ParameterizedTest
 221     @MethodSource("factories")
 222     void testForkAfterJoin2(ThreadFactory factory) throws Exception {
 223         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 224                 cf -> cf.withThreadFactory(factory))) {
 225             scope.fork(() -> "foo");
 226             scope.join();
 227             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));


 228         }
 229     }
 230 
 231     /**
 232      * Test fork after join throws.
 233      */
 234     @ParameterizedTest
 235     @MethodSource("factories")
 236     void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
 237         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 238                 cf -> cf.withThreadFactory(factory))) {
 239             var latch = new CountDownLatch(1);
 240             var subtask1 = scope.fork(() -> {
 241                 latch.await();
 242                 return "foo";
 243             });
 244 
 245             // join throws
 246             Thread.currentThread().interrupt();
 247             assertThrows(InterruptedException.class, scope::join);
 248 
 249             // allow subtask1 to finish
 250             latch.countDown();
 251 
 252             // continue to fork
 253             var subtask2 = scope.fork(() -> "bar");


 254             scope.join();
 255             assertEquals("foo", subtask1.get());
 256             assertEquals("bar", subtask2.get());
 257         }
 258     }
 259 
 260     /**
 261      * Test fork after task scope is cancelled.
 262      */
 263     @ParameterizedTest
 264     @MethodSource("factories")
 265     void testForkAfterCancel(ThreadFactory factory) throws Exception {
 266         var countingThreadFactory = new CountingThreadFactory(factory);
 267         var testPolicy = new CancelAfterOnePolicy<String>();
 268 
 269         try (var scope = StructuredTaskScope.open(testPolicy,
 270                 cf -> cf.withThreadFactory(countingThreadFactory))) {
 271 
 272             // fork subtask, the scope should be cancelled when the subtask completes
 273             var subtask1 = scope.fork(() -> "foo");
 274             while (!scope.isCancelled()) {
 275                 Thread.sleep(20);
 276             }
 277 
 278             assertEquals(1, countingThreadFactory.threadCount());
 279             assertEquals(1, testPolicy.onForkCount());
 280             assertEquals(1, testPolicy.onCompleteCount());
 281 
 282             // fork second subtask, it should not run
 283             var subtask2 = scope.fork(() -> "bar");
 284 
 285             // onFork should be invoked, newThread and onComplete should not be invoked
 286             assertEquals(1, countingThreadFactory.threadCount());
 287             assertEquals(2, testPolicy.onForkCount());
 288             assertEquals(1, testPolicy.onCompleteCount());
 289 
 290             scope.join();
 291 
 292             assertEquals(1, countingThreadFactory.threadCount());
 293             assertEquals(2, testPolicy.onForkCount());
 294             assertEquals(1, testPolicy.onCompleteCount());
 295             assertEquals("foo", subtask1.get());
 296             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
 297         }

 298     }
 299 
 300     /**
 301      * Test fork after task scope is closed.
 302      */
 303     @ParameterizedTest
 304     @MethodSource("factories")
 305     void testForkAfterClose(ThreadFactory factory) throws Exception {
 306         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 307                 cf -> cf.withThreadFactory(factory))) {
 308             scope.close();
 309             assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
 310         }
 311     }
 312 
 313     /**
 314      * Test fork when the ThreadFactory rejects creating a thread.
 315      */
 316     @Test
 317     void testForkRejectedExecutionException() throws Exception {
 318         ThreadFactory factory = task -> null;
 319         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 320                 cf -> cf.withThreadFactory(factory))) {
 321             assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));

 322         }
 323     }
 324 
 325     /**
 326      * Test join with no subtasks.
 327      */
 328     @Test
 329     void testJoinWithNoSubtasks() throws Exception {
 330         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
 331             scope.join();
 332         }
 333     }
 334 
 335     /**
 336      * Test join with a running subtask.
 337      */
 338     @ParameterizedTest
 339     @MethodSource("factories")
 340     void testJoinWithSubtasks(ThreadFactory factory) throws Exception {
 341         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 342                 cf -> cf.withThreadFactory(factory))) {
 343             Subtask<String> subtask = scope.fork(() -> {
 344                 Thread.sleep(Duration.ofMillis(100));
 345                 return "foo";
 346             });
 347             scope.join();
 348             assertEquals("foo", subtask.get());
 349         }
 350     }
 351 
 352     /**
 353      * Test repeated calls to join.
 354      */
 355     @Test
 356     void testJoinAfterJoin() throws Exception {
 357         var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz"));
 358         Policy<Object, String> policy = results::take;
 359         try (var scope = StructuredTaskScope.open(policy)) {
 360             scope.fork(() -> "foo");
 361 
 362             // each call to join should invoke Policy::result
 363             assertEquals("foo", scope.join());
 364             assertEquals("bar", scope.join());
 365             assertEquals("baz", scope.join());
 366         }
 367     }
 368 
 369     /**
 370      * Test join is owner confined.
 371      */
 372     @ParameterizedTest
 373     @MethodSource("factories")
 374     void testJoinConfined(ThreadFactory factory) throws Exception {
 375         try (var scope = StructuredTaskScope.open(Policy.<Boolean>ignoreAll(),
 376                 cf -> cf.withThreadFactory(factory))) {
 377 
 378             // thread in scope cannot join
 379             Subtask<Boolean> subtask = scope.fork(() -> {
 380                 assertThrows(WrongThreadException.class, () -> { scope.join(); });
 381                 return true;
 382             });
 383 
 384             scope.join();
 385 
 386             assertTrue(subtask.get());
 387 
 388             // random thread cannot join
 389             try (var pool = Executors.newSingleThreadExecutor()) {
 390                 Future<Void> future = pool.submit(() -> {
 391                     assertThrows(WrongThreadException.class, scope::join);
 392                     return null;
 393                 });
 394                 future.get();
 395             }
 396         }
 397     }
 398 
 399     /**
 400      * Test join with interrupt status set.
 401      */
 402     @ParameterizedTest
 403     @MethodSource("factories")
 404     void testInterruptJoin1(ThreadFactory factory) throws Exception {
 405         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 406                 cf -> cf.withThreadFactory(factory))) {
 407 
 408             var latch = new CountDownLatch(1);
 409 
 410             Subtask<String> subtask = scope.fork(() -> {
 411                 latch.await();
 412                 return "foo";
 413             });
 414 
 415             // join should throw
 416             Thread.currentThread().interrupt();
 417             try {
 418                 scope.join();
 419                 fail("join did not throw");
 420             } catch (InterruptedException expected) {
 421                 assertFalse(Thread.interrupted());   // interrupt status should be cleared
 422             } finally {
 423                 // let task continue
 424                 latch.countDown();
 425             }
 426 
 427             // join should complete
 428             scope.join();
 429             assertEquals("foo", subtask.get());
 430         }
 431     }
 432 
 433     /**
 434      * Test interrupt of thread blocked in join.
 435      */
 436     @ParameterizedTest
 437     @MethodSource("factories")
 438     void testInterruptJoin2(ThreadFactory factory) throws Exception {
 439         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 440                 cf -> cf.withThreadFactory(factory))) {
 441 
 442             var latch = new CountDownLatch(1);
 443             Subtask<String> subtask = scope.fork(() -> {
 444                 latch.await();
 445                 return "foo";
 446             });
 447 
 448             // join should throw
 449             scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.join");
 450             try {
 451                 scope.join();
 452                 fail("join did not throw");
 453             } catch (InterruptedException expected) {
 454                 assertFalse(Thread.interrupted());   // interrupt status should be clear
 455             } finally {
 456                 // let task continue
 457                 latch.countDown();
 458             }
 459 
 460             // join should complete
 461             scope.join();
 462             assertEquals("foo", subtask.get());
 463         }
 464     }
 465 
 466     /**
 467      * Test join when scope is cancelled.
 468      */
 469     @ParameterizedTest
 470     @MethodSource("factories")
 471     void testJoinWhenCancelled(ThreadFactory factory) throws Exception {
 472         var countingThreadFactory = new CountingThreadFactory(factory);
 473         var testPolicy = new CancelAfterOnePolicy<String>();













 474 
 475         try (var scope = StructuredTaskScope.open(testPolicy,
 476                     cf -> cf.withThreadFactory(countingThreadFactory))) {






 477 
 478             // fork subtask, the scope should be cancelled when the subtask completes
 479             var subtask1 = scope.fork(() -> "foo");
 480             while (!scope.isCancelled()) {
 481                 Thread.sleep(20);









 482             }

 483 
 484             // fork second subtask, it should not run
 485             var subtask2 = scope.fork(() -> {




 486                 Thread.sleep(Duration.ofDays(1));
 487                 return "bar";
 488             });
 489 

 490             scope.join();
 491 


 492             assertEquals("foo", subtask1.get());



 493             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());


 494         }
 495     }
 496 
 497     /**
 498      * Test join after scope is closed.
 499      */
 500     @Test
 501     void testJoinAfterClose() throws Exception {
 502         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {

 503             scope.close();
 504             assertThrows(IllegalStateException.class, () -> scope.join());

 505         }
 506     }
 507 
 508     /**
 509      * Test join with timeout, subtasks finish before timeout expires.
 510      */
 511     @ParameterizedTest
 512     @MethodSource("factories")
 513     void testJoinWithTimeout1(ThreadFactory factory) throws Exception {
 514         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 515                 cf -> cf.withThreadFactory(factory)
 516                         .withTimeout(Duration.ofDays(1)))) {
 517 
 518             Subtask<String> subtask = scope.fork(() -> {
 519                 Thread.sleep(Duration.ofSeconds(1));


 520                 return "foo";
 521             });
 522 
 523             scope.join();
 524 
 525             assertFalse(scope.isCancelled());
 526             assertEquals("foo", subtask.get());
 527         }
 528     }
 529 
 530     /**
 531      * Test join with timeout, timeout expires before subtasks finish.
 532      */
 533     @ParameterizedTest
 534     @MethodSource("factories")
 535     void testJoinWithTimeout2(ThreadFactory factory) throws Exception {
 536         long startMillis = millisTime();
 537         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 538                 cf -> cf.withThreadFactory(factory)
 539                         .withTimeout(Duration.ofSeconds(2)))) {
 540 
 541             Subtask<Void> subtask = scope.fork(() -> {
 542                 Thread.sleep(Duration.ofDays(1));
 543                 return null;
 544             });
 545 

 546             try {
 547                 scope.join();
 548                 fail();
 549             } catch (ExecutionException e) {
 550                 assertTrue(e.getCause() instanceof TimeoutException);
 551             }
 552             expectDuration(startMillis, /*min*/1900, /*max*/20_000);
 553 
 554             assertTrue(scope.isCancelled());
 555             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 556         }
 557     }
 558 
 559     /**
 560      * Test join with timeout that has already expired.
 561      */
 562     @ParameterizedTest
 563     @MethodSource("factories")
 564     void testJoinWithTimeout3(ThreadFactory factory) throws Exception {
 565         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 566                 cf -> cf.withThreadFactory(factory)
 567                         .withTimeout(Duration.ofSeconds(-1)))) {













 568 







 569             Subtask<Void> subtask = scope.fork(() -> {
 570                 Thread.sleep(Duration.ofDays(1));
 571                 return null;
 572             });
 573 









 574             try {
 575                 scope.join();
 576                 fail();
 577             } catch (ExecutionException e) {
 578                 assertTrue(e.getCause() instanceof TimeoutException);
 579             }
 580             assertTrue(scope.isCancelled());
 581             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 582         }
 583     }
 584 
 585     /**
 586      * Test that cancelling exceutions interrupts unfinished threads.
 587      */
 588     @ParameterizedTest
 589     @MethodSource("factories")
 590     void testCancelInterruptsThreads(ThreadFactory factory) throws Exception {
 591         var testPolicy = new CancelAfterOnePolicy<String>();

 592 
 593         try (var scope = StructuredTaskScope.open(testPolicy,
 594                 cf -> cf.withThreadFactory(factory))) {
 595 
 596             // fork subtask1 that runs for a long time
 597             var started = new CountDownLatch(1);
 598             var interrupted = new CountDownLatch(1);
 599             var subtask1 = scope.fork(() -> {
 600                 started.countDown();
 601                 try {
 602                     Thread.sleep(Duration.ofDays(1));
 603                 } catch (InterruptedException e) {
 604                     interrupted.countDown();
 605                 }
 606             });
 607             started.await();
 608 
 609             // fork subtask2, the scope should be cancelled when the subtask completes
 610             var subtask2 = scope.fork(() -> "bar");
 611             while (!scope.isCancelled()) {
 612                 Thread.sleep(20);






 613             }
 614 
 615             // subtask1 should be interrupted
 616             interrupted.await();
 617 
 618             scope.join();
 619 
 620             assertEquals(Subtask.State.UNAVAILABLE, subtask1.state());
 621             assertEquals("bar", subtask2.get());
 622         }
 623     }
 624 
 625     /**
 626      * Test that timeout interrupts unfinished threads.
 627      */
 628     @ParameterizedTest
 629     @MethodSource("factories")
 630     void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception {
 631         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 632                 cf -> cf.withThreadFactory(factory)
 633                         .withTimeout(Duration.ofSeconds(2)))) {
 634 
 635             var started = new AtomicBoolean();
 636             var interrupted = new CountDownLatch(1);
 637             Subtask<Void> subtask = scope.fork(() -> {
 638                 started.set(true);
 639                 try {
 640                     Thread.sleep(Duration.ofDays(1));
 641                 } catch (InterruptedException e) {
 642                     interrupted.countDown();
 643                 }
 644                 return null;
 645             });
 646 
 647             while (!scope.isCancelled()) {
 648                 Thread.sleep(50);
 649             }
 650 
 651             // if subtask started then it should be interrupted
 652             if (started.get()) {
 653                 interrupted.await();
 654             }
 655 
 656             try {
 657                 scope.join();
 658                 fail();
 659             } catch (ExecutionException e) {
 660                 assertTrue(e.getCause() instanceof TimeoutException);



 661             }
 662             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 663         }
 664     }
 665 
 666     /**
 667      * Test close without join, no subtasks forked.
 668      */
 669     @Test
 670     void testCloseWithoutJoin1() {
 671         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
 672             // do nothing
 673         }
 674     }
 675 
 676     /**
 677      * Test close without join, unfinished subtasks.
 678      */
 679     @ParameterizedTest
 680     @MethodSource("factories")
 681     void testCloseWithoutJoin2(ThreadFactory factory) {
 682         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 683                 cf -> cf.withThreadFactory(factory))) {
 684             Subtask<String> subtask = scope.fork(() -> {
 685                 Thread.sleep(Duration.ofDays(1));







 686                 return null;
 687             });
 688             assertThrows(IllegalStateException.class, scope::close);







 689 
 690             // subtask result/exception not available
 691             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 692             assertThrows(IllegalStateException.class, subtask::get);
 693             assertThrows(IllegalStateException.class, subtask::exception);
 694         }
 695     }
 696 
 697     /**
 698      * Test close after join throws. Close should not throw as join attempted.
 699      */
 700     @ParameterizedTest
 701     @MethodSource("factories")
 702     void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception {
 703         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 704                 cf -> cf.withThreadFactory(factory))) {

 705             var subtask = scope.fork(() -> {
 706                 Thread.sleep(Duration.ofDays(1));





 707                 return null;
 708             });
 709 
 710             // join throws
 711             Thread.currentThread().interrupt();
 712             assertThrows(InterruptedException.class, scope::join);
 713             assertThrows(IllegalStateException.class, subtask::get);
 714 
 715         }  // close should not throw

 716     }
 717 
 718     /**
 719      * Test close is owner confined.
 720      */
 721     @ParameterizedTest
 722     @MethodSource("factories")
 723     void testCloseConfined(ThreadFactory factory) throws Exception {
 724         try (var scope = StructuredTaskScope.open(Policy.<Boolean>ignoreAll(),
 725                 cf -> cf.withThreadFactory(factory))) {
 726 
 727             // attempt to close from thread in scope
 728             Subtask<Boolean> subtask = scope.fork(() -> {
 729                 assertThrows(WrongThreadException.class, scope::close);
 730                 return true;
 731             });
 732 
 733             scope.join();
 734             assertTrue(subtask.get());





















































































































































































 735 
 736             // random thread cannot close scope
 737             try (var pool = Executors.newCachedThreadPool(factory)) {
 738                 Future<Boolean> future = pool.submit(() -> {
 739                     assertThrows(WrongThreadException.class, scope::close);
 740                     return null;
 741                 });
 742                 future.get();
 743             }
 744         }
 745     }
 746 
 747     /**
 748      * Test close with interrupt status set.
 749      */
 750     @ParameterizedTest
 751     @MethodSource("factories")
 752     void testInterruptClose1(ThreadFactory factory) throws Exception {
 753         var testPolicy = new CancelAfterOnePolicy<String>();
 754         try (var scope = StructuredTaskScope.open(testPolicy,
 755                 cf -> cf.withThreadFactory(factory))) {
 756 
 757             // fork first subtask, a straggler as it continues after being interrupted
 758             var started = new CountDownLatch(1);
 759             var done = new AtomicBoolean();
 760             scope.fork(() -> {
 761                 started.countDown();
 762                 try {
 763                     Thread.sleep(Duration.ofDays(1));
 764                 } catch (InterruptedException e) {
 765                     // interrupted by cancel, expected
 766                 }
 767                 Thread.sleep(Duration.ofMillis(100)); // force close to wait
 768                 done.set(true);
 769                 return null;
 770             });
 771             started.await();
 772 
 773             // fork second subtask, the scope should be cancelled when this subtask completes
 774             scope.fork(() -> "bar");
 775             while (!scope.isCancelled()) {
 776                 Thread.sleep(20);
 777             }
 778 

 779             scope.join();
 780 
 781             // invoke close with interrupt status set
 782             Thread.currentThread().interrupt();
 783             try {
 784                 scope.close();
 785             } finally {
 786                 assertTrue(Thread.interrupted());   // clear interrupt status
 787                 assertTrue(done.get());
 788             }
 789         }
 790     }
 791 
 792     /**
 793      * Test interrupting thread waiting in close.
 794      */
 795     @ParameterizedTest
 796     @MethodSource("factories")
 797     void testInterruptClose2(ThreadFactory factory) throws Exception {
 798         var testPolicy = new CancelAfterOnePolicy<String>();
 799         try (var scope = StructuredTaskScope.open(testPolicy,
 800                 cf -> cf.withThreadFactory(factory))) {
 801 
 802             Thread mainThread = Thread.currentThread();
 803 
 804             // fork first subtask, a straggler as it continues after being interrupted
 805             var started = new CountDownLatch(1);
 806             var done = new AtomicBoolean();
 807             scope.fork(() -> {
 808                 started.countDown();
 809                 try {
 810                     Thread.sleep(Duration.ofDays(1));
 811                 } catch (InterruptedException e) {
 812                     // interrupted by cancel, expected
 813                 }
 814 
 815                 // interrupt main thread when it blocks in close
 816                 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScope.close");
 817 
 818                 Thread.sleep(Duration.ofMillis(100)); // force close to wait
 819                 done.set(true);
 820                 return null;
 821             });
 822             started.await();
 823 
 824             // fork second subtask, the scope should be cancelled when this subtask completes
 825             scope.fork(() -> "bar");
 826             while (!scope.isCancelled()) {
 827                 Thread.sleep(20);
 828             }
 829 

 830             scope.join();
 831 
 832             // main thread will be interrupted while blocked in close
 833             try {
 834                 scope.close();
 835             } finally {
 836                 assertTrue(Thread.interrupted());   // clear interrupt status
 837                 assertTrue(done.get());
 838             }
 839         }
 840     }
 841 
 842     /**
 843      * Test that closing an enclosing scope closes the thread flock of a nested scope.
 844      */
 845     @Test
 846     void testCloseThrowsStructureViolation() throws Exception {
 847         try (var scope1 = StructuredTaskScope.open(Policy.ignoreAll())) {
 848             try (var scope2 = StructuredTaskScope.open(Policy.ignoreAll())) {
 849 
 850                 // close enclosing scope

 851                 try {
 852                     scope1.close();
 853                     fail("close did not throw");
 854                 } catch (StructureViolationException expected) { }
 855 
 856                 // underlying flock should be closed
 857                 var executed = new AtomicBoolean();
 858                 Subtask<?> subtask = scope2.fork(() -> executed.set(true));



 859                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 860                 scope2.join();
 861                 assertFalse(executed.get());
 862                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
 863             }
 864         }
 865     }
 866 
 867     /**
 868      * Test that isCancelled returns true after close.

















































 869      */
 870     @Test
 871     void testIsCancelledAfterClose() throws Exception {
 872         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
 873             assertFalse(scope.isCancelled());
 874             scope.close();
 875             assertTrue(scope.isCancelled());






























 876         }
 877     }
 878 
 879     /**
 880      * Test Policy.onFork throwing exception.

 881      */
 882     @Test
 883     void testOnForkThrows() throws Exception {
 884         var policy = new Policy<String, Void>() {
 885             @Override
 886             public boolean onFork(Subtask<? extends String> subtask) {
 887                 throw new FooException();
 888             }
 889             @Override
 890             public Void result() {
 891                 return null;
 892             }
 893         };
 894         try (var scope = StructuredTaskScope.open(policy)) {
 895             assertThrows(FooException.class, () -> scope.fork(() -> "foo"));










 896         }
 897     }
 898 
 899     /**
 900      * Test Policy.onFork returning true to cancel execution.
 901      */
 902     @Test
 903     void testOnForkCancelsExecution() throws Exception {
 904         var policy = new Policy<String, Void>() {
 905             @Override
 906             public boolean onFork(Subtask<? extends String> subtask) {
 907                 return true;
 908             }
 909             @Override
 910             public Void result() {
 911                 return null;
 912             }
 913         };
 914         try (var scope = StructuredTaskScope.open(policy)) {
 915             assertFalse(scope.isCancelled());
 916             scope.fork(() -> "foo");
 917             assertTrue(scope.isCancelled());















 918             scope.join();












 919         }
 920     }
 921 
 922     /**
 923      * Test Policy.onComplete returning true to cancel execution.
 924      */
 925     @Test
 926     void testOnCompleteCancelsExecution() throws Exception {
 927         var policy = new Policy<String, Void>() {
 928             @Override
 929             public boolean onComplete(Subtask<? extends String> subtask) {
 930                 return true;
 931             }
 932             @Override
 933             public Void result() {
 934                 return null;
 935             }
 936         };
 937         try (var scope = StructuredTaskScope.open(policy)) {
 938             assertFalse(scope.isCancelled());

 939             scope.fork(() -> "foo");
 940             while (!scope.isCancelled()) {
 941                 Thread.sleep(10);
 942             }
 943             scope.join();









 944         }
 945     }
 946 

 947     /**
 948      * Test toString.
 949      */
 950     @Test
 951     void testToString() throws Exception {
 952         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
 953                 cf -> cf.withName("duke"))) {


 954 
 955             // open

 956             assertTrue(scope.toString().contains("duke"));
 957 
 958             // closed

 959             scope.close();
 960             assertTrue(scope.toString().contains("duke"));
 961         }
 962     }
 963 
 964     /**
 965      * Test Subtask with task that completes successfully.
 966      */
 967     @ParameterizedTest
 968     @MethodSource("factories")
 969     void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
 970         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreAll(),
 971                 cf -> cf.withThreadFactory(factory))) {

 972 
 973             Subtask<String> subtask = scope.fork(() -> "foo");
 974 
 975             // before join
 976             assertThrows(IllegalStateException.class, subtask::get);
 977             assertThrows(IllegalStateException.class, subtask::exception);
 978 
 979             scope.join();
 980 
 981             // after join

 982             assertEquals(Subtask.State.SUCCESS, subtask.state());
 983             assertEquals("foo", subtask.get());
 984             assertThrows(IllegalStateException.class, subtask::exception);
 985         }
 986     }
 987 
 988     /**
 989      * Test Subtask with task that fails.
 990      */
 991     @ParameterizedTest
 992     @MethodSource("factories")
 993     void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
 994         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreAll(),
 995                 cf -> cf.withThreadFactory(factory))) {

 996 
 997             Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
 998 
 999             // before join
1000             assertThrows(IllegalStateException.class, subtask::get);
1001             assertThrows(IllegalStateException.class, subtask::exception);
1002 
1003             scope.join();
1004 
1005             // after join

1006             assertEquals(Subtask.State.FAILED, subtask.state());
1007             assertThrows(IllegalStateException.class, subtask::get);
1008             assertTrue(subtask.exception() instanceof FooException);
1009         }
1010     }
1011 
1012     /**
1013      * Test Subtask with a task that has not completed.
1014      */
1015     @ParameterizedTest
1016     @MethodSource("factories")
1017     void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
1018         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
1019                 cf -> cf.withThreadFactory(factory))) {
1020             Subtask<Void> subtask = scope.fork(() -> {
1021                 Thread.sleep(Duration.ofDays(1));
1022                 return null;
1023             });

1024 
1025             // before join

1026             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1027             assertThrows(IllegalStateException.class, subtask::get);
1028             assertThrows(IllegalStateException.class, subtask::exception);
1029 
1030             // attempt join, join throws
1031             Thread.currentThread().interrupt();
1032             assertThrows(InterruptedException.class, scope::join);
1033 
1034             // after join

1035             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1036             assertThrows(IllegalStateException.class, subtask::get);
1037             assertThrows(IllegalStateException.class, subtask::exception);
1038         }
1039     }
1040 
1041     /**
1042      * Test Subtask forked after execution cancelled.
1043      */
1044     @ParameterizedTest
1045     @MethodSource("factories")
1046     void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception {
1047         try (var scope = StructuredTaskScope.open(new CancelAfterOnePolicy<String>())) {
1048             scope.fork(() -> "foo");
1049             while (!scope.isCancelled()) {
1050                 Thread.sleep(20);
1051             }
1052 
1053             var subtask = scope.fork(() -> "foo");
1054 
1055             // before join
1056             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1057             assertThrows(IllegalStateException.class, subtask::get);
1058             assertThrows(IllegalStateException.class, subtask::exception);
1059 


1060             scope.join();
1061 
1062             // after join
1063             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
1064             assertThrows(IllegalStateException.class, subtask::get);
1065             assertThrows(IllegalStateException.class, subtask::exception);
1066         }
1067     }
1068 
1069     /**
1070      * Test Subtask::toString.
1071      */
1072     @Test
1073     void testSubtaskToString() throws Exception {
1074         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
1075             var latch = new CountDownLatch(1);
1076             var subtask1 = scope.fork(() -> {
1077                 latch.await();
1078                 return "foo";
1079             });

1080             var subtask2 = scope.fork(() -> { throw new FooException(); });
1081 
1082             // subtask1 result is unavailable
1083             assertTrue(subtask1.toString().contains("Unavailable"));
1084             latch.countDown();
1085 
1086             scope.join();
1087 
1088             assertTrue(subtask1.toString().contains("Completed successfully"));
1089             assertTrue(subtask2.toString().contains("Failed"));
1090         }
1091     }
1092 
1093     /**
1094      * Test Policy.ignoreSuccessfulOrThrow() with no subtasks.
1095      */
1096     @Test
1097     void testThrowIfFailed1() throws Throwable {
1098         try (var scope = StructuredTaskScope.open(Policy.ignoreSuccessfulOrThrow())) {
1099             var result = scope.join();
1100             assertNull(result);
1101         }
1102     }
1103 
1104     /**
1105      * Test Policy.ignoreSuccessfulOrThrow() with subtasks that complete successfully.
1106      */
1107     @ParameterizedTest
1108     @MethodSource("factories")
1109     void testThrowIfFailed2(ThreadFactory factory) throws Throwable {
1110         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreSuccessfulOrThrow(),
1111                 cf -> cf.withThreadFactory(factory))) {
1112             var subtask1 = scope.fork(() -> "foo");
1113             var subtask2 = scope.fork(() -> "bar");
1114             var result = scope.join();
1115             assertNull(result);
1116             assertEquals("foo", subtask1.get());
1117             assertEquals("bar", subtask2.get());
1118         }
1119     }
1120 
1121     /**
1122      * Test Policy.ignoreSuccessfulOrThrow() with a subtask that complete successfully and
1123      * a subtask that fails.
1124      */
1125     @ParameterizedTest
1126     @MethodSource("factories")
1127     void testThrowIfFailed3(ThreadFactory factory) throws Throwable {
1128         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreSuccessfulOrThrow(),
1129                 cf -> cf.withThreadFactory(factory))) {
1130             scope.fork(() -> "foo");
1131             scope.fork(() -> { throw new FooException(); });
1132             try {
1133                 scope.join();
1134             } catch (ExecutionException e) {
1135                 assertTrue(e.getCause() instanceof FooException);
1136             }
1137         }
1138     }
1139 
1140     /**
1141      * Test Policy.allSuccessfulOrThrow() with no subtasks.
1142      */
1143     @Test
1144     void testAllSuccessful1() throws Throwable {
1145         try (var scope = StructuredTaskScope.open(Policy.allSuccessfulOrThrow())) {
1146             var subtasks = scope.join().toList();
1147             assertTrue(subtasks.isEmpty());
1148         }
1149     }
1150 
1151     /**
1152      * Test Policy.allSuccessfulOrThrow() with subtasks that complete successfully.
1153      */
1154     @ParameterizedTest
1155     @MethodSource("factories")
1156     void testAllSuccessful2(ThreadFactory factory) throws Throwable {
1157         try (var scope = StructuredTaskScope.open(Policy.<String>allSuccessfulOrThrow(),
1158                 cf -> cf.withThreadFactory(factory))) {
1159             var subtask1 = scope.fork(() -> "foo");
1160             var subtask2 = scope.fork(() -> "bar");
1161             var subtasks = scope.join().toList();
1162             assertEquals(List.of(subtask1, subtask2), subtasks);
1163             assertEquals("foo", subtask1.get());
1164             assertEquals("bar", subtask2.get());
1165         }
1166     }
1167 
1168     /**
1169      * Test Policy.allSuccessfulOrThrow() with a subtask that complete successfully and
1170      * a subtask that fails.
1171      */
1172     @ParameterizedTest
1173     @MethodSource("factories")
1174     void testAllSuccessful3(ThreadFactory factory) throws Throwable {
1175         try (var scope = StructuredTaskScope.open(Policy.<String>allSuccessfulOrThrow(),
1176                 cf -> cf.withThreadFactory(factory))) {
1177             scope.fork(() -> "foo");
1178             scope.fork(() -> { throw new FooException(); });
1179             try {
1180                 scope.join();
1181             } catch (ExecutionException e) {
1182                 assertTrue(e.getCause() instanceof FooException);
1183             }
1184         }
1185     }
1186 
1187     /**
1188      * Test Policy.anySuccessfulResultOrThrow() with no subtasks.
1189      */
1190     @Test
1191     void testAnySuccessful1() throws Exception {
1192         try (var scope = StructuredTaskScope.open(Policy.anySuccessfulResultOrThrow())) {
1193             try {
1194                 scope.join();
1195             } catch (ExecutionException e) {
1196                 assertTrue(e.getCause() instanceof NoSuchElementException);
1197             }
1198         }
1199     }
1200 
1201     /**
1202      * Test Policy.anySuccessfulResultOrThrow() with a subtask that completes successfully.
1203      */
1204     @ParameterizedTest
1205     @MethodSource("factories")
1206     void testAnySuccessful2(ThreadFactory factory) throws Exception {
1207         try (var scope = StructuredTaskScope.open(Policy.<String>anySuccessfulResultOrThrow(),
1208                 cf -> cf.withThreadFactory(factory))) {
1209             scope.fork(() -> "foo");
1210             String result = scope.join();
1211             assertEquals("foo", result);



1212         }
1213     }
1214 
1215     /**
1216      * Test Policy.anySuccessfulResultOrThrow() with a subtask that completes successfully
1217      * with a null result.
1218      */
1219     @ParameterizedTest
1220     @MethodSource("factories")
1221     void testAnySuccessful3(ThreadFactory factory) throws Exception {
1222         try (var scope = StructuredTaskScope.open(Policy.<String>anySuccessfulResultOrThrow(),
1223                 cf -> cf.withThreadFactory(factory))) {
1224             scope.fork(() -> null);
1225             String result = scope.join();
1226             assertNull(result);

1227         }
1228     }
1229 
1230     /**
1231      * Test Policy.anySuccessfulResultOrThrow() with a subtask that complete succcessfully and
1232      * a subtask that fails.
1233      */
1234     @ParameterizedTest
1235     @MethodSource("factories")
1236     void testAnySuccessful4(ThreadFactory factory) throws Exception {
1237         try (var scope = StructuredTaskScope.open(Policy.<String>anySuccessfulResultOrThrow(),
1238                 cf -> cf.withThreadFactory(factory))) {
1239             scope.fork(() -> "foo");
1240             scope.fork(() -> { throw new FooException(); });
1241             String first = scope.join();
1242             assertEquals("foo", first);

1243         }
1244     }
1245 
1246     /**
1247      * Test Policy.anySuccessfulResultOrThrow() with a subtask that fails.
1248      */
1249     @ParameterizedTest
1250     @MethodSource("factories")
1251     void testAnySuccessful5(ThreadFactory factory) throws Exception {
1252         try (var scope = StructuredTaskScope.open(Policy.anySuccessfulResultOrThrow(),
1253                 cf -> cf.withThreadFactory(factory))) {
1254             scope.fork(() -> { throw new FooException(); });
1255             Throwable ex = assertThrows(ExecutionException.class, scope::join);
1256             assertTrue(ex.getCause() instanceof FooException);
1257         }
1258     }
1259 
1260     /**
1261      * Test Policy.ignoreAll() with no subtasks.
1262      */
1263     @Test
1264     void testIgnoreFailures1() throws Throwable {
1265         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
1266             var result = scope.join();
1267             assertNull(result);
1268         }
1269     }
1270 
1271     /**
1272      * Test Policy.ignoreAll() with subtasks that complete successfully.
1273      */
1274     @ParameterizedTest
1275     @MethodSource("factories")
1276     void testIgnoreFailures2(ThreadFactory factory) throws Throwable {
1277         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreAll(),
1278                 cf -> cf.withThreadFactory(factory))) {
1279             var subtask1 = scope.fork(() -> "foo");
1280             var subtask2 = scope.fork(() -> "bar");
1281             var result = scope.join();
1282             assertNull(result);
1283             assertEquals("foo", subtask1.get());
1284             assertEquals("bar", subtask2.get());
1285         }
1286     }
1287 
1288     /**
1289      * Test Policy.ignoreAll() with a subtask that complete successfully and
1290      * a subtask that fails.
1291      */
1292     @ParameterizedTest
1293     @MethodSource("factories")
1294     void testIgnoreFailures3(ThreadFactory factory) throws Throwable {
1295         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreAll(),
1296                 cf -> cf.withThreadFactory(factory))) {
1297             var subtask1 = scope.fork(() -> "foo");
1298             var subtask2 = scope.fork(() -> { throw new FooException(); });
1299             var result = scope.join();
1300             assertNull(result);
1301             assertEquals("foo", subtask1.get());
1302             assertTrue(subtask2.exception() instanceof FooException);
1303         }
1304     }
1305 
1306     /**
1307      * Test Policy.all(Predicate) with no subtasks.
1308      */
1309     @Test
1310     void testAllForked1() throws Throwable {
1311         try (var scope = StructuredTaskScope.open(Policy.all(s -> false))) {
1312             var subtasks = scope.join();
1313             assertEquals(0, subtasks.count());

1314         }
1315     }
1316 
1317     /**
1318      * Test Policy.all(Predicate) with no cancellation.
1319      */
1320     @ParameterizedTest
1321     @MethodSource("factories")
1322     void testAllForked2(ThreadFactory factory) throws Exception {
1323         try (var scope = StructuredTaskScope.open(Policy.<String>all(s -> false),
1324                 cf -> cf.withThreadFactory(factory))) {


1325 
1326             var subtask1 = scope.fork(() -> "foo");
1327             var subtask2 = scope.fork(() -> { throw new FooException(); });
1328 
1329             var subtasks = scope.join().toList();
1330             assertEquals(2, subtasks.size());
1331 
1332             assertTrue(subtasks.get(0) == subtask1);
1333             assertTrue(subtasks.get(1) == subtask2);
1334             assertEquals("foo", subtask1.get());
1335             assertTrue(subtask2.exception() instanceof FooException);
1336         }
1337     }
1338 
1339     /**
1340      * Test Policy.all(Predicate) with cancellation after one subtask completes.
1341      */
1342     @ParameterizedTest
1343     @MethodSource("factories")
1344     void testAllForked3(ThreadFactory factory) throws Exception {
1345         try (var scope = StructuredTaskScope.open(Policy.<String>all(s -> true),
1346                 cf -> cf.withThreadFactory(factory))) {




1347 
1348             var subtask1 = scope.fork(() -> "foo");
1349             var subtask2 = scope.fork(() -> {
1350                 Thread.sleep(Duration.ofDays(1));
1351                 return "bar";
1352             });
1353 
1354             var subtasks = scope.join().toList();

1355 
1356             assertEquals(2, subtasks.size());
1357             assertTrue(subtasks.get(0) == subtask1);
1358             assertTrue(subtasks.get(1) == subtask2);
1359             assertEquals("foo", subtask1.get());
1360             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1361         }
1362     }
1363 
1364     /**
1365      * Test Policy.all(Predicate) with cancellation after serveral subtasks completes.
1366      */
1367     @ParameterizedTest
1368     @MethodSource("factories")
1369     void testAllForked4(ThreadFactory factory) throws Exception {
1370 
1371         // cancel execution if 2 or more failures
1372         class AtMostTwoFailures<T> implements Predicate<Subtask<? extends T>> {
1373             final AtomicInteger failedCount = new AtomicInteger();
1374             @Override
1375             public boolean test(Subtask<? extends T> subtask) {
1376                 return subtask.state() == Subtask.State.FAILED
1377                         && failedCount.incrementAndGet() >= 2;
1378             }
1379         }
1380         var policy = Policy.all(new AtMostTwoFailures<String>());
1381 
1382         try (var scope = StructuredTaskScope.open(policy)) {
1383             int forkCount = 0;
1384 
1385             // fork subtasks until execution cancelled
1386             while (!scope.isCancelled()) {
1387                 scope.fork(() -> "foo");
1388                 scope.fork(() -> { throw new FooException(); });
1389                 forkCount += 2;
1390                 Thread.sleep(Duration.ofMillis(10));
1391             }
1392 
1393             var subtasks = scope.join().toList();
1394             assertEquals(forkCount, subtasks.size());
1395 
1396             long failedCount = subtasks.stream()
1397                     .filter(s -> s.state() == Subtask.State.FAILED)
1398                     .count();
1399             assertTrue(failedCount >= 2);
1400         }
1401     }
1402 
1403     /**
1404      * Test Config equals/hashCode/toString
1405      */
1406     @Test
1407     void testConfigMethods() throws Exception {
1408         Function<Config, Config> testConfig = cf -> {
1409             var name = "duke";
1410             var threadFactory = Thread.ofPlatform().factory();
1411             var timeout = Duration.ofSeconds(10);
1412 
1413             assertEquals(cf, cf);
1414             assertEquals(cf.withName(name), cf.withName(name));
1415             assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
1416             assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
1417 
1418             assertNotEquals(cf, cf.withName(name));
1419             assertNotEquals(cf, cf.withThreadFactory(threadFactory));
1420             assertNotEquals(cf, cf.withTimeout(timeout));
1421 
1422             assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
1423             assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
1424                     cf.withThreadFactory(threadFactory).hashCode());
1425             assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
1426 
1427             assertTrue(cf.withName(name).toString().contains(name));
1428             assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
1429             assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
1430 
1431             return cf;
1432         };
1433         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(), testConfig)) {
1434             // do nothing
1435         }
1436     }
1437 
1438     /**
1439      * Test Policy's default methods.
1440      */
1441     @Test
1442     void testPolicyDefaultMethods() throws Exception {
1443         try (var scope = StructuredTaskScope.open(new CancelAfterOnePolicy<String>())) {
1444 
1445             // need subtasks to test default methods
1446             var subtask1 = scope.fork(() -> "foo");
1447             while (!scope.isCancelled()) {
1448                 Thread.sleep(20);
1449             }
1450             var subtask2 = scope.fork(() -> "bar");
1451             scope.join();
1452 
1453             assertEquals(Subtask.State.SUCCESS, subtask1.state());
1454             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
1455 
1456             // Policy that does not override default methods
1457             Policy<Object, Void> policy = () -> null;
1458             assertThrows(NullPointerException.class, () -> policy.onFork(null));
1459             assertThrows(NullPointerException.class, () -> policy.onComplete(null));
1460             assertThrows(IllegalArgumentException.class, () -> policy.onFork(subtask1));
1461             assertFalse(policy.onFork(subtask2));
1462             assertFalse(policy.onComplete(subtask1));
1463             assertThrows(IllegalArgumentException.class, () -> policy.onComplete(subtask2));
1464         }
1465     }
1466 
1467     /**
1468      * Test for NullPointerException.
1469      */
1470     @Test
1471     void testNulls() throws Exception {
1472         assertThrows(NullPointerException.class,
1473                 () -> StructuredTaskScope.open(null));
1474         assertThrows(NullPointerException.class,
1475                 () -> StructuredTaskScope.open(null, cf -> cf));
1476         assertThrows(NullPointerException.class,
1477                 () -> StructuredTaskScope.open(Policy.ignoreAll(), null));
1478         assertThrows(NullPointerException.class,
1479                 () -> StructuredTaskScope.open(Policy.ignoreAll(), cf -> null));
1480 
1481         assertThrows(NullPointerException.class, () -> Policy.all(null));
1482 
1483         // fork
1484         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
1485             assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null));
1486             assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null));
1487         }
1488 
1489         // withXXX
1490         assertThrows(NullPointerException.class,
1491                 () -> StructuredTaskScope.open(Policy.ignoreAll(), cf -> cf.withName(null)));
1492         assertThrows(NullPointerException.class,
1493                 () -> StructuredTaskScope.open(Policy.ignoreAll(), cf -> cf.withThreadFactory(null)));
1494         assertThrows(NullPointerException.class,
1495                 () -> StructuredTaskScope.open(Policy.ignoreAll(), cf -> cf.withTimeout(null)));
1496     }
1497 
1498     /**
1499      * ThreadFactory that counts usage.
1500      */
1501     private static class CountingThreadFactory implements ThreadFactory {
1502         final ThreadFactory delegate;
1503         final AtomicInteger threadCount = new AtomicInteger();
1504         CountingThreadFactory(ThreadFactory delegate) {
1505             this.delegate = delegate;
1506         }
1507         @Override
1508         public Thread newThread(Runnable task) {
1509             threadCount.incrementAndGet();
1510             return delegate.newThread(task);
1511         }
1512         int threadCount() {
1513             return threadCount.get();
1514         }
1515     }
1516 
1517     /**
1518      * Policy that cancels execution when a subtask completes.
1519      */
1520     private static class CancelAfterOnePolicy<T> implements Policy<T, Void> {
1521         final AtomicInteger onForkCount = new AtomicInteger();
1522         final AtomicInteger onCompleteCount = new AtomicInteger();
1523         @Override
1524         public boolean onFork(Subtask<? extends T> subtask) {
1525             onForkCount.incrementAndGet();
1526             return false;
1527         }
1528         @Override
1529         public boolean onComplete(Subtask<? extends T> subtask) {
1530             onCompleteCount.incrementAndGet();
1531             return true;
1532         }
1533         @Override
1534         public Void result() {
1535             return null;
1536         }
1537         int onForkCount() {
1538             return onForkCount.get();
1539         }
1540         int onCompleteCount() {
1541             return onCompleteCount.get();
1542         }
1543     };
1544 
1545     /**
1546      * A runtime exception for tests.
1547      */
1548     private static class FooException extends RuntimeException {
1549         FooException() { }
1550         FooException(Throwable cause) { super(cause); }
1551     }
1552 
1553     /**
1554      * Returns the current time in milliseconds.
1555      */
1556     private long millisTime() {
1557         long now = System.nanoTime();
1558         return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
1559     }
1560 
1561     /**
1562      * Check the duration of a task
1563      * @param start start time, in milliseconds
1564      * @param min minimum expected duration, in milliseconds
< prev index next >