< prev index next >

test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java

Print this page
@@ -1,7 +1,7 @@
  /*
-  * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
+  * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
   * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   *
   * This code is free software; you can redistribute it and/or modify it
   * under the terms of the GNU General Public License version 2 only, as
   * published by the Free Software Foundation.

@@ -34,36 +34,34 @@
   * @enablePreview
   * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest
   */
  
  import java.time.Duration;
- import java.io.IOException;
- import java.time.Instant;
  import java.util.Arrays;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.Set;
+ import java.util.NoSuchElementException;
  import java.util.concurrent.Callable;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.Future;
+ import java.util.concurrent.LinkedTransferQueue;
  import java.util.concurrent.ThreadFactory;
  import java.util.concurrent.TimeoutException;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.RejectedExecutionException;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.StructuredTaskScope;
- import java.util.concurrent.StructuredTaskScope.Subtask;
- import java.util.concurrent.StructuredTaskScope.ShutdownOnSuccess;
- import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure;
+ import java.util.concurrent.StructuredTaskScope.*;
  import java.util.concurrent.StructureViolationException;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicInteger;
- import java.util.concurrent.atomic.AtomicReference;
- import java.util.function.Supplier;
+ import java.util.function.Function;
+ import java.util.function.Predicate;
  import java.util.stream.Stream;
  import static java.lang.Thread.State.*;
  
  import org.junit.jupiter.api.Test;
  import org.junit.jupiter.api.BeforeAll;

@@ -99,36 +97,23 @@
      private static Stream<ThreadFactory> factories() {
          return threadFactories.stream();
      }
  
      /**
-      * Test that fork creates a new thread for each task.
-      */
-     @ParameterizedTest
-     @MethodSource("factories")
-     void testForkCreatesThread(ThreadFactory factory) throws Exception {
-         Set<Long> tids = ConcurrentHashMap.newKeySet();
-         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
-             for (int i = 0; i < 100; i++) {
-                 scope.fork(() -> {
-                     tids.add(Thread.currentThread().threadId());
-                     return null;
-                 });
-             }
-             scope.join();
-         }
-         assertEquals(100, tids.size());
-     }
- 
-     /**
-      * Test that fork creates a new virtual thread for each task.
+      * Test that fork creates virtual threads when no ThreadFactory is configured.
       */
      @Test
      void testForkCreateVirtualThread() throws Exception {
          Set<Thread> threads = ConcurrentHashMap.newKeySet();
-         try (var scope = new StructuredTaskScope<Object>()) {
-             for (int i = 0; i < 100; i++) {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
+             for (int i = 0; i < 50; i++) {
+                 // runnable
+                 scope.fork(() -> {
+                     threads.add(Thread.currentThread());
+                 });
+ 
+                 // callable
                  scope.fork(() -> {
                      threads.add(Thread.currentThread());
                      return null;
                  });
              }

@@ -137,114 +122,122 @@
          assertEquals(100, threads.size());
          threads.forEach(t -> assertTrue(t.isVirtual()));
      }
  
      /**
-      * Test that fork creates a new thread with the given thread factory.
+      * Test that fork create threads with the configured ThreadFactory.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testForkUsesFactory(ThreadFactory factory) throws Exception {
-         var count = new AtomicInteger();
-         ThreadFactory countingFactory = task -> {
-             count.incrementAndGet();
-             return factory.newThread(task);
-         };
-         try (var scope = new StructuredTaskScope<Object>(null, countingFactory)) {
-             for (int i = 0; i < 100; i++) {
-                 scope.fork(() -> null);
+     void testForkUsesThreadFactory(ThreadFactory factory) throws Exception {
+         // TheadFactory that keeps reference to all threads it creates
+         class RecordingThreadFactory implements ThreadFactory {
+             final ThreadFactory delegate;
+             final Set<Thread> threads = ConcurrentHashMap.newKeySet();
+             RecordingThreadFactory(ThreadFactory delegate) {
+                 this.delegate = delegate;
+             }
+             @Override
+             public Thread newThread(Runnable task) {
+                 Thread thread = delegate.newThread(task);
+                 threads.add(thread);
+                 return thread;
+             }
+             Set<Thread> threads() {
+                 return threads;
+             }
+         }
+         var recordingThreadFactory = new RecordingThreadFactory(factory);
+         Set<Thread> threads = ConcurrentHashMap.newKeySet();
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(recordingThreadFactory))) {
+ 
+             for (int i = 0; i < 50; i++) {
+                 // runnable
+                 scope.fork(() -> {
+                     threads.add(Thread.currentThread());
+                 });
+ 
+                 // callable
+                 scope.fork(() -> {
+                     threads.add(Thread.currentThread());
+                     return null;
+                 });
              }
              scope.join();
          }
-         assertEquals(100, count.get());
+         assertEquals(100, threads.size());
+         assertEquals(recordingThreadFactory.threads(), threads);
      }
  
      /**
-      * Test fork is confined to threads in the scope "tree".
+      * Test fork is owner confined.
       */
      @ParameterizedTest
      @MethodSource("factories")
      void testForkConfined(ThreadFactory factory) throws Exception {
-         try (var scope1 = new StructuredTaskScope<Boolean>();
-              var scope2 = new StructuredTaskScope<Boolean>()) {
+         try (var scope = StructuredTaskScope.open(Policy.<Boolean>ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
  
-             // thread in scope1 cannot fork thread in scope2
-             Subtask<Boolean> subtask1 = scope1.fork(() -> {
+             // thread in scope cannot fork
+             Subtask<Boolean> subtask = scope.fork(() -> {
                  assertThrows(WrongThreadException.class, () -> {
-                     scope2.fork(() -> null);
+                     scope.fork(() -> null);
                  });
                  return true;
              });
- 
-             // thread in scope2 can fork thread in scope1
-             Subtask<Boolean> subtask2 = scope2.fork(() -> {
-                 scope1.fork(() -> null);
-                 return true;
-             });
- 
-             scope2.join();
-             scope1.join();
- 
-             assertTrue(subtask1.get());
-             assertTrue(subtask2.get());
+             scope.join();
+             assertTrue(subtask.get());
  
              // random thread cannot fork
              try (var pool = Executors.newSingleThreadExecutor()) {
                  Future<Void> future = pool.submit(() -> {
                      assertThrows(WrongThreadException.class, () -> {
-                         scope1.fork(() -> null);
-                     });
-                     assertThrows(WrongThreadException.class, () -> {
-                         scope2.fork(() -> null);
+                         scope.fork(() -> null);
                      });
                      return null;
                  });
                  future.get();
              }
          }
      }
  
      /**
-      * Test fork after join completes.
+      * Test fork after join, no subtasks forked before join.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testForkAfterJoin(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<String>(null, factory)) {
-             // round 1
-             var subtask1 = scope.fork(() -> "foo");
-             assertThrows(IllegalStateException.class, subtask1::get);
-             scope.join();
-             assertEquals("foo", subtask1.get());
- 
-             // round 2
-             var subtask2 = scope.fork(() -> "bar");
-             assertEquals("foo", subtask1.get());
-             assertThrows(IllegalStateException.class, subtask2::get);
+     void testForkAfterJoin1(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
              scope.join();
-             assertEquals("foo", subtask1.get());
-             assertEquals("bar", subtask2.get());
+             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
+         }
+     }
  
-             // round 3
-             var subtask3 = scope.fork(() -> "baz");
-             assertEquals("foo", subtask1.get());
-             assertEquals("bar", subtask2.get());
-             assertThrows(IllegalStateException.class, subtask3::get);
+     /**
+      * Test fork after join, subtasks forked before join.
+      */
+     @ParameterizedTest
+     @MethodSource("factories")
+     void testForkAfterJoin2(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
+             scope.fork(() -> "foo");
              scope.join();
-             assertEquals("foo", subtask1.get());
-             assertEquals("bar", subtask2.get());
-             assertEquals("baz", subtask3.get());
+             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
          }
      }
  
      /**
       * Test fork after join throws.
       */
      @ParameterizedTest
      @MethodSource("factories")
      void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<String>(null, factory)) {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
              var latch = new CountDownLatch(1);
              var subtask1 = scope.fork(() -> {
                  latch.await();
                  return "foo";
              });

@@ -256,96 +249,133 @@
              // allow subtask1 to finish
              latch.countDown();
  
              // continue to fork
              var subtask2 = scope.fork(() -> "bar");
-             assertThrows(IllegalStateException.class, subtask1::get);
-             assertThrows(IllegalStateException.class, subtask2::get);
              scope.join();
              assertEquals("foo", subtask1.get());
              assertEquals("bar", subtask2.get());
          }
      }
  
      /**
-      * Test fork after scope is shutdown.
+      * Test fork after task scope is cancelled.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testForkAfterShutdown(ThreadFactory factory) throws Exception {
-         var executed = new AtomicBoolean();
-         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
-             scope.shutdown();
-             Subtask<String> subtask = scope.fork(() -> {
-                 executed.set(true);
-                 return null;
-             });
+     void testForkAfterCancel(ThreadFactory factory) throws Exception {
+         var countingThreadFactory = new CountingThreadFactory(factory);
+         var testPolicy = new CancelAfterOnePolicy<String>();
+ 
+         try (var scope = StructuredTaskScope.open(testPolicy,
+                 cf -> cf.withThreadFactory(countingThreadFactory))) {
+ 
+             // fork subtask, the scope should be cancelled when the subtask completes
+             var subtask1 = scope.fork(() -> "foo");
+             while (!scope.isCancelled()) {
+                 Thread.sleep(20);
+             }
+ 
+             assertEquals(1, countingThreadFactory.threadCount());
+             assertEquals(1, testPolicy.onForkCount());
+             assertEquals(1, testPolicy.onCompleteCount());
+ 
+             // fork second subtask, it should not run
+             var subtask2 = scope.fork(() -> "bar");
+ 
+             // onFork should be invoked, newThread and onComplete should not be invoked
+             assertEquals(1, countingThreadFactory.threadCount());
+             assertEquals(2, testPolicy.onForkCount());
+             assertEquals(1, testPolicy.onCompleteCount());
+ 
              scope.join();
-             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
-             assertThrows(IllegalStateException.class, subtask::get);
-             assertThrows(IllegalStateException.class, subtask::exception);
+ 
+             assertEquals(1, countingThreadFactory.threadCount());
+             assertEquals(2, testPolicy.onForkCount());
+             assertEquals(1, testPolicy.onCompleteCount());
+             assertEquals("foo", subtask1.get());
+             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
          }
-         assertFalse(executed.get());
      }
  
      /**
-      * Test fork after scope is closed.
+      * Test fork after task scope is closed.
       */
      @ParameterizedTest
      @MethodSource("factories")
      void testForkAfterClose(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
              scope.close();
              assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
          }
      }
  
      /**
-      * Test fork when the thread factory rejects creating a thread.
+      * Test fork when the ThreadFactory rejects creating a thread.
       */
      @Test
      void testForkRejectedExecutionException() throws Exception {
          ThreadFactory factory = task -> null;
-         try (var scope = new StructuredTaskScope(null, factory)) {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
              assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
-             scope.join();
          }
      }
  
      /**
       * Test join with no subtasks.
       */
      @Test
      void testJoinWithNoSubtasks() throws Exception {
-         try (var scope = new StructuredTaskScope()) {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
              scope.join();
          }
      }
  
      /**
-      * Test join with unfinished subtasks.
+      * Test join with a remaining subtask.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testJoinWithSubtasks(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope(null, factory)) {
+     void testJoinWithRemainingSubtasks(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
              Subtask<String> subtask = scope.fork(() -> {
-                 Thread.sleep(Duration.ofMillis(50));
+                 Thread.sleep(Duration.ofMillis(100));
                  return "foo";
              });
              scope.join();
              assertEquals("foo", subtask.get());
          }
      }
  
+     /**
+      * Test repeated calls to join.
+      */
+     @Test
+     void testJoinAfterJoin() throws Exception {
+         var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz"));
+         Policy<Object, String> policy = results::take;
+         try (var scope = StructuredTaskScope.open(policy)) {
+             scope.fork(() -> "foo");
+ 
+             // each call to join should invoke Policy::result
+             assertEquals("foo", scope.join());
+             assertEquals("bar", scope.join());
+             assertEquals("baz", scope.join());
+         }
+     }
+ 
      /**
       * Test join is owner confined.
       */
      @ParameterizedTest
      @MethodSource("factories")
      void testJoinConfined(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Boolean>()) {
+         try (var scope = StructuredTaskScope.open(Policy.<Boolean>ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
  
              // thread in scope cannot join
              Subtask<Boolean> subtask = scope.fork(() -> {
                  assertThrows(WrongThreadException.class, () -> { scope.join(); });
                  return true;

@@ -370,11 +400,13 @@
       * Test join with interrupt status set.
       */
      @ParameterizedTest
      @MethodSource("factories")
      void testInterruptJoin1(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope(null, factory)) {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
+ 
              var latch = new CountDownLatch(1);
  
              Subtask<String> subtask = scope.fork(() -> {
                  latch.await();
                  return "foo";

@@ -384,11 +416,11 @@
              Thread.currentThread().interrupt();
              try {
                  scope.join();
                  fail("join did not throw");
              } catch (InterruptedException expected) {
-                 assertFalse(Thread.interrupted());   // interrupt status should be clear
+                 assertFalse(Thread.interrupted());   // interrupt status should be cleared
              } finally {
                  // let task continue
                  latch.countDown();
              }
  

@@ -402,11 +434,13 @@
       * Test interrupt of thread blocked in join.
       */
      @ParameterizedTest
      @MethodSource("factories")
      void testInterruptJoin2(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope(null, factory)) {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
+ 
              var latch = new CountDownLatch(1);
              Subtask<String> subtask = scope.fork(() -> {
                  latch.await();
                  return "foo";
              });

@@ -428,514 +462,278 @@
              assertEquals("foo", subtask.get());
          }
      }
  
      /**
-      * Test join when scope is shutdown.
+      * Test join when scope is cancelled.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testJoinWithShutdown1(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<String>(null, factory)) {
-             var interrupted = new CountDownLatch(1);
-             var finish = new CountDownLatch(1);
- 
-             Subtask<String> subtask = scope.fork(() -> {
-                 try {
-                     Thread.sleep(Duration.ofDays(1));
-                 } catch (InterruptedException e) {
-                     interrupted.countDown();
-                 }
-                 finish.await();
-                 return "foo";
-             });
- 
-             scope.shutdown();      // should interrupt task
+     void testJoinWhenCancelled(ThreadFactory factory) throws Exception {
+         var countingThreadFactory = new CountingThreadFactory(factory);
+         var testPolicy = new CancelAfterOnePolicy<String>();
  
-             interrupted.await();
- 
-             scope.join();
- 
-             // signal task to finish
-             finish.countDown();
-         }
-     }
+         try (var scope = StructuredTaskScope.open(testPolicy,
+                     cf -> cf.withThreadFactory(countingThreadFactory))) {
  
-     /**
-      * Test shutdown when owner is blocked in join.
-      */
-     @ParameterizedTest
-     @MethodSource("factories")
-     void testJoinWithShutdown2(ThreadFactory factory) throws Exception {
-         class MyScope<T> extends StructuredTaskScope<T> {
-             MyScope(ThreadFactory factory) {
-                 super(null, factory);
-             }
-             @Override
-             protected void handleComplete(Subtask<? extends T> subtask) {
-                 shutdown();
+             // fork subtask, the scope should be cancelled when the subtask completes
+             var subtask1 = scope.fork(() -> "foo");
+             while (!scope.isCancelled()) {
+                 Thread.sleep(20);
              }
-         }
  
-         try (var scope = new MyScope<String>(factory)) {
-             Subtask<String> subtask1 = scope.fork(() -> {
-                 Thread.sleep(Duration.ofMillis(50));
-                 return "foo";
-             });
-             Subtask<String> subtask2 = scope.fork(() -> {
+             // fork second subtask, it should not run
+             var subtask2 = scope.fork(() -> {
                  Thread.sleep(Duration.ofDays(1));
                  return "bar";
              });
  
-             // join should wakeup when shutdown is called
              scope.join();
  
-             // task1 should have completed successfully
-             assertEquals(Subtask.State.SUCCESS, subtask1.state());
              assertEquals("foo", subtask1.get());
-             assertThrows(IllegalStateException.class, subtask1::exception);
- 
-             // task2 result/exception not available
              assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
-             assertThrows(IllegalStateException.class, subtask2::get);
-             assertThrows(IllegalStateException.class, subtask2::exception);
          }
      }
  
      /**
       * Test join after scope is closed.
       */
      @Test
      void testJoinAfterClose() throws Exception {
-         try (var scope = new StructuredTaskScope()) {
-             scope.join();
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
              scope.close();
              assertThrows(IllegalStateException.class, () -> scope.join());
-             assertThrows(IllegalStateException.class, () -> scope.joinUntil(Instant.now()));
          }
      }
  
      /**
-      * Test joinUntil, subtasks finish before deadline expires.
+      * Test join with timeout, subtasks finish before timeout expires.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testJoinUntil1(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<String>(null, factory)) {
+     void testJoinWithTimeout1(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory)
+                         .withTimeout(Duration.ofDays(1)))) {
+ 
              Subtask<String> subtask = scope.fork(() -> {
-                 try {
-                     Thread.sleep(Duration.ofSeconds(2));
-                 } catch (InterruptedException e) { }
+                 Thread.sleep(Duration.ofSeconds(1));
                  return "foo";
              });
  
-             long startMillis = millisTime();
-             scope.joinUntil(Instant.now().plusSeconds(30));
-             expectDuration(startMillis, /*min*/1900, /*max*/20_000);
+             scope.join();
+ 
+             assertFalse(scope.isCancelled());
              assertEquals("foo", subtask.get());
          }
      }
  
      /**
-      * Test joinUntil, deadline expires before subtasks finish.
+      * Test join with timeout, timeout expires before subtasks finish.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testJoinUntil2(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
+     void testJoinWithTimeout2(ThreadFactory factory) throws Exception {
+         long startMillis = millisTime();
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory)
+                         .withTimeout(Duration.ofSeconds(2)))) {
+ 
              Subtask<Void> subtask = scope.fork(() -> {
                  Thread.sleep(Duration.ofDays(1));
                  return null;
              });
  
-             long startMillis = millisTime();
              try {
-                 scope.joinUntil(Instant.now().plusSeconds(2));
-             } catch (TimeoutException e) {
-                 expectDuration(startMillis, /*min*/1900, /*max*/20_000);
+                 scope.join();
+                 fail();
+             } catch (ExecutionException e) {
+                 assertTrue(e.getCause() instanceof TimeoutException);
              }
+             expectDuration(startMillis, /*min*/1900, /*max*/20_000);
+ 
+             assertTrue(scope.isCancelled());
              assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
          }
      }
  
      /**
-      * Test joinUntil many times.
+      * Test join with timeout that has already expired.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testJoinUntil3(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<String>(null, factory)) {
-             Subtask<String> subtask = scope.fork(() -> {
-                 Thread.sleep(Duration.ofDays(1));
-                 return null;
-             });
- 
-             for (int i = 0; i < 3; i++) {
-                 try {
-                     scope.joinUntil(Instant.now().plusMillis(50));
-                     fail("joinUntil did not throw");
-                 } catch (TimeoutException expected) {
-                     assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
-                 }
-             }
-         }
-     }
+     void testJoinWithTimeout3(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory)
+                         .withTimeout(Duration.ofSeconds(-1)))) {
  
-     /**
-      * Test joinUntil with a deadline that has already expired.
-      */
-     @ParameterizedTest
-     @MethodSource("factories")
-     void testJoinUntil4(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
              Subtask<Void> subtask = scope.fork(() -> {
                  Thread.sleep(Duration.ofDays(1));
                  return null;
              });
  
-             // now
-             try {
-                 scope.joinUntil(Instant.now());
-                 fail("joinUntil did not throw");
-             } catch (TimeoutException expected) {
-                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
-             }
- 
-             // in the past
              try {
-                 scope.joinUntil(Instant.now().minusSeconds(1));
-                 fail("joinUntil did not throw");
-             } catch (TimeoutException expected) {
-                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
+                 scope.join();
+                 fail();
+             } catch (ExecutionException e) {
+                 assertTrue(e.getCause() instanceof TimeoutException);
              }
+             assertTrue(scope.isCancelled());
+             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
          }
      }
  
      /**
-      * Test joinUntil with interrupt status set.
+      * Test that cancelling exceutions interrupts unfinished threads.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testInterruptJoinUntil1(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<String>(null, factory)) {
-             var latch = new CountDownLatch(1);
+     void testCancellationInterruptsThreads(ThreadFactory factory) throws Exception {
+         var testPolicy = new CancelAfterOnePolicy<String>();
  
-             Subtask<String> subtask = scope.fork(() -> {
-                 latch.await();
-                 return "foo";
+         try (var scope = StructuredTaskScope.open(testPolicy,
+                 cf -> cf.withThreadFactory(factory))) {
+ 
+             // fork subtask1 that runs for a long time
+             var started = new CountDownLatch(1);
+             var interrupted = new CountDownLatch(1);
+             var subtask1 = scope.fork(() -> {
+                 started.countDown();
+                 try {
+                     Thread.sleep(Duration.ofDays(1));
+                 } catch (InterruptedException e) {
+                     interrupted.countDown();
+                 }
              });
+             started.await();
  
-             // joinUntil should throw
-             Thread.currentThread().interrupt();
-             try {
-                 scope.joinUntil(Instant.now().plusSeconds(30));
-                 fail("joinUntil did not throw");
-             } catch (InterruptedException expected) {
-                 assertFalse(Thread.interrupted());   // interrupt status should be clear
-             } finally {
-                 // let task continue
-                 latch.countDown();
+             // fork subtask2, the scope should be cancelled when the subtask completes
+             var subtask2 = scope.fork(() -> "bar");
+             while (!scope.isCancelled()) {
+                 Thread.sleep(20);
              }
  
-             // join should complete
+             // subtask1 should be interrupted
+             interrupted.await();
+ 
              scope.join();
-             assertEquals("foo", subtask.get());
+ 
+             assertEquals(Subtask.State.UNAVAILABLE, subtask1.state());
+             assertEquals("bar", subtask2.get());
          }
      }
  
      /**
-      * Test interrupt of thread blocked in joinUntil.
+      * Test that timeout interrupts unfinished threads.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testInterruptJoinUntil2(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope(null, factory)) {
-             var latch = new CountDownLatch(1);
+     void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory)
+                         .withTimeout(Duration.ofSeconds(2)))) {
  
-             Subtask<String> subtask = scope.fork(() -> {
-                 latch.await();
-                 return "foo";
+             var started = new AtomicBoolean();
+             var interrupted = new CountDownLatch(1);
+             Subtask<Void> subtask = scope.fork(() -> {
+                 started.set(true);
+                 try {
+                     Thread.sleep(Duration.ofDays(1));
+                 } catch (InterruptedException e) {
+                     interrupted.countDown();
+                 }
+                 return null;
              });
  
-             // joinUntil should throw
-             scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.joinUntil");
+             while (!scope.isCancelled()) {
+                 Thread.sleep(50);
+             }
+ 
+             // if subtask started then it should be interrupted
+             if (started.get()) {
+                 interrupted.await();
+             }
+ 
              try {
-                 scope.joinUntil(Instant.now().plusSeconds(30));
-                 fail("joinUntil did not throw");
-             } catch (InterruptedException expected) {
-                 assertFalse(Thread.interrupted());   // interrupt status should be clear
-             } finally {
-                 // let task continue
-                 latch.countDown();
+                 scope.join();
+                 fail();
+             } catch (ExecutionException e) {
+                 assertTrue(e.getCause() instanceof TimeoutException);
              }
+             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
+         }
+     }
  
-             // join should complete
-             scope.join();
-             assertEquals("foo", subtask.get());
+     /**
+      * Test close without join, no subtasks forked.
+      */
+     @Test
+     void testCloseWithoutJoin1() {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
+             // do nothing
          }
      }
  
      /**
-      * Test that shutdown interrupts unfinished subtasks.
+      * Test close without join, subtasks forked.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testShutdownInterruptsThreads1(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
-             var interrupted = new AtomicBoolean();
-             var latch = new CountDownLatch(1);
-             var subtask = scope.fork(() -> {
-                 try {
-                     Thread.sleep(Duration.ofDays(1));
-                 } catch (InterruptedException e) {
-                     interrupted.set(true);
-                 } finally {
-                     latch.countDown();
-                 }
+     void testCloseWithoutJoin2(ThreadFactory factory) {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
+             Subtask<String> subtask = scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
                  return null;
              });
- 
-             scope.shutdown();
- 
-             // wait for task to complete
-             latch.await();
-             assertTrue(interrupted.get());
- 
-             scope.join();
+             assertThrows(IllegalStateException.class, scope::close);
  
              // subtask result/exception not available
              assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
              assertThrows(IllegalStateException.class, subtask::get);
              assertThrows(IllegalStateException.class, subtask::exception);
          }
      }
  
      /**
-      * Test that shutdown does not interrupt current thread.
+      * Test close after join throws. Close should not throw as join attempted.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testShutdownInterruptsThreads2(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
-             var interrupted = new AtomicBoolean();
-             var latch = new CountDownLatch(1);
+     void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
              var subtask = scope.fork(() -> {
-                 try {
-                     scope.shutdown();
-                     interrupted.set(Thread.currentThread().isInterrupted());
-                 } finally {
-                     latch.countDown();
-                 }
+                 Thread.sleep(Duration.ofDays(1));
                  return null;
              });
  
-             // wait for task to complete
-             latch.await();
-             assertFalse(interrupted.get());
+             // join throws
+             Thread.currentThread().interrupt();
+             assertThrows(InterruptedException.class, scope::join);
+             assertThrows(IllegalStateException.class, subtask::get);
  
-             scope.join();
-         }
+         }  // close should not throw
      }
  
      /**
-      * Test shutdown wakes join.
+      * Test close is owner confined.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testShutdownWakesJoin(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
-             var latch = new CountDownLatch(1);
-             scope.fork(() -> {
-                 Thread.sleep(Duration.ofMillis(100));  // give time for join to block
-                 scope.shutdown();
-                 latch.await();
-                 return null;
+     void testCloseConfined(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.<Boolean>ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
+ 
+             // attempt to close from thread in scope
+             Subtask<Boolean> subtask = scope.fork(() -> {
+                 assertThrows(WrongThreadException.class, scope::close);
+                 return true;
              });
  
              scope.join();
- 
-             // join woke up, allow task to complete
-             latch.countDown();
-         }
-     }
- 
-     /**
-      * Test shutdown after scope is closed.
-      */
-     @Test
-     void testShutdownAfterClose() throws Exception {
-         try (var scope = new StructuredTaskScope<Object>()) {
-             scope.join();
-             scope.close();
-             assertThrows(IllegalStateException.class, scope::shutdown);
-         }
-     }
- 
-     /**
-      * Test shutdown is confined to threads in the scope "tree".
-      */
-     @ParameterizedTest
-     @MethodSource("factories")
-     void testShutdownConfined(ThreadFactory factory) throws Exception {
-         try (var scope1 = new StructuredTaskScope<Boolean>();
-              var scope2 = new StructuredTaskScope<Boolean>()) {
- 
-             // thread in scope1 cannot shutdown scope2
-             Subtask<Boolean> subtask1 = scope1.fork(() -> {
-                 assertThrows(WrongThreadException.class, scope2::shutdown);
-                 return true;
-             });
- 
-             // wait for task in scope1 to complete to avoid racing with task in scope2
-             while (subtask1.state() == Subtask.State.UNAVAILABLE) {
-                 Thread.sleep(10);
-             }
- 
-             // thread in scope2 shutdown scope1
-             Subtask<Boolean> subtask2 = scope2.fork(() -> {
-                 scope1.shutdown();
-                 return true;
-             });
- 
-             scope2.join();
-             scope1.join();
- 
-             assertTrue(subtask1.get());
-             assertTrue(subtask1.get());
- 
-             // random thread cannot shutdown
-             try (var pool = Executors.newSingleThreadExecutor()) {
-                 Future<Void> future = pool.submit(() -> {
-                     assertThrows(WrongThreadException.class, scope1::shutdown);
-                     assertThrows(WrongThreadException.class, scope2::shutdown);
-                     return null;
-                 });
-                 future.get();
-             }
-         }
-     }
- 
-     /**
-      * Test isShutdown.
-      */
-     @Test
-     void testIsShutdown() {
-         try (var scope = new StructuredTaskScope<Object>()) {
-             assertFalse(scope.isShutdown());   // before shutdown
-             scope.shutdown();
-             assertTrue(scope.isShutdown());    // after shutdown
-             scope.close();
-             assertTrue(scope.isShutdown());    // after cose
-         }
-     }
- 
-     /**
-      * Test close without join, no subtasks forked.
-      */
-     @Test
-     void testCloseWithoutJoin1() {
-         try (var scope = new StructuredTaskScope<Object>()) {
-             // do nothing
-         }
-     }
- 
-     /**
-      * Test close without join, unfinished subtasks.
-      */
-     @ParameterizedTest
-     @MethodSource("factories")
-     void testCloseWithoutJoin2(ThreadFactory factory) {
-         try (var scope = new StructuredTaskScope<String>(null, factory)) {
-             Subtask<String> subtask = scope.fork(() -> {
-                 Thread.sleep(Duration.ofDays(1));
-                 return null;
-             });
-             assertThrows(IllegalStateException.class, scope::close);
- 
-             // subtask result/exception not available
-             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
-             assertThrows(IllegalStateException.class, subtask::get);
-             assertThrows(IllegalStateException.class, subtask::exception);
-         }
-     }
- 
-     /**
-      * Test close without join, unfinished subtasks forked after join.
-      */
-     @ParameterizedTest
-     @MethodSource("factories")
-     void testCloseWithoutJoin3(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope(null, factory)) {
-             scope.fork(() -> "foo");
-             scope.join();
- 
-             Subtask<String> subtask = scope.fork(() -> {
-                 Thread.sleep(Duration.ofDays(1));
-                 return null;
-             });
-             assertThrows(IllegalStateException.class, scope::close);
- 
-             // subtask result/exception not available
-             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
-             assertThrows(IllegalStateException.class, subtask::get);
-             assertThrows(IllegalStateException.class, subtask::exception);
-         }
-     }
- 
-     /**
-      * Test close after join throws. Close should not throw as join attempted.
-      */
-     @ParameterizedTest
-     @MethodSource("factories")
-     void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Object>()) {
-             var subtask = scope.fork(() -> {
-                 Thread.sleep(Duration.ofDays(1));
-                 return null;
-             });
- 
-             // join throws
-             Thread.currentThread().interrupt();
-             assertThrows(InterruptedException.class, scope::join);
-             assertThrows(IllegalStateException.class, subtask::get);
-         }
-     }
- 
-     /**
-      * Test close after joinUntil throws. Close should not throw as join attempted.
-      */
-     @ParameterizedTest
-     @MethodSource("factories")
-     void testCloseAfterJoinUntilThrows(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Object>()) {
-             var subtask = scope.fork(() -> {
-                 Thread.sleep(Duration.ofDays(1));
-                 return null;
-             });
- 
-             // joinUntil throws
-             assertThrows(TimeoutException.class, () -> scope.joinUntil(Instant.now()));
-             assertThrows(IllegalStateException.class, subtask::get);
-         }
-     }
- 
-     /**
-      * Test close is owner confined.
-      */
-     @ParameterizedTest
-     @MethodSource("factories")
-     void testCloseConfined(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Boolean>()) {
- 
-             // attempt to close from thread in scope
-             Subtask<Boolean> subtask = scope.fork(() -> {
-                 assertThrows(WrongThreadException.class, scope::close);
-                 return true;
-             });
- 
-             scope.join();
-             assertTrue(subtask.get());
+             assertTrue(subtask.get());
  
              // random thread cannot close scope
              try (var pool = Executors.newCachedThreadPool(factory)) {
                  Future<Boolean> future = pool.submit(() -> {
                      assertThrows(WrongThreadException.class, scope::close);

@@ -950,24 +748,36 @@
       * Test close with interrupt status set.
       */
      @ParameterizedTest
      @MethodSource("factories")
      void testInterruptClose1(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
+         var testPolicy = new CancelAfterOnePolicy<String>();
+         try (var scope = StructuredTaskScope.open(testPolicy,
+                 cf -> cf.withThreadFactory(factory))) {
+ 
+             // fork first subtask, a straggler as it continues after being interrupted
+             var started = new CountDownLatch(1);
              var done = new AtomicBoolean();
              scope.fork(() -> {
+                 started.countDown();
                  try {
                      Thread.sleep(Duration.ofDays(1));
                  } catch (InterruptedException e) {
-                     // interrupted by shutdown, expected
+                     // interrupted by cancel, expected
                  }
                  Thread.sleep(Duration.ofMillis(100)); // force close to wait
                  done.set(true);
                  return null;
              });
+             started.await();
+ 
+             // fork second subtask, the scope should be cancelled when this subtask completes
+             scope.fork(() -> "bar");
+             while (!scope.isCancelled()) {
+                 Thread.sleep(20);
+             }
  
-             scope.shutdown();
              scope.join();
  
              // invoke close with interrupt status set
              Thread.currentThread().interrupt();
              try {

@@ -983,285 +793,171 @@
       * Test interrupting thread waiting in close.
       */
      @ParameterizedTest
      @MethodSource("factories")
      void testInterruptClose2(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
-             var done = new AtomicBoolean();
+         var testPolicy = new CancelAfterOnePolicy<String>();
+         try (var scope = StructuredTaskScope.open(testPolicy,
+                 cf -> cf.withThreadFactory(factory))) {
+ 
              Thread mainThread = Thread.currentThread();
+ 
+             // fork first subtask, a straggler as it continues after being interrupted
+             var started = new CountDownLatch(1);
+             var done = new AtomicBoolean();
              scope.fork(() -> {
+                 started.countDown();
                  try {
                      Thread.sleep(Duration.ofDays(1));
                  } catch (InterruptedException e) {
-                     // interrupted by shutdown, expected
+                     // interrupted by cancel, expected
                  }
  
                  // interrupt main thread when it blocks in close
                  interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScope.close");
  
                  Thread.sleep(Duration.ofMillis(100)); // force close to wait
                  done.set(true);
                  return null;
              });
+             started.await();
+ 
+             // fork second subtask, the scope should be cancelled when this subtask completes
+             scope.fork(() -> "bar");
+             while (!scope.isCancelled()) {
+                 Thread.sleep(20);
+             }
  
-             scope.shutdown();   // interrupts task
              scope.join();
+ 
+             // main thread will be interrupted while blocked in close
              try {
                  scope.close();
              } finally {
-                 assertTrue(Thread.interrupted()); // clear interrupt status
+                 assertTrue(Thread.interrupted());   // clear interrupt status
                  assertTrue(done.get());
              }
          }
      }
  
      /**
       * Test that closing an enclosing scope closes the thread flock of a nested scope.
       */
      @Test
      void testCloseThrowsStructureViolation() throws Exception {
-         try (var scope1 = new StructuredTaskScope<Object>()) {
-             try (var scope2 = new StructuredTaskScope<Object>()) {
+         try (var scope1 = StructuredTaskScope.open(Policy.ignoreAll())) {
+             try (var scope2 = StructuredTaskScope.open(Policy.ignoreAll())) {
  
-                 // join + close enclosing scope
-                 scope1.join();
+                 // close enclosing scope
                  try {
                      scope1.close();
                      fail("close did not throw");
                  } catch (StructureViolationException expected) { }
  
-                 // underlying flock should be closed, fork should return a cancelled task
+                 // underlying flock should be closed
                  var executed = new AtomicBoolean();
-                 Subtask<Void> subtask = scope2.fork(() -> {
-                     executed.set(true);
-                     return null;
-                 });
+                 Subtask<?> subtask = scope2.fork(() -> executed.set(true));
                  assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
                  scope2.join();
                  assertFalse(executed.get());
+                 assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
              }
          }
      }
  
      /**
-      * A StructuredTaskScope that collects the subtasks notified to the handleComplete method.
-      */
-     private static class CollectAll<T> extends StructuredTaskScope<T> {
-         private final Set<Subtask<? extends T>> subtasks = ConcurrentHashMap.newKeySet();
- 
-         CollectAll(ThreadFactory factory) {
-             super(null, factory);
-         }
- 
-         @Override
-         protected void handleComplete(Subtask<? extends T> subtask) {
-             subtasks.add(subtask);
-         }
- 
-         Set<Subtask<? extends T>> subtasks() {
-             return subtasks;
-         }
- 
-         Subtask<? extends T> find(Callable<T> task) {
-             return subtasks.stream()
-                     .filter(h -> task.equals(h.task()))
-                     .findAny()
-                     .orElseThrow();
-         }
-     }
- 
-     /**
-      * Test that handleComplete method is invoked for tasks that complete before shutdown.
-      */
-     @ParameterizedTest
-     @MethodSource("factories")
-     void testHandleCompleteBeforeShutdown(ThreadFactory factory) throws Exception {
-         try (var scope = new CollectAll<String>(factory)) {
-             Callable<String> task1 = () -> "foo";
-             Callable<String> task2 = () -> { throw new FooException(); };
-             scope.fork(task1);
-             scope.fork(task2);
-             scope.join();
- 
-             var subtask1 = scope.find(task1);
-             assertEquals("foo", subtask1.get());
- 
-             var subtask2 = scope.find(task2);
-             assertTrue(subtask2.exception() instanceof FooException);
-         }
-     }
- 
-     /**
-      * Test that handleComplete method is not invoked for tasks that finish after shutdown
-      * or are forked after shutdown.
+      * Test that isCancelled returns true after close.
       */
-     @ParameterizedTest
-     @MethodSource("factories")
-     void testHandleCompleteAfterShutdown(ThreadFactory factory) throws Exception {
-         try (var scope = new CollectAll<String>(factory)) {
-             Callable<String> task1 = () -> {
-                 try {
-                     Thread.sleep(Duration.ofDays(1));
-                 } catch (InterruptedException ignore) { }
-                 return "foo";
-             };
-             Callable<String> task2 = () -> {
-                 Thread.sleep(Duration.ofDays(1));
-                 return "bar";
-             };
-             Callable<String> task3 = () -> "baz";
- 
-             // forked before shutdown, will complete after shutdown
-             var subtask1 = scope.fork(task1);
-             var subtask2 = scope.fork(task2);
- 
-             scope.shutdown();
- 
-             // forked after shutdown
-             var subtask3 = scope.fork(task3);
- 
-             scope.join();
- 
-             // handleComplete should not be called
-             for (int i = 0; i < 3; i++) {
-                 assertEquals(0, scope.subtasks().size());
-                 Thread.sleep(20);
-             }
- 
-             assertEquals(Subtask.State.UNAVAILABLE, subtask1.state());
-             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
-             assertEquals(Subtask.State.UNAVAILABLE, subtask3.state());
+     @Test
+     void testIsCancelledAfterClose() throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
+             assertFalse(scope.isCancelled());
+             scope.close();
+             assertTrue(scope.isCancelled());
          }
      }
  
      /**
-      * Test that the default handleComplete throws IllegalArgumentException if called
-      * with a running task.
+      * Test Policy.onFork throwing exception.
       */
      @Test
-     void testHandleCompleteThrows() throws Exception {
-         class TestScope<T> extends StructuredTaskScope<T> {
-             protected void handleComplete(Subtask<? extends T> subtask) {
-                 super.handleComplete(subtask);
+     void testOnForkThrows() throws Exception {
+         var policy = new Policy<String, Void>() {
+             @Override
+             public boolean onFork(Subtask<? extends String> subtask) {
+                 throw new FooException();
              }
-         }
- 
-         try (var scope = new TestScope<String>()) {
-             var subtask = scope.fork(() -> {
-                 Thread.sleep(Duration.ofDays(1));
-                 return "foo";
-             });
- 
-             // running task
-             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
-             assertThrows(IllegalArgumentException.class, () -> scope.handleComplete(subtask));
-             scope.shutdown();
- 
-             // null task
-             assertThrows(NullPointerException.class, () -> scope.handleComplete(null));
- 
-             scope.join();
+             @Override
+             public Void result() {
+                 return null;
+             }
+         };
+         try (var scope = StructuredTaskScope.open(policy)) {
+             assertThrows(FooException.class, () -> scope.fork(() -> "foo"));
          }
      }
  
      /**
-      * Test ensureOwnerAndJoined.
+      * Test Policy.onFork returning true to cancel execution.
       */
-     @ParameterizedTest
-     @MethodSource("factories")
-     void testEnsureOwnerAndJoined(ThreadFactory factory) throws Exception {
-         class MyScope<T> extends StructuredTaskScope<T> {
-             MyScope(ThreadFactory factory) {
-                 super(null, factory);
+     @Test
+     void testOnForkCancelsExecution() throws Exception {
+         var policy = new Policy<String, Void>() {
+             @Override
+             public boolean onFork(Subtask<? extends String> subtask) {
+                 return true;
              }
-             void invokeEnsureOwnerAndJoined() {
-                 super.ensureOwnerAndJoined();
+             @Override
+             public Void result() {
+                 return null;
              }
-         }
- 
-         try (var scope = new MyScope<Boolean>(factory)) {
-             // owner thread, before join
-             scope.fork(() -> true);
-             assertThrows(IllegalStateException.class, () -> {
-                 scope.invokeEnsureOwnerAndJoined();
-             });
- 
-             // owner thread, after join
-             scope.join();
-             scope.invokeEnsureOwnerAndJoined();
- 
-             // thread in scope cannot invoke ensureOwnerAndJoined
-             Subtask<Boolean> subtask = scope.fork(() -> {
-                 assertThrows(WrongThreadException.class, () -> {
-                     scope.invokeEnsureOwnerAndJoined();
-                 });
-                 return true;
-             });
+         };
+         try (var scope = StructuredTaskScope.open(policy)) {
+             assertFalse(scope.isCancelled());
+             scope.fork(() -> "foo");
+             assertTrue(scope.isCancelled());
              scope.join();
-             assertTrue(subtask.get());
- 
-             // random thread cannot invoke ensureOwnerAndJoined
-             try (var pool = Executors.newSingleThreadExecutor()) {
-                 Future<Void> future = pool.submit(() -> {
-                     assertThrows(WrongThreadException.class, () -> {
-                         scope.invokeEnsureOwnerAndJoined();
-                     });
-                     return null;
-                 });
-                 future.get();
-             }
          }
      }
  
      /**
-      * Test ensureOwnerAndJoined after the task scope has been closed.
+      * Test Policy.onComplete returning true to cancel execution.
       */
-     @ParameterizedTest
-     @MethodSource("factories")
-     void testEnsureOwnerAndJoinedAfterClose(ThreadFactory factory) throws Exception {
-         class MyScope<T> extends StructuredTaskScope<T> {
-             MyScope(ThreadFactory factory) {
-                 super(null, factory);
+     @Test
+     void testOnCompleteCancelsExecution() throws Exception {
+         var policy = new Policy<String, Void>() {
+             @Override
+             public boolean onComplete(Subtask<? extends String> subtask) {
+                 return true;
              }
-             public void invokeEnsureOwnerAndJoined() {
-                 super.ensureOwnerAndJoined();
+             @Override
+             public Void result() {
+                 return null;
              }
-         }
- 
-         // ensureOwnerAndJoined after close, join invoked
-         try (var scope = new MyScope<String>(factory)) {
+         };
+         try (var scope = StructuredTaskScope.open(policy)) {
+             assertFalse(scope.isCancelled());
              scope.fork(() -> "foo");
+             while (!scope.isCancelled()) {
+                 Thread.sleep(10);
+             }
              scope.join();
-             scope.close();
-             scope.invokeEnsureOwnerAndJoined();  // should not throw
-         }
- 
-         // ensureOwnerAndJoined after close, join not invoked
-         try (var scope = new MyScope<String>(factory)) {
-             scope.fork(() -> "foo");
-             assertThrows(IllegalStateException.class, scope::close);
-             scope.invokeEnsureOwnerAndJoined();  // should not throw
          }
      }
  
- 
      /**
       * Test toString.
       */
      @Test
      void testToString() throws Exception {
-         ThreadFactory factory = Thread.ofVirtual().factory();
-         try (var scope = new StructuredTaskScope<Object>("duke", factory)) {
-             // open
-             assertTrue(scope.toString().contains("duke"));
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withName("duke"))) {
  
-             // shutdown
-             scope.shutdown();
+             // open
              assertTrue(scope.toString().contains("duke"));
  
              // closed
-             scope.join();
              scope.close();
              assertTrue(scope.toString().contains("duke"));
          }
      }
  

@@ -1269,23 +965,22 @@
       * Test Subtask with task that completes successfully.
       */
      @ParameterizedTest
      @MethodSource("factories")
      void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<String>(null, factory)) {
-             Callable<String> task = () -> "foo";
-             Subtask<String> subtask = scope.fork(task);
+         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
+ 
+             Subtask<String> subtask = scope.fork(() -> "foo");
  
-             // before join, owner thread
-             assertEquals(task, subtask.task());
+             // before join
              assertThrows(IllegalStateException.class, subtask::get);
              assertThrows(IllegalStateException.class, subtask::exception);
  
              scope.join();
  
              // after join
-             assertEquals(task, subtask.task());
              assertEquals(Subtask.State.SUCCESS, subtask.state());
              assertEquals("foo", subtask.get());
              assertThrows(IllegalStateException.class, subtask::exception);
          }
      }

@@ -1294,23 +989,22 @@
       * Test Subtask with task that fails.
       */
      @ParameterizedTest
      @MethodSource("factories")
      void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<String>(null, factory)) {
-             Callable<String> task = () -> { throw new FooException(); };
-             Subtask<String> subtask = scope.fork(task);
+         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
+ 
+             Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
  
-             // before join, owner thread
-             assertEquals(task, subtask.task());
+             // before join
              assertThrows(IllegalStateException.class, subtask::get);
              assertThrows(IllegalStateException.class, subtask::exception);
  
              scope.join();
  
              // after join
-             assertEquals(task, subtask.task());
              assertEquals(Subtask.State.FAILED, subtask.state());
              assertThrows(IllegalStateException.class, subtask::get);
              assertTrue(subtask.exception() instanceof FooException);
          }
      }

@@ -1319,53 +1013,55 @@
       * Test Subtask with a task that has not completed.
       */
      @ParameterizedTest
      @MethodSource("factories")
      void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
-             Callable<Void> task = () -> {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
+             Subtask<Void> subtask = scope.fork(() -> {
                  Thread.sleep(Duration.ofDays(1));
                  return null;
-             };
-             Subtask<Void> subtask = scope.fork(task);
+             });
  
              // before join
-             assertEquals(task, subtask.task());
              assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
              assertThrows(IllegalStateException.class, subtask::get);
              assertThrows(IllegalStateException.class, subtask::exception);
  
              // attempt join, join throws
              Thread.currentThread().interrupt();
              assertThrows(InterruptedException.class, scope::join);
  
              // after join
-             assertEquals(task, subtask.task());
              assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
              assertThrows(IllegalStateException.class, subtask::get);
              assertThrows(IllegalStateException.class, subtask::exception);
          }
      }
  
      /**
-      * Test Subtask when forked after shutdown.
+      * Test Subtask forked after execution cancelled.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testSubtaskWhenShutdown(ThreadFactory factory) throws Exception {
-         try (var scope = new StructuredTaskScope<Object>(null, factory)) {
-             Callable<Void> task = () -> {
-                 Thread.sleep(Duration.ofDays(1));
-                 return null;
-             };
+     void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(new CancelAfterOnePolicy<String>())) {
+             scope.fork(() -> "foo");
+             while (!scope.isCancelled()) {
+                 Thread.sleep(20);
+             }
  
-             scope.shutdown();
+             var subtask = scope.fork(() -> "foo");
+ 
+             // before join
+             assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
+             assertThrows(IllegalStateException.class, subtask::get);
+             assertThrows(IllegalStateException.class, subtask::exception);
  
-             // fork after shutdown
-             Subtask<Void> subtask = scope.fork(task);
              scope.join();
-             assertEquals(task, subtask.task());
+ 
+             // after join
              assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
              assertThrows(IllegalStateException.class, subtask::get);
              assertThrows(IllegalStateException.class, subtask::exception);
          }
      }

@@ -1373,254 +1069,481 @@
      /**
       * Test Subtask::toString.
       */
      @Test
      void testSubtaskToString() throws Exception {
-         try (var scope = new StructuredTaskScope<Object>()) {
-             // success
-             var subtask1 = scope.fork(() -> "foo");
-             scope.join();
-             assertTrue(subtask1.toString().contains("Completed successfully"));
- 
-             // failed
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
+             var latch = new CountDownLatch(1);
+             var subtask1 = scope.fork(() -> {
+                 latch.await();
+                 return "foo";
+             });
              var subtask2 = scope.fork(() -> { throw new FooException(); });
+ 
+             // subtask1 result is unavailable
+             assertTrue(subtask1.toString().contains("Unavailable"));
+             latch.countDown();
+ 
              scope.join();
-             assertTrue(subtask2.toString().contains("Failed"));
  
-             // not completed
-             Callable<Void> sleepForDay = () -> {
-                 Thread.sleep(Duration.ofDays(1));
-                 return null;
-             };
-             var subtask3 = scope.fork(sleepForDay);
-             assertTrue(subtask3.toString().contains("Unavailable"));
+             assertTrue(subtask1.toString().contains("Completed successfully"));
+             assertTrue(subtask2.toString().contains("Failed"));
+         }
+     }
  
-             scope.shutdown();
+     /**
+      * Test Policy.allSuccessfulOrThrow() with no subtasks.
+      */
+     @Test
+     void testAllSuccessfulOrThrow1() throws Throwable {
+         try (var scope = StructuredTaskScope.open(Policy.allSuccessfulOrThrow())) {
+             var subtasks = scope.join().toList();
+             assertTrue(subtasks.isEmpty());
+         }
+     }
  
-             // forked after shutdown
-             var subtask4 = scope.fork(sleepForDay);
-             assertTrue(subtask4.toString().contains("Unavailable"));
+     /**
+      * Test Policy.allSuccessfulOrThrow() with subtasks that complete successfully.
+      */
+     @ParameterizedTest
+     @MethodSource("factories")
+     void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
+         try (var scope = StructuredTaskScope.open(Policy.<String>allSuccessfulOrThrow(),
+                 cf -> cf.withThreadFactory(factory))) {
+             var subtask1 = scope.fork(() -> "foo");
+             var subtask2 = scope.fork(() -> "bar");
+             var subtasks = scope.join().toList();
+             assertEquals(List.of(subtask1, subtask2), subtasks);
+             assertEquals("foo", subtask1.get());
+             assertEquals("bar", subtask2.get());
+         }
+     }
  
-             scope.join();
+     /**
+      * Test Policy.allSuccessfulOrThrow() with a subtask that complete successfully and
+      * a subtask that fails.
+      */
+     @ParameterizedTest
+     @MethodSource("factories")
+     void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
+         try (var scope = StructuredTaskScope.open(Policy.<String>allSuccessfulOrThrow(),
+                 cf -> cf.withThreadFactory(factory))) {
+             scope.fork(() -> "foo");
+             scope.fork(() -> { throw new FooException(); });
+             try {
+                 scope.join();
+             } catch (ExecutionException e) {
+                 assertTrue(e.getCause() instanceof FooException);
+             }
          }
      }
  
      /**
-      * Test ShutdownOnSuccess with no completed tasks.
+      * Test Policy.anySuccessfulResultOrThrow() with no subtasks.
       */
      @Test
-     void testShutdownOnSuccess1() throws Exception {
-         try (var scope = new ShutdownOnSuccess<Object>()) {
-             assertThrows(IllegalStateException.class, () -> scope.result());
-             assertThrows(IllegalStateException.class, () -> scope.result(e -> null));
+     void testAnySuccessfulResultOrThrow1() throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.anySuccessfulResultOrThrow())) {
+             try {
+                 scope.join();
+             } catch (ExecutionException e) {
+                 assertTrue(e.getCause() instanceof NoSuchElementException);
+             }
          }
      }
  
      /**
-      * Test ShutdownOnSuccess with tasks that complete successfully.
+      * Test Policy.anySuccessfulResultOrThrow() with a subtask that completes successfully.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testShutdownOnSuccess2(ThreadFactory factory) throws Exception {
-         try (var scope = new ShutdownOnSuccess<String>(null, factory)) {
+     void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.<String>anySuccessfulResultOrThrow(),
+                 cf -> cf.withThreadFactory(factory))) {
              scope.fork(() -> "foo");
-             scope.join();  // ensures foo completes first
-             scope.fork(() -> "bar");
-             scope.join();
-             assertEquals("foo", scope.result());
-             assertEquals("foo", scope.result(e -> null));
+             String result = scope.join();
+             assertEquals("foo", result);
          }
      }
  
      /**
-      * Test ShutdownOnSuccess with a task that completes successfully with a null result.
+      * Test Policy.anySuccessfulResultOrThrow() with a subtask that completes successfully
+      * with a null result.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testShutdownOnSuccess3(ThreadFactory factory) throws Exception {
-         try (var scope = new ShutdownOnSuccess<Object>(null, factory)) {
+     void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.<String>anySuccessfulResultOrThrow(),
+                 cf -> cf.withThreadFactory(factory))) {
              scope.fork(() -> null);
-             scope.join();
-             assertNull(scope.result());
-             assertNull(scope.result(e -> null));
+             String result = scope.join();
+             assertNull(result);
          }
      }
  
      /**
-      * Test ShutdownOnSuccess with tasks that complete succcessfully and tasks that fail.
+      * Test Policy.anySuccessfulResultOrThrow() with a subtask that complete succcessfully
+      * and a subtask that fails.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testShutdownOnSuccess4(ThreadFactory factory) throws Exception {
-         try (var scope = new ShutdownOnSuccess<String>(null, factory)) {
+     void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.<String>anySuccessfulResultOrThrow(),
+                 cf -> cf.withThreadFactory(factory))) {
              scope.fork(() -> "foo");
-             scope.fork(() -> { throw new ArithmeticException(); });
-             scope.join();
-             assertEquals("foo", scope.result());
-             assertEquals("foo", scope.result(e -> null));
+             scope.fork(() -> { throw new FooException(); });
+             String first = scope.join();
+             assertEquals("foo", first);
          }
      }
  
      /**
-      * Test ShutdownOnSuccess with a task that fails.
+      * Test Policy.anySuccessfulResultOrThrow() with a subtask that fails.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testShutdownOnSuccess5(ThreadFactory factory) throws Exception {
-         try (var scope = new ShutdownOnSuccess<Object>(null, factory)) {
-             scope.fork(() -> { throw new ArithmeticException(); });
-             scope.join();
-             Throwable ex = assertThrows(ExecutionException.class, () -> scope.result());
-             assertTrue(ex.getCause() instanceof ArithmeticException);
-             ex = assertThrows(FooException.class, () -> scope.result(e -> new FooException(e)));
-             assertTrue(ex.getCause() instanceof ArithmeticException);
+     void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.anySuccessfulResultOrThrow(),
+                 cf -> cf.withThreadFactory(factory))) {
+             scope.fork(() -> { throw new FooException(); });
+             Throwable ex = assertThrows(ExecutionException.class, scope::join);
+             assertTrue(ex.getCause() instanceof FooException);
          }
      }
  
      /**
-      * Test ShutdownOnSuccess methods are confined to the owner.
+      * Test Policy.ignoreSuccessfulOrThrow() with no subtasks.
+      */
+     @Test
+     void testIgnoreSuccessfulOrThrow1() throws Throwable {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreSuccessfulOrThrow())) {
+             var result = scope.join();
+             assertNull(result);
+         }
+     }
+ 
+     /**
+      * Test Policy.ignoreSuccessfulOrThrow() with subtasks that complete successfully.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testShutdownOnSuccessConfined(ThreadFactory factory) throws Exception {
-         // owner before join
-         try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) {
-             scope.fork(() -> { throw new FooException(); });
-             assertThrows(IllegalStateException.class, scope::result);
-             assertThrows(IllegalStateException.class, () -> {
-                 scope.result(e -> new RuntimeException(e));
-             });
-             scope.join();
+     void testIgnoreSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
+         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreSuccessfulOrThrow(),
+                 cf -> cf.withThreadFactory(factory))) {
+             var subtask1 = scope.fork(() -> "foo");
+             var subtask2 = scope.fork(() -> "bar");
+             var result = scope.join();
+             assertNull(result);
+             assertEquals("foo", subtask1.get());
+             assertEquals("bar", subtask2.get());
          }
+     }
  
-         // non-owner
-         try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) {
-             Subtask<Boolean> subtask = scope.fork(() -> {
-                 assertThrows(WrongThreadException.class, scope::result);
-                 assertThrows(WrongThreadException.class, () -> {
-                     scope.result(e -> new RuntimeException(e));
-                 });
-                 return true;
-             });
-             scope.join();
-             assertTrue(subtask.get());
+     /**
+      * Test Policy.ignoreSuccessfulOrThrow() with a subtask that complete successfully and
+      * a subtask that fails.
+      */
+     @ParameterizedTest
+     @MethodSource("factories")
+     void testIgnoreSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
+         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreSuccessfulOrThrow(),
+                 cf -> cf.withThreadFactory(factory))) {
+             scope.fork(() -> "foo");
+             scope.fork(() -> { throw new FooException(); });
+             try {
+                 scope.join();
+             } catch (ExecutionException e) {
+                 assertTrue(e.getCause() instanceof FooException);
+             }
          }
      }
  
      /**
-      * Test ShutdownOnFailure with no completed tasks.
+      * Test Policy.ignoreAll() with no subtasks.
       */
      @Test
-     void testShutdownOnFailure1() throws Throwable {
-         try (var scope = new ShutdownOnFailure()) {
-             assertTrue(scope.exception().isEmpty());
-             scope.throwIfFailed();
-             scope.throwIfFailed(e -> new FooException(e));
+     void testIgnoreAll1() throws Throwable {
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
+             var result = scope.join();
+             assertNull(result);
          }
      }
  
      /**
-      * Test ShutdownOnFailure with tasks that complete successfully.
+      * Test Policy.ignoreAll() with subtasks that complete successfully.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testShutdownOnFailure2(ThreadFactory factory) throws Throwable {
-         try (var scope = new ShutdownOnFailure(null, factory)) {
-             scope.fork(() -> "foo");
-             scope.fork(() -> "bar");
-             scope.join();
- 
-             // no exception
-             assertTrue(scope.exception().isEmpty());
-             scope.throwIfFailed();
-             scope.throwIfFailed(e -> new FooException(e));
+     void testIgnoreAll2(ThreadFactory factory) throws Throwable {
+         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
+             var subtask1 = scope.fork(() -> "foo");
+             var subtask2 = scope.fork(() -> "bar");
+             var result = scope.join();
+             assertNull(result);
+             assertEquals("foo", subtask1.get());
+             assertEquals("bar", subtask2.get());
          }
      }
  
      /**
-      * Test ShutdownOnFailure with tasks that complete succcessfully and tasks that fail.
+      * Test Policy.ignoreAll() with a subtask that complete successfully and a subtask
+      * that fails.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testShutdownOnFailure3(ThreadFactory factory) throws Throwable {
-         try (var scope = new ShutdownOnFailure(null, factory)) {
+     void testIgnoreAll3(ThreadFactory factory) throws Throwable {
+         try (var scope = StructuredTaskScope.open(Policy.<String>ignoreAll(),
+                 cf -> cf.withThreadFactory(factory))) {
+             var subtask1 = scope.fork(() -> "foo");
+             var subtask2 = scope.fork(() -> { throw new FooException(); });
+             var result = scope.join();
+             assertNull(result);
+             assertEquals("foo", subtask1.get());
+             assertTrue(subtask2.exception() instanceof FooException);
+         }
+     }
  
-             // one task completes successfully, the other fails
-             scope.fork(() -> "foo");
-             scope.fork(() -> { throw new ArithmeticException(); });
-             scope.join();
+     /**
+      * Test Policy.all(Predicate) with no subtasks.
+      */
+     @Test
+     void testAllWithPredicate1() throws Throwable {
+         try (var scope = StructuredTaskScope.open(Policy.all(s -> false))) {
+             var subtasks = scope.join();
+             assertEquals(0, subtasks.count());
+         }
+     }
  
-             Throwable ex = scope.exception().orElse(null);
-             assertTrue(ex instanceof ArithmeticException);
+     /**
+      * Test Policy.all(Predicate) with no cancellation.
+      */
+     @ParameterizedTest
+     @MethodSource("factories")
+     void testAllWithPredicate2(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.<String>all(s -> false),
+                 cf -> cf.withThreadFactory(factory))) {
  
-             ex = assertThrows(ExecutionException.class, () -> scope.throwIfFailed());
-             assertTrue(ex.getCause() instanceof ArithmeticException);
+             var subtask1 = scope.fork(() -> "foo");
+             var subtask2 = scope.fork(() -> { throw new FooException(); });
  
-             ex = assertThrows(FooException.class,
-                               () -> scope.throwIfFailed(e -> new FooException(e)));
-             assertTrue(ex.getCause() instanceof ArithmeticException);
+             var subtasks = scope.join().toList();
+             assertEquals(2, subtasks.size());
+ 
+             assertTrue(subtasks.get(0) == subtask1);
+             assertTrue(subtasks.get(1) == subtask2);
+             assertEquals("foo", subtask1.get());
+             assertTrue(subtask2.exception() instanceof FooException);
          }
      }
  
      /**
-      * Test ShutdownOnFailure methods are confined to the owner.
+      * Test Policy.all(Predicate) with cancellation after one subtask completes.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testShutdownOnFailureConfined(ThreadFactory factory) throws Exception {
-         // owner before join
-         try (var scope = new ShutdownOnFailure(null, factory)) {
-             scope.fork(() -> "foo");
-             assertThrows(IllegalStateException.class, scope::exception);
-             assertThrows(IllegalStateException.class, scope::throwIfFailed);
-             assertThrows(IllegalStateException.class, () -> {
-                 scope.throwIfFailed(e -> new RuntimeException(e));
+     void testAllWithPredicate3(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Policy.<String>all(s -> true),
+                 cf -> cf.withThreadFactory(factory))) {
+ 
+             var subtask1 = scope.fork(() -> "foo");
+             var subtask2 = scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
+                 return "bar";
              });
-             scope.join();
+ 
+             var subtasks = scope.join().toList();
+ 
+             assertEquals(2, subtasks.size());
+             assertTrue(subtasks.get(0) == subtask1);
+             assertTrue(subtasks.get(1) == subtask2);
+             assertEquals("foo", subtask1.get());
+             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
          }
+     }
  
-         // non-owner
-         try (var scope = new ShutdownOnFailure(null, factory)) {
-             Subtask<Boolean> subtask = scope.fork(() -> {
-                 assertThrows(WrongThreadException.class, scope::exception);
-                 assertThrows(WrongThreadException.class, scope::throwIfFailed);
-                 assertThrows(WrongThreadException.class, () -> {
-                     scope.throwIfFailed(e -> new RuntimeException(e));
-                 });
-                 return true;
-             });
+     /**
+      * Test Policy.all(Predicate) with cancellation after serveral subtasks complete.
+      */
+     @ParameterizedTest
+     @MethodSource("factories")
+     void testAllWithPredicate4(ThreadFactory factory) throws Exception {
+ 
+         // cancel execution after two or more failures
+         class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
+             final AtomicInteger failedCount = new AtomicInteger();
+             @Override
+             public boolean test(Subtask<? extends T> subtask) {
+                 return subtask.state() == Subtask.State.FAILED
+                         && failedCount.incrementAndGet() >= 2;
+             }
+         }
+         var policy = Policy.all(new CancelAfterTwoFailures<String>());
+ 
+         try (var scope = StructuredTaskScope.open(policy)) {
+             int forkCount = 0;
+ 
+             // fork subtasks until execution cancelled
+             while (!scope.isCancelled()) {
+                 scope.fork(() -> "foo");
+                 scope.fork(() -> { throw new FooException(); });
+                 forkCount += 2;
+                 Thread.sleep(Duration.ofMillis(10));
+             }
+ 
+             var subtasks = scope.join().toList();
+             assertEquals(forkCount, subtasks.size());
+ 
+             long failedCount = subtasks.stream()
+                     .filter(s -> s.state() == Subtask.State.FAILED)
+                     .count();
+             assertTrue(failedCount >= 2);
+         }
+     }
+ 
+     /**
+      * Test Config equals/hashCode/toString
+      */
+     @Test
+     void testConfigMethods() throws Exception {
+         Function<Config, Config> testConfig = cf -> {
+             var name = "duke";
+             var threadFactory = Thread.ofPlatform().factory();
+             var timeout = Duration.ofSeconds(10);
+ 
+             assertEquals(cf, cf);
+             assertEquals(cf.withName(name), cf.withName(name));
+             assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
+             assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
+ 
+             assertNotEquals(cf, cf.withName(name));
+             assertNotEquals(cf, cf.withThreadFactory(threadFactory));
+             assertNotEquals(cf, cf.withTimeout(timeout));
+ 
+             assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
+             assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
+                     cf.withThreadFactory(threadFactory).hashCode());
+             assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
+ 
+             assertTrue(cf.withName(name).toString().contains(name));
+             assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
+             assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
+ 
+             return cf;
+         };
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll(), testConfig)) {
+             // do nothing
+         }
+     }
+ 
+     /**
+      * Test Policy's default methods.
+      */
+     @Test
+     void testPolicyDefaultMethods() throws Exception {
+         try (var scope = StructuredTaskScope.open(new CancelAfterOnePolicy<String>())) {
+ 
+             // need subtasks to test default methods
+             var subtask1 = scope.fork(() -> "foo");
+             while (!scope.isCancelled()) {
+                 Thread.sleep(20);
+             }
+             var subtask2 = scope.fork(() -> "bar");
              scope.join();
-             assertTrue(subtask.get());
+ 
+             assertEquals(Subtask.State.SUCCESS, subtask1.state());
+             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
+ 
+             // Policy that does not override default methods
+             Policy<Object, Void> policy = () -> null;
+             assertThrows(NullPointerException.class, () -> policy.onFork(null));
+             assertThrows(NullPointerException.class, () -> policy.onComplete(null));
+             assertThrows(IllegalArgumentException.class, () -> policy.onFork(subtask1));
+             assertFalse(policy.onFork(subtask2));
+             assertFalse(policy.onComplete(subtask1));
+             assertThrows(IllegalArgumentException.class, () -> policy.onComplete(subtask2));
          }
      }
  
      /**
       * Test for NullPointerException.
       */
      @Test
      void testNulls() throws Exception {
-         assertThrows(NullPointerException.class, () -> new StructuredTaskScope("", null));
-         try (var scope = new StructuredTaskScope<Object>()) {
-             assertThrows(NullPointerException.class, () -> scope.fork(null));
-             assertThrows(NullPointerException.class, () -> scope.joinUntil(null));
-         }
+         assertThrows(NullPointerException.class,
+                 () -> StructuredTaskScope.open(null));
+         assertThrows(NullPointerException.class,
+                 () -> StructuredTaskScope.open(null, cf -> cf));
+         assertThrows(NullPointerException.class,
+                 () -> StructuredTaskScope.open(Policy.ignoreAll(), null));
+         assertThrows(NullPointerException.class,
+                 () -> StructuredTaskScope.open(Policy.ignoreAll(), cf -> null));
  
-         assertThrows(NullPointerException.class, () -> new ShutdownOnSuccess<Object>("", null));
-         try (var scope = new ShutdownOnSuccess<Object>()) {
-             assertThrows(NullPointerException.class, () -> scope.fork(null));
-             assertThrows(NullPointerException.class, () -> scope.joinUntil(null));
-             assertThrows(NullPointerException.class, () -> scope.result(null));
+         assertThrows(NullPointerException.class, () -> Policy.all(null));
+ 
+         // fork
+         try (var scope = StructuredTaskScope.open(Policy.ignoreAll())) {
+             assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null));
+             assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null));
          }
  
-         assertThrows(NullPointerException.class, () -> new ShutdownOnFailure("", null));
-         try (var scope = new ShutdownOnFailure()) {
-             assertThrows(NullPointerException.class, () -> scope.fork(null));
-             assertThrows(NullPointerException.class, () -> scope.joinUntil(null));
-             assertThrows(NullPointerException.class, () -> scope.throwIfFailed(null));
+         // withXXX
+         assertThrows(NullPointerException.class,
+                 () -> StructuredTaskScope.open(Policy.ignoreAll(), cf -> cf.withName(null)));
+         assertThrows(NullPointerException.class,
+                 () -> StructuredTaskScope.open(Policy.ignoreAll(), cf -> cf.withThreadFactory(null)));
+         assertThrows(NullPointerException.class,
+                 () -> StructuredTaskScope.open(Policy.ignoreAll(), cf -> cf.withTimeout(null)));
+     }
+ 
+     /**
+      * ThreadFactory that counts usage.
+      */
+     private static class CountingThreadFactory implements ThreadFactory {
+         final ThreadFactory delegate;
+         final AtomicInteger threadCount = new AtomicInteger();
+         CountingThreadFactory(ThreadFactory delegate) {
+             this.delegate = delegate;
+         }
+         @Override
+         public Thread newThread(Runnable task) {
+             threadCount.incrementAndGet();
+             return delegate.newThread(task);
+         }
+         int threadCount() {
+             return threadCount.get();
          }
      }
  
+     /**
+      * Policy that cancels execution when a subtask completes.
+      */
+     private static class CancelAfterOnePolicy<T> implements Policy<T, Void> {
+         final AtomicInteger onForkCount = new AtomicInteger();
+         final AtomicInteger onCompleteCount = new AtomicInteger();
+         @Override
+         public boolean onFork(Subtask<? extends T> subtask) {
+             onForkCount.incrementAndGet();
+             return false;
+         }
+         @Override
+         public boolean onComplete(Subtask<? extends T> subtask) {
+             onCompleteCount.incrementAndGet();
+             return true;
+         }
+         @Override
+         public Void result() {
+             return null;
+         }
+         int onForkCount() {
+             return onForkCount.get();
+         }
+         int onCompleteCount() {
+             return onCompleteCount.get();
+         }
+     };
+ 
      /**
       * A runtime exception for tests.
       */
      private static class FooException extends RuntimeException {
          FooException() { }
< prev index next >