< prev index next >

test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java

Print this page
@@ -59,12 +59,12 @@
  import java.util.concurrent.StructuredTaskScope.Subtask;
  import java.util.concurrent.StructureViolationException;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicReference;
- import java.util.function.Function;
  import java.util.function.Predicate;
+ import java.util.function.UnaryOperator;
  import java.util.stream.Stream;
  import static java.lang.Thread.State.*;
  
  import org.junit.jupiter.api.Test;
  import org.junit.jupiter.api.BeforeAll;

@@ -207,11 +207,11 @@
      /**
       * Test fork after join, no subtasks forked before join.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testForkAfterJoin1(ThreadFactory factory) throws Exception {
+     void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
              scope.join();
              assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
          }

@@ -220,30 +220,29 @@
      /**
       * Test fork after join, subtasks forked before join.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testForkAfterJoin2(ThreadFactory factory) throws Exception {
+     void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
              scope.fork(() -> "foo");
              scope.join();
              assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
          }
      }
  
      /**
-      * Test fork after join throws.
+      * Test fork after join interrupted.
       */
      @ParameterizedTest
      @MethodSource("factories")
-     void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
+     void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
-             var latch = new CountDownLatch(1);
              var subtask1 = scope.fork(() -> {
-                 latch.await();
+                 Thread.sleep(Duration.ofDays(1));
                  return "foo";
              });
  
              // join throws
              Thread.currentThread().interrupt();

@@ -252,10 +251,29 @@
              // fork should throw
              assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
          }
      }
  
+     /**
+      * Test fork after join timeout.
+      */
+     @ParameterizedTest
+     @MethodSource("factories")
+     void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception {
+         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+                 cf -> cf.withThreadFactory(factory)
+                         .withTimeout(Duration.ofMillis(100)))) {
+             awaitCancelled(scope);
+ 
+             // join throws
+             assertThrows(TimeoutException.class, scope::join);
+ 
+             // fork should throw
+             assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
+         }
+     }
+ 
      /**
       * Test fork after task scope is cancelled. This test uses a custom Joiner to
       * cancel execution.
       */
      @ParameterizedTest

@@ -294,13 +312,15 @@
      }
  
      /**
       * Test fork after task scope is closed.
       */
-     @Test
-     void testForkAfterClose() {
-         try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
+     @ParameterizedTest
+     @MethodSource("factories")
+     void testForkAfterClose(ThreadFactory factory) {
+         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+                 cf -> cf.withThreadFactory(factory))) {
              scope.close();
              assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
          }
      }
  

@@ -380,11 +400,38 @@
  
      /**
       * Test join after join is interrupted.
       */
      @Test
-     void testJoinAfterJoin3() throws Exception {
+     void testJoinAfterJoinInterrupted() throws Exception {
+         try (var scope = StructuredTaskScope.open()) {
+             var latch = new CountDownLatch(1);
+             var subtask = scope.fork(() -> {
+                 latch.await();
+                 return "foo";
+             });
+ 
+             // join throws InterruptedException
+             Thread.currentThread().interrupt();
+             assertThrows(InterruptedException.class, scope::join);
+ 
+             latch.countDown();
+ 
+             // retry join to get result
+             scope.join();
+             assertEquals("foo", subtask.get());
+ 
+             // retry after obtaining result
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
+     /**
+      * Test join after join completed with a timeout.
+      */
+     @Test
+     void testJoinAfterJoinTimeout() throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
                  cf -> cf.withTimeout(Duration.ofMillis(100)))) {
              // wait for scope to be cancelled by timeout
              awaitCancelled(scope);
              assertThrows(TimeoutException.class, scope::join);

@@ -394,10 +441,39 @@
                  assertThrows(IllegalStateException.class, scope::join);
              }
          }
      }
  
+     /**
+      * Test join invoked from Joiner.onTimeout.
+      */
+     @Test
+     void testJoinInOnTimeout() throws Exception {
+         Thread owner = Thread.currentThread();
+         var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
+ 
+         var joiner = new Joiner<String, Void>() {
+             @Override
+             public void onTimeout() {
+                 assertTrue(Thread.currentThread() == owner);
+                 var scope = scopeRef.get();
+                 assertThrows(IllegalStateException.class, scope::join);
+             }
+             @Override
+             public Void result() {
+                 return null;
+             }
+         };
+ 
+         try (var scope = StructuredTaskScope.open(joiner,
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             awaitCancelled(scope);
+             scopeRef.set(scope);
+             scope.join();  // invokes onTimeout
+         }
+     }
+ 
      /**
       * Test join method is owner confined.
       */
      @ParameterizedTest
      @MethodSource("factories")

@@ -432,11 +508,11 @@
      void testInterruptJoin1(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
  
              Subtask<String> subtask = scope.fork(() -> {
-                 Thread.sleep(60_000);
+                 Thread.sleep(Duration.ofDays(1));
                  return "foo";
              });
  
              // join should throw
              Thread.currentThread().interrupt();

@@ -455,14 +531,12 @@
      @ParameterizedTest
      @MethodSource("factories")
      void testInterruptJoin2(ThreadFactory factory) throws Exception {
          try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
                  cf -> cf.withThreadFactory(factory))) {
- 
-             var latch = new CountDownLatch(1);
              Subtask<String> subtask = scope.fork(() -> {
-                 Thread.sleep(60_000);
+                 Thread.sleep(Duration.ofDays(1));
                  return "foo";
              });
  
              // interrupt main thread when it blocks in join
              scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");

@@ -963,10 +1037,70 @@
              awaitCancelled(scope);
              scope.join();
          }
      }
  
+     /**
+      * Test Joiner.onTimeout invoked by owner thread when timeout expires.
+      */
+     @Test
+     void testOnTimeoutInvoked() throws Exception {
+         var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
+         Thread owner = Thread.currentThread();
+         var invokeCount = new AtomicInteger();
+         var joiner = new Joiner<String, Void>() {
+             @Override
+             public void onTimeout() {
+                 assertTrue(Thread.currentThread() == owner);
+                 assertTrue(scopeRef.get().isCancelled());
+                 invokeCount.incrementAndGet();
+             }
+             @Override
+             public Void result() {
+                 return null;
+             }
+         };
+         try (var scope = StructuredTaskScope.open(joiner,
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             scopeRef.set(scope);
+             scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
+                 return null;
+             });
+             scope.join();
+             assertEquals(1, invokeCount.get());
+         }
+     }
+ 
+     /**
+      * Test Joiner.onTimeout throwing an exception.
+      */
+     @Test
+     void testOnTimeoutThrows() throws Exception {
+         var joiner = new Joiner<String, Void>() {
+             @Override
+             public void onTimeout() {
+                 throw new FooException();
+             }
+             @Override
+             public Void result() {
+                 return null;
+             }
+         };
+         try (var scope = StructuredTaskScope.open(joiner,
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             // wait for scope to be cancelled by timeout
+             awaitCancelled(scope);
+ 
+             // join should throw FooException on first usage
+             assertThrows(FooException.class, scope::join);
+ 
+             // retry after onTimeout fails
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
      /**
       * Test toString.
       */
      @Test
      void testToString() throws Exception {

@@ -1154,10 +1288,29 @@
                  assertTrue(e.getCause() instanceof FooException);
              }
          }
      }
  
+     /**
+      * Test Joiner.allSuccessfulOrThrow() with a timeout.
+      */
+     @Test
+     void testAllSuccessfulOrThrow4() throws Exception {
+         try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             scope.fork(() -> "foo");
+             scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
+                 return "bar";
+             });
+             assertThrows(TimeoutException.class, scope::join);
+ 
+             // retry after join throws TimeoutException
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
      /**
       * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
       */
      @Test
      void testAnySuccessfulResultOrThrow1() throws Exception {

@@ -1227,10 +1380,29 @@
              Throwable ex = assertThrows(FailedException.class, scope::join);
              assertTrue(ex.getCause() instanceof FooException);
          }
      }
  
+     /**
+      * Test Joiner.anySuccessfulResultOrThrow() with a timeout.
+      */
+     @Test
+     void anySuccessfulResultOrThrow6() throws Exception {
+         try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             scope.fork(() -> { throw new FooException(); });
+             scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
+                 return "bar";
+             });
+             assertThrows(TimeoutException.class, scope::join);
+ 
+             // retry after join throws TimeoutException
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
      /**
       * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
       */
      @Test
      void testAwaitSuccessfulOrThrow1() throws Throwable {

@@ -1274,10 +1446,29 @@
                  assertTrue(e.getCause() instanceof FooException);
              }
          }
      }
  
+     /**
+      * Test Joiner.awaitAllSuccessfulOrThrow() with a timeout.
+      */
+     @Test
+     void testAwaitSuccessfulOrThrow4() throws Exception {
+         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             scope.fork(() -> "foo");
+             scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
+                 return "bar";
+             });
+             assertThrows(TimeoutException.class, scope::join);
+ 
+             // retry after join throws TimeoutException
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
      /**
       * Test Joiner.awaitAll() with no subtasks.
       */
      @Test
      void testAwaitAll1() throws Throwable {

@@ -1320,10 +1511,29 @@
              assertEquals("foo", subtask1.get());
              assertTrue(subtask2.exception() instanceof FooException);
          }
      }
  
+     /**
+      * Test Joiner.awaitAll() with a timeout.
+      */
+     @Test
+     void testAwaitAll4() throws Exception {
+         try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             scope.fork(() -> "foo");
+             scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
+                 return "bar";
+             });
+             assertThrows(TimeoutException.class, scope::join);
+ 
+             // retry after join throws TimeoutException
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
      /**
       * Test Joiner.allUntil(Predicate) with no subtasks.
       */
      @Test
      void testAllUntil1() throws Throwable {

@@ -1435,10 +1645,37 @@
              scope.join();
              assertInstanceOf(FooException.class, excRef.get());
          }
      }
  
+     /**
+      * Test Joiner.allUntil(Predicate) with a timeout.
+      */
+     @Test
+     void testAllUntil6() throws Exception {
+         try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
+                 cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+             var subtask1 = scope.fork(() -> "foo");
+             var subtask2 = scope.fork(() -> {
+                 Thread.sleep(Duration.ofDays(1));
+                 return "bar";
+             });
+ 
+             // TimeoutException should not be thrown
+             var subtasks = scope.join().toList();
+ 
+             // stream should have two elements, subtask1 may or may not have completed
+             assertEquals(2, subtasks.size());
+             assertSame(subtask1, subtasks.get(0));
+             assertSame(subtask2, subtasks.get(1));
+             assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
+ 
+             // retry after join has completed
+             assertThrows(IllegalStateException.class, scope::join);
+         }
+     }
+ 
      /**
       * Test Joiner default methods.
       */
      @Test
      void testJoinerDefaultMethods() throws Exception {

@@ -1459,10 +1696,11 @@
              assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
              assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
              assertFalse(joiner.onFork(subtask2));
              assertFalse(joiner.onComplete(subtask1));
              assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
+             assertThrows(TimeoutException.class, joiner::onTimeout);
          }
      }
  
      /**
       * Test Joiners onFork/onComplete methods with a subtask in an unexpected state.

@@ -1521,11 +1759,11 @@
      /**
       * Test Configuration equals/hashCode/toString
       */
      @Test
      void testConfigMethods() throws Exception {
-         Function<Configuration, Configuration> testConfig = cf -> {
+         UnaryOperator<Configuration> configOperator = cf -> {
              var name = "duke";
              var threadFactory = Thread.ofPlatform().factory();
              var timeout = Duration.ofSeconds(10);
  
              assertEquals(cf, cf);

@@ -1546,11 +1784,11 @@
              assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
              assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
  
              return cf;
          };
-         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
+         try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) {
              // do nothing
          }
      }
  
      /**
< prev index next >