< prev index next > test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java
Print this page
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructureViolationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
- import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import static java.lang.Thread.State.*;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeAll;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructureViolationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
+ import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import static java.lang.Thread.State.*;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeAll;
/**
* Test fork after join, no subtasks forked before join.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoin1(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
scope.join();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
/**
* Test fork after join, no subtasks forked before join.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
scope.join();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
/**
* Test fork after join, subtasks forked before join.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoin2(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> "foo");
scope.join();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
}
/**
! * Test fork after join throws.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
- var latch = new CountDownLatch(1);
var subtask1 = scope.fork(() -> {
! latch.await();
return "foo";
});
// join throws
Thread.currentThread().interrupt();
/**
* Test fork after join, subtasks forked before join.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> "foo");
scope.join();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
}
/**
! * Test fork after join interrupted.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
var subtask1 = scope.fork(() -> {
! Thread.sleep(Duration.ofDays(1));
return "foo";
});
// join throws
Thread.currentThread().interrupt();
// fork should throw
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
}
+ /**
+ * Test fork after join timeout.
+ */
+ @ParameterizedTest
+ @MethodSource("factories")
+ void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory)
+ .withTimeout(Duration.ofMillis(100)))) {
+ awaitCancelled(scope);
+
+ // join throws
+ assertThrows(TimeoutException.class, scope::join);
+
+ // fork should throw
+ assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
+ }
+ }
+
/**
* Test fork after task scope is cancelled. This test uses a custom Joiner to
* cancel execution.
*/
@ParameterizedTest
}
/**
* Test fork after task scope is closed.
*/
! @Test
! void testForkAfterClose() {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
scope.close();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
}
}
}
/**
* Test fork after task scope is closed.
*/
! @ParameterizedTest
! @MethodSource("factories")
! void testForkAfterClose(ThreadFactory factory) {
+ try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory))) {
scope.close();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
}
}
/**
* Test join after join completed with a timeout.
*/
@Test
! void testJoinAfterJoin3() throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
// wait for scope to be cancelled by timeout
awaitCancelled(scope);
assertThrows(TimeoutException.class, scope::join);
/**
* Test join after join interrupted.
*/
@Test
! void testJoinAfterJoinInterrupted() throws Exception {
+ try (var scope = StructuredTaskScope.open()) {
+ var latch = new CountDownLatch(1);
+ var subtask = scope.fork(() -> {
+ latch.await();
+ return "foo";
+ });
+
+ // join throws InterruptedException
+ Thread.currentThread().interrupt();
+ assertThrows(InterruptedException.class, scope::join);
+
+ latch.countDown();
+
+ // retry join to get result
+ scope.join();
+ assertEquals("foo", subtask.get());
+
+ // retry after obtaining result
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
+ /**
+ * Test join after join completed with a timeout.
+ */
+ @Test
+ void testJoinAfterJoinTimeout() throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
// wait for scope to be cancelled by timeout
awaitCancelled(scope);
assertThrows(TimeoutException.class, scope::join);
assertThrows(IllegalStateException.class, scope::join);
}
}
}
+ /**
+ * Test join invoked from Joiner.onTimeout.
+ */
+ @Test
+ void testJoinInOnTimeout() throws Exception {
+ Thread owner = Thread.currentThread();
+ var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
+
+ var joiner = new Joiner<String, Void>() {
+ @Override
+ public void onTimeout() {
+ assertTrue(Thread.currentThread() == owner);
+ var scope = scopeRef.get();
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ @Override
+ public Void result() {
+ return null;
+ }
+ };
+
+ try (var scope = StructuredTaskScope.open(joiner,
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ awaitCancelled(scope);
+ scopeRef.set(scope);
+ scope.join(); // invokes onTimeout
+ }
+ }
+
/**
* Test join method is owner confined.
*/
@ParameterizedTest
@MethodSource("factories")
void testInterruptJoin1(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
Subtask<String> subtask = scope.fork(() -> {
! Thread.sleep(60_000);
return "foo";
});
// join should throw
Thread.currentThread().interrupt();
void testInterruptJoin1(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
Subtask<String> subtask = scope.fork(() -> {
! Thread.sleep(Duration.ofDays(1));
return "foo";
});
// join should throw
Thread.currentThread().interrupt();
@ParameterizedTest
@MethodSource("factories")
void testInterruptJoin2(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
-
- var latch = new CountDownLatch(1);
Subtask<String> subtask = scope.fork(() -> {
! Thread.sleep(60_000);
return "foo";
});
// interrupt main thread when it blocks in join
scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
@ParameterizedTest
@MethodSource("factories")
void testInterruptJoin2(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
Subtask<String> subtask = scope.fork(() -> {
! Thread.sleep(Duration.ofDays(1));
return "foo";
});
// interrupt main thread when it blocks in join
scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
awaitCancelled(scope);
scope.join();
}
}
+ /**
+ * Test Joiner.onTimeout invoked by owner thread when timeout expires.
+ */
+ @Test
+ void testOnTimeoutInvoked() throws Exception {
+ var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
+ Thread owner = Thread.currentThread();
+ var invokeCount = new AtomicInteger();
+ var joiner = new Joiner<String, Void>() {
+ @Override
+ public void onTimeout() {
+ assertTrue(Thread.currentThread() == owner);
+ assertTrue(scopeRef.get().isCancelled());
+ invokeCount.incrementAndGet();
+ }
+ @Override
+ public Void result() {
+ return null;
+ }
+ };
+ try (var scope = StructuredTaskScope.open(joiner,
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scopeRef.set(scope);
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1));
+ return null;
+ });
+ scope.join();
+ assertEquals(1, invokeCount.get());
+ }
+ }
+
+ /**
+ * Test Joiner.onTimeout throwing an exception.
+ */
+ @Test
+ void testOnTimeoutThrows() throws Exception {
+ var joiner = new Joiner<String, Void>() {
+ @Override
+ public void onTimeout() {
+ throw new FooException();
+ }
+ @Override
+ public Void result() {
+ return null;
+ }
+ };
+ try (var scope = StructuredTaskScope.open(joiner,
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ // wait for scope to be cancelled by timeout
+ awaitCancelled(scope);
+
+ // join should throw FooException on first usage
+ assertThrows(FooException.class, scope::join);
+
+ // retry after onTimeout fails
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test toString.
*/
@Test
void testToString() throws Exception {
assertTrue(e.getCause() instanceof FooException);
}
}
}
+ /**
+ * Test Joiner.allSuccessfulOrThrow() with a timeout.
+ */
+ @Test
+ void testAllSuccessfulOrThrow4() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scope.fork(() -> "foo");
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1));
+ return "bar";
+ });
+ assertThrows(TimeoutException.class, scope::join);
+
+ // retry after join throws TimeoutException
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
*/
@Test
void testAnySuccessfulResultOrThrow1() throws Exception {
Throwable ex = assertThrows(FailedException.class, scope::join);
assertTrue(ex.getCause() instanceof FooException);
}
}
+ /**
+ * Test Joiner.anySuccessfulResultOrThrow() with a timeout.
+ */
+ @Test
+ void anySuccessfulResultOrThrow6() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scope.fork(() -> { throw new FooException(); });
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1));
+ return "bar";
+ });
+ assertThrows(TimeoutException.class, scope::join);
+
+ // retry after join throws TimeoutException
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
*/
@Test
void testAwaitSuccessfulOrThrow1() throws Throwable {
assertTrue(e.getCause() instanceof FooException);
}
}
}
+ /**
+ * Test Joiner.awaitAllSuccessfulOrThrow() with a timeout.
+ */
+ @Test
+ void testAwaitSuccessfulOrThrow4() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scope.fork(() -> "foo");
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1));
+ return "bar";
+ });
+ assertThrows(TimeoutException.class, scope::join);
+
+ // retry after join throws TimeoutException
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test Joiner.awaitAll() with no subtasks.
*/
@Test
void testAwaitAll1() throws Throwable {
assertEquals("foo", subtask1.get());
assertTrue(subtask2.exception() instanceof FooException);
}
}
+ /**
+ * Test Joiner.awaitAll() with a timeout.
+ */
+ @Test
+ void testAwaitAll4() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scope.fork(() -> "foo");
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1));
+ return "bar";
+ });
+ assertThrows(TimeoutException.class, scope::join);
+
+ // retry after join throws TimeoutException
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test Joiner.allUntil(Predicate) with no subtasks.
*/
@Test
void testAllUntil1() throws Throwable {
scope.join();
assertInstanceOf(FooException.class, excRef.get());
}
}
+ /**
+ * Test Joiner.allUntil(Predicate) with a timeout.
+ */
+ @Test
+ void testAllUntil6() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ var subtask1 = scope.fork(() -> "foo");
+ var subtask2 = scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1));
+ return "bar";
+ });
+
+ // TimeoutException should not be thrown
+ var subtasks = scope.join().toList();
+
+ // stream should have two elements, subtask1 may or may not have completed
+ assertEquals(2, subtasks.size());
+ assertSame(subtask1, subtasks.get(0));
+ assertSame(subtask2, subtasks.get(1));
+ assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
+
+ // retry after join completes (no TimeoutException was thrown)
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test Joiner default methods.
*/
@Test
void testJoinerDefaultMethods() throws Exception {
assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
assertFalse(joiner.onFork(subtask2));
assertFalse(joiner.onComplete(subtask1));
assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
+ assertThrows(TimeoutException.class, joiner::onTimeout);
}
}
/**
* Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
/**
* Test Configuration equals/hashCode/toString
*/
@Test
void testConfigMethods() throws Exception {
! Function<Configuration, Configuration> testConfig = cf -> {
var name = "duke";
var threadFactory = Thread.ofPlatform().factory();
var timeout = Duration.ofSeconds(10);
assertEquals(cf, cf);
/**
* Test Configuration equals/hashCode/toString
*/
@Test
void testConfigMethods() throws Exception {
! UnaryOperator<Configuration> configOperator = cf -> {
var name = "duke";
var threadFactory = Thread.ofPlatform().factory();
var timeout = Duration.ofSeconds(10);
assertEquals(cf, cf);
assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
return cf;
};
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
// do nothing
}
}
/**
assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
return cf;
};
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) {
// do nothing
}
}
/**
< prev index next >