< prev index next >

test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java

Print this page
*** 59,12 ***
  import java.util.concurrent.StructuredTaskScope.Subtask;
  import java.util.concurrent.StructureViolationException;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicReference;
- import java.util.function.Function;
  import java.util.function.Predicate;
  import java.util.stream.Stream;
  import static java.lang.Thread.State.*;
  
  import org.junit.jupiter.api.Test;
  import org.junit.jupiter.api.BeforeAll;
--- 59,12 ---
  import java.util.concurrent.StructuredTaskScope.Subtask;
  import java.util.concurrent.StructureViolationException;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicReference;
  import java.util.function.Predicate;
+ import java.util.function.UnaryOperator;
  import java.util.stream.Stream;
  import static java.lang.Thread.State.*;
  
  import org.junit.jupiter.api.Test;
  import org.junit.jupiter.api.BeforeAll;

*** 207,11 ***
      /**
       * Test fork after join, no subtasks forked before join.
       */
      @ParameterizedTest
      @MethodSource("factories")
!     void testForkAfterJoin1(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
              scope.join();
              assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
          }
--- 207,11 ---
      /**
       * Test fork after join, no subtasks forked before join.
       */
      @ParameterizedTest
      @MethodSource("factories")
!     void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
              scope.join();
              assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
          }

*** 220,30 ***
      /**
       * Test fork after join, subtasks forked before join.
       */
      @ParameterizedTest
      @MethodSource("factories")
!     void testForkAfterJoin2(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
              scope.fork(() -> "foo");
              scope.join();
              assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
          }
      }
  
      /**
!      * Test fork after join throws.
       */
      @ParameterizedTest
      @MethodSource("factories")
!     void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
-             var latch = new CountDownLatch(1);
              var subtask1 = scope.fork(() -> {
!                 latch.await();
                  return "foo";
              });
  
              // join throws
              Thread.currentThread().interrupt();
--- 220,29 ---
      /**
       * Test fork after join, subtasks forked before join.
       */
      @ParameterizedTest
      @MethodSource("factories")
!     void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
              scope.fork(() -> "foo");
              scope.join();
              assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
          }
      }
  
      /**
!      * Test fork after join interrupted.
       */
      @ParameterizedTest
      @MethodSource("factories")
!     void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
              var subtask1 = scope.fork(() -> {
!                 Thread.sleep(Duration.ofDays(1));
                  return "foo";
              });
  
              // join throws
              Thread.currentThread().interrupt();

*** 252,10 ***
--- 251,29 ---
              // fork should throw
              assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
          }
      }
  
+     /**
+      * Test fork after join timeout.
+      */
+     @ParameterizedTest
+     @MethodSource("factories")
+     void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+                 cf -> cf.withThreadFactory(factory)
+                         .withTimeout(Duration.ofMillis(100)))) {
+             awaitCancelled(scope);
+ 
+             // join throws
+             assertThrows(TimeoutException.class, scope::join);
+ 
+             // fork should throw
+             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
+         }
+     }
+ 
      /**
       * Test fork after task scope is cancelled. This test uses a custom Joiner to
       * cancel execution.
       */
      @ParameterizedTest

*** 294,13 ***
      }
  
      /**
       * Test fork after task scope is closed.
       */
!     @Test
!     void testForkAfterClose() {
!         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
              scope.close();
              assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
          }
      }
  
--- 312,15 ---
      }
  
      /**
       * Test fork after task scope is closed.
       */
!     @ParameterizedTest
!     @MethodSource("factories")
!     void testForkAfterClose(ThreadFactory factory) {
+         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+                 cf -> cf.withThreadFactory(factory))) {
              scope.close();
              assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
          }
      }
  

*** 380,11 ***
  
      /**
       * Test join after join completed with a timeout.
       */
      @Test
!     void testJoinAfterJoin3() throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
                  cf -> cf.withTimeout(Duration.ofMillis(100)))) {
              // wait for scope to be cancelled by timeout
              awaitCancelled(scope);
              assertThrows(TimeoutException.class, scope::join);
--- 400,38 ---
  
      /**
       * Test join after join interrupted.
       */
      @Test
!     void testJoinAfterJoinInterrupted() throws Exception {
+         try (var scope = StructuredTaskScope.open()) {
+             var latch = new CountDownLatch(1);
+             var subtask = scope.fork(() -> {
+                 latch.await();
+                 return "foo";
+             });
+ 
+             // join throws InterruptedException
+             Thread.currentThread().interrupt();
+             assertThrows(InterruptedException.class, scope::join);
+ 
+             latch.countDown();
+ 
+             // retry join to get result
+             scope.join();
+             assertEquals("foo", subtask.get());
+ 
+             // retry after obtaining result
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
+     /**
+      * Test join after join completed with a timeout.
+      */
+     @Test
+     void testJoinAfterJoinTimeout() throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
                  cf -> cf.withTimeout(Duration.ofMillis(100)))) {
              // wait for scope to be cancelled by timeout
              awaitCancelled(scope);
              assertThrows(TimeoutException.class, scope::join);

*** 394,10 ***
--- 441,39 ---
                  assertThrows(IllegalStateException.class, scope::join);
              }
          }
      }
  
+     /**
+      * Test join invoked from Joiner.onTimeout.
+      */
+     @Test
+     void testJoinInOnTimeout() throws Exception {
+         Thread owner = Thread.currentThread();
+         var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
+ 
+         var joiner = new Joiner<String, Void>() {
+             @Override
+             public void onTimeout() {
+                 assertTrue(Thread.currentThread() == owner);
+                 var scope = scopeRef.get();
+                 assertThrows(IllegalStateException.class, scope::join);
+             }
+             @Override
+             public Void result() {
+                 return null;
+             }
+         };
+ 
+         try (var scope = StructuredTaskScope.open(joiner,
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             awaitCancelled(scope);
+             scopeRef.set(scope);
+             scope.join();  // invokes onTimeout
+         }
+     }
+ 
      /**
       * Test join method is owner confined.
       */
      @ParameterizedTest
      @MethodSource("factories")

*** 432,11 ***
      void testInterruptJoin1(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
  
              Subtask<String> subtask = scope.fork(() -> {
!                 Thread.sleep(60_000);
                  return "foo";
              });
  
              // join should throw
              Thread.currentThread().interrupt();
--- 508,11 ---
      void testInterruptJoin1(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
  
              Subtask<String> subtask = scope.fork(() -> {
!                 Thread.sleep(Duration.ofDays(1));
                  return "foo";
              });
  
              // join should throw
              Thread.currentThread().interrupt();

*** 455,14 ***
      @ParameterizedTest
      @MethodSource("factories")
      void testInterruptJoin2(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
- 
-             var latch = new CountDownLatch(1);
              Subtask<String> subtask = scope.fork(() -> {
!                 Thread.sleep(60_000);
                  return "foo";
              });
  
              // interrupt main thread when it blocks in join
              scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
--- 531,12 ---
      @ParameterizedTest
      @MethodSource("factories")
      void testInterruptJoin2(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
              Subtask<String> subtask = scope.fork(() -> {
!                 Thread.sleep(Duration.ofDays(1));
                  return "foo";
              });
  
              // interrupt main thread when it blocks in join
              scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");

*** 963,10 ***
--- 1037,70 ---
              awaitCancelled(scope);
              scope.join();
          }
      }
  
+     /**
+      * Test Joiner.onTimeout invoked by owner thread when timeout expires.
+      */
+     @Test
+     void testOnTimeoutInvoked() throws Exception {
+         var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
+         Thread owner = Thread.currentThread();
+         var invokeCount = new AtomicInteger();
+         var joiner = new Joiner<String, Void>() {
+             @Override
+             public void onTimeout() {
+                 assertTrue(Thread.currentThread() == owner);
+                 assertTrue(scopeRef.get().isCancelled());
+                 invokeCount.incrementAndGet();
+             }
+             @Override
+             public Void result() {
+                 return null;
+             }
+         };
+         try (var scope = StructuredTaskScope.open(joiner,
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             scopeRef.set(scope);
+             scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
+                 return null;
+             });
+             scope.join();
+             assertEquals(1, invokeCount.get());
+         }
+     }
+ 
+     /**
+      * Test Joiner.onTimeout throwing an exception.
+      */
+     @Test
+     void testOnTimeoutThrows() throws Exception {
+         var joiner = new Joiner<String, Void>() {
+             @Override
+             public void onTimeout() {
+                 throw new FooException();
+             }
+             @Override
+             public Void result() {
+                 return null;
+             }
+         };
+         try (var scope = StructuredTaskScope.open(joiner,
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             // wait for scope to be cancelled by timeout
+             awaitCancelled(scope);
+ 
+             // join should throw FooException on first usage
+             assertThrows(FooException.class, scope::join);
+ 
+             // retry after onTimeout fails
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
      /**
       * Test toString.
       */
      @Test
      void testToString() throws Exception {

*** 1154,10 ***
--- 1288,29 ---
                  assertTrue(e.getCause() instanceof FooException);
              }
          }
      }
  
+     /**
+      * Test Joiner.allSuccessfulOrThrow() with a timeout.
+      */
+     @Test
+     void testAllSuccessfulOrThrow4() throws Exception {
+         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             scope.fork(() -> "foo");
+             scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
+                 return "bar";
+             });
+             assertThrows(TimeoutException.class, scope::join);
+ 
+             // retry after join throws TimeoutException
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
      /**
       * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
       */
      @Test
      void testAnySuccessfulResultOrThrow1() throws Exception {

*** 1227,10 ***
--- 1380,29 ---
              Throwable ex = assertThrows(FailedException.class, scope::join);
              assertTrue(ex.getCause() instanceof FooException);
          }
      }
  
+     /**
+      * Test Joiner.anySuccessfulResultOrThrow() with a timeout.
+      */
+     @Test
+     void anySuccessfulResultOrThrow6() throws Exception {
+         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             scope.fork(() -> { throw new FooException(); });
+             scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
+                 return "bar";
+             });
+             assertThrows(TimeoutException.class, scope::join);
+ 
+             // retry after join throws TimeoutException
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
      /**
       * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
       */
      @Test
      void testAwaitSuccessfulOrThrow1() throws Throwable {

*** 1274,10 ***
--- 1446,29 ---
                  assertTrue(e.getCause() instanceof FooException);
              }
          }
      }
  
+     /**
+      * Test Joiner.awaitAllSuccessfulOrThrow() with a timeout.
+      */
+     @Test
+     void testAwaitSuccessfulOrThrow4() throws Exception {
+         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             scope.fork(() -> "foo");
+             scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
+                 return "bar";
+             });
+             assertThrows(TimeoutException.class, scope::join);
+ 
+             // retry after join throws TimeoutException
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
      /**
       * Test Joiner.awaitAll() with no subtasks.
       */
      @Test
      void testAwaitAll1() throws Throwable {

*** 1320,10 ***
--- 1511,29 ---
              assertEquals("foo", subtask1.get());
              assertTrue(subtask2.exception() instanceof FooException);
          }
      }
  
+     /**
+      * Test Joiner.awaitAll() with a timeout.
+      */
+     @Test
+     void testAwaitAll4() throws Exception {
+         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             scope.fork(() -> "foo");
+             scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
+                 return "bar";
+             });
+             assertThrows(TimeoutException.class, scope::join);
+ 
+             // retry after join throws TimeoutException
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
      /**
       * Test Joiner.allUntil(Predicate) with no subtasks.
       */
      @Test
      void testAllUntil1() throws Throwable {

*** 1435,10 ***
--- 1645,37 ---
              scope.join();
              assertInstanceOf(FooException.class, excRef.get());
          }
      }
  
+     /**
+      * Test Joiner.allUntil(Predicate) with a timeout.
+      */
+     @Test
+     void testAllUntil6() throws Exception {
+         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             var subtask1 = scope.fork(() -> "foo");
+             var subtask2 = scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
+                 return "bar";
+             });
+ 
+             // TimeoutException should not be thrown
+             var subtasks = scope.join().toList();
+ 
+             // stream should have two elements, subtask1 may or may not have completed
+             assertEquals(2, subtasks.size());
+             assertSame(subtask1, subtasks.get(0));
+             assertSame(subtask2, subtasks.get(1));
+             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
+ 
+             // retry after join completes without throwing
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
      /**
       * Test Joiner default methods.
       */
      @Test
      void testJoinerDefaultMethods() throws Exception {

*** 1459,10 ***
--- 1696,11 ---
              assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
              assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
              assertFalse(joiner.onFork(subtask2));
              assertFalse(joiner.onComplete(subtask1));
              assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
+             assertThrows(TimeoutException.class, joiner::onTimeout);
          }
      }
  
      /**
       * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.

*** 1521,11 ***
      /**
       * Test Configuration equals/hashCode/toString
       */
      @Test
      void testConfigMethods() throws Exception {
!         Function<Configuration, Configuration> testConfig = cf -> {
              var name = "duke";
              var threadFactory = Thread.ofPlatform().factory();
              var timeout = Duration.ofSeconds(10);
  
              assertEquals(cf, cf);
--- 1759,11 ---
      /**
       * Test Configuration equals/hashCode/toString
       */
      @Test
      void testConfigMethods() throws Exception {
!         UnaryOperator<Configuration> configOperator = cf -> {
              var name = "duke";
              var threadFactory = Thread.ofPlatform().factory();
              var timeout = Duration.ofSeconds(10);
  
              assertEquals(cf, cf);

*** 1546,11 ***
              assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
              assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
  
              return cf;
          };
!         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
              // do nothing
          }
      }
  
      /**
--- 1784,11 ---
              assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
              assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
  
              return cf;
          };
!         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) {
              // do nothing
          }
      }
  
      /**
< prev index next >