< prev index next > test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java
Print this page
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructureViolationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
- import java.util.function.Function;
import java.util.function.Predicate;
+ import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import static java.lang.Thread.State.*;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeAll;
/**
* Test fork after join, no subtasks forked before join.
*/
@ParameterizedTest
@MethodSource("factories")
- void testForkAfterJoin1(ThreadFactory factory) throws Exception {
+ void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
scope.join();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
/**
* Test fork after join, subtasks forked before join.
*/
@ParameterizedTest
@MethodSource("factories")
- void testForkAfterJoin2(ThreadFactory factory) throws Exception {
+ void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> "foo");
scope.join();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
}
/**
- * Test fork after join throws.
+ * Test fork after join interrupted.
*/
@ParameterizedTest
@MethodSource("factories")
- void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
+ void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
- var latch = new CountDownLatch(1);
var subtask1 = scope.fork(() -> {
- latch.await();
+ Thread.sleep(Duration.ofDays(1));
return "foo";
});
// join throws
Thread.currentThread().interrupt();
// fork should throw
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
}
+ /**
+ * Test fork after join timeout. The scope is opened with a 100ms timeout and no
+ * subtasks are forked before join. After the scope has been cancelled, join is
+ * expected to throw TimeoutException and a later fork is expected to throw
+ * IllegalStateException.
+ *
+ * @param factory the ThreadFactory that the scope is configured to use
+ */
+ @ParameterizedTest
+ @MethodSource("factories")
+ void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory)
+ .withTimeout(Duration.ofMillis(100)))) {
+ awaitCancelled(scope); // wait for scope to be cancelled by timeout
+
+ // join throws
+ assertThrows(TimeoutException.class, scope::join);
+
+ // fork should throw
+ assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
+ }
+ }
+
/**
* Test fork after task scope is cancelled. This test uses a custom Joiner to
* cancel execution.
*/
@ParameterizedTest
}
/**
* Test fork after task scope is closed.
*/
- @Test
- void testForkAfterClose() {
- try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
+ @ParameterizedTest
+ @MethodSource("factories")
+ void testForkAfterClose(ThreadFactory factory) {
+ try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory))) {
scope.close();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
}
}
/**
* Test join after join interrupted.
*/
@Test
- void testJoinAfterJoin3() throws Exception {
+ void testJoinAfterJoinInterrupted() throws Exception {
+ try (var scope = StructuredTaskScope.open()) {
+ var latch = new CountDownLatch(1);
+ var subtask = scope.fork(() -> {
+ latch.await();
+ return "foo";
+ });
+
+ // join throws InterruptedException
+ Thread.currentThread().interrupt();
+ assertThrows(InterruptedException.class, scope::join);
+
+ latch.countDown();
+
+ // retry join to get result
+ scope.join();
+ assertEquals("foo", subtask.get());
+
+ // retry after obtaining result
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
+ /**
+ * Test join after join completed with a timeout.
+ */
+ @Test
+ void testJoinAfterJoinTimeout() throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
// wait for scope to be cancelled by timeout
awaitCancelled(scope);
assertThrows(TimeoutException.class, scope::join);
assertThrows(IllegalStateException.class, scope::join);
}
}
}
+ /**
+ * Test join invoked from Joiner.onTimeout. The custom Joiner's onTimeout checks
+ * that it runs on the thread that opened the scope and that invoking join on
+ * the scope from within onTimeout throws IllegalStateException.
+ */
+ @Test
+ void testJoinInOnTimeout() throws Exception {
+ Thread owner = Thread.currentThread();
+ var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>(); // lets onTimeout reach the scope
+
+ var joiner = new Joiner<String, Void>() {
+ @Override
+ public void onTimeout() {
+ assertTrue(Thread.currentThread() == owner); // must be called by the owner thread
+ var scope = scopeRef.get();
+ assertThrows(IllegalStateException.class, scope::join); // nested join is rejected
+ }
+ @Override
+ public Void result() {
+ return null;
+ }
+ };
+
+ try (var scope = StructuredTaskScope.open(joiner,
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ awaitCancelled(scope); // wait for scope to be cancelled by timeout
+ scopeRef.set(scope); // set before join so that onTimeout sees the scope
+ scope.join(); // invokes onTimeout
+ }
+ }
+
/**
* Test join method is owner confined.
*/
@ParameterizedTest
@MethodSource("factories")
void testInterruptJoin1(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
Subtask<String> subtask = scope.fork(() -> {
- Thread.sleep(60_000);
+ Thread.sleep(Duration.ofDays(1));
return "foo";
});
// join should throw
Thread.currentThread().interrupt();
@ParameterizedTest
@MethodSource("factories")
void testInterruptJoin2(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
-
- var latch = new CountDownLatch(1);
Subtask<String> subtask = scope.fork(() -> {
- Thread.sleep(60_000);
+ Thread.sleep(Duration.ofDays(1));
return "foo";
});
// interrupt main thread when it blocks in join
scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
awaitCancelled(scope);
scope.join();
}
}
+ /**
+ * Test Joiner.onTimeout invoked by owner thread when timeout expires. The
+ * custom Joiner's onTimeout checks that it runs on the thread that opened the
+ * scope and that the scope is already cancelled, and counts its invocations.
+ * The test expects join to return normally and onTimeout to have been invoked
+ * exactly once.
+ */
+ @Test
+ void testOnTimeoutInvoked() throws Exception {
+ var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>(); // lets onTimeout reach the scope
+ Thread owner = Thread.currentThread();
+ var invokeCount = new AtomicInteger(); // number of onTimeout invocations
+ var joiner = new Joiner<String, Void>() {
+ @Override
+ public void onTimeout() {
+ assertTrue(Thread.currentThread() == owner); // must be called by the owner thread
+ assertTrue(scopeRef.get().isCancelled()); // scope is cancelled before onTimeout runs
+ invokeCount.incrementAndGet();
+ }
+ @Override
+ public Void result() {
+ return null;
+ }
+ };
+ try (var scope = StructuredTaskScope.open(joiner,
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scopeRef.set(scope);
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1)); // far longer than the 100ms timeout
+ return null;
+ });
+ scope.join(); // this onTimeout does not throw, so join is expected to return
+ assertEquals(1, invokeCount.get());
+ }
+ }
+
+ /**
+ * Test Joiner.onTimeout throwing an exception. The custom Joiner's onTimeout
+ * throws FooException; the test expects the first join to throw that
+ * FooException and a second join to throw IllegalStateException.
+ */
+ @Test
+ void testOnTimeoutThrows() throws Exception {
+ var joiner = new Joiner<String, Void>() {
+ @Override
+ public void onTimeout() {
+ throw new FooException();
+ }
+ @Override
+ public Void result() {
+ return null;
+ }
+ };
+ try (var scope = StructuredTaskScope.open(joiner,
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ // wait for scope to be cancelled by timeout
+ awaitCancelled(scope);
+
+ // join should throw FooException on first usage
+ assertThrows(FooException.class, scope::join);
+
+ // retry after onTimeout fails
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test toString.
*/
@Test
void testToString() throws Exception {
@ParameterizedTest
@MethodSource("factories")
void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
cf -> cf.withThreadFactory(factory))) {
-
Subtask<String> subtask = scope.fork(() -> "foo");
- // before join
+ // before join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
+ // before join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
+
scope.join();
- // after join
assertEquals(Subtask.State.SUCCESS, subtask.state());
+
+ // after join, owner thread
assertEquals("foo", subtask.get());
assertThrows(IllegalStateException.class, subtask::exception);
+
+ // after join, another thread
+ assertEquals("foo", callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
}
}
/**
* Test Subtask with task that fails.
try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
cf -> cf.withThreadFactory(factory))) {
Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
- // before join
+ // before join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
+ // before join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
+
scope.join();
- // after join
assertEquals(Subtask.State.FAILED, subtask.state());
+
+ // after join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertTrue(subtask.exception() instanceof FooException);
+
+ // after join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertTrue(callInOtherThread(subtask::exception) instanceof FooException);
}
}
/**
* Test Subtask with a task that has not completed.
cf -> cf.withThreadFactory(factory))) {
Subtask<Void> subtask = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return null;
});
-
- // before join
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
+
+ // before join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
+ // before join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
+
// attempt join, join throws
Thread.currentThread().interrupt();
assertThrows(InterruptedException.class, scope::join);
- // after join
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
+
+ // after join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
+
+ // after join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
}
}
/**
* Test Subtask forked after execution cancelled.
scope.fork(() -> "foo");
awaitCancelled(scope);
var subtask = scope.fork(() -> "foo");
- // before join
- assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
+ // before join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
+ // before join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
+
scope.join();
- // after join
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
+
+ // after join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
+
+ // after join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
}
}
/**
* Test Subtask::toString.
assertTrue(e.getCause() instanceof FooException);
}
}
}
+ /**
+ * Test Joiner.allSuccessfulOrThrow() with a timeout. One subtask returns
+ * immediately and the other sleeps far longer than the 100ms timeout. The
+ * first join is expected to throw TimeoutException and a second join to throw
+ * IllegalStateException.
+ */
+ @Test
+ void testAllSuccessfulOrThrow4() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scope.fork(() -> "foo");
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1)); // far longer than the 100ms timeout
+ return "bar";
+ });
+ assertThrows(TimeoutException.class, scope::join);
+
+ // retry after join throws TimeoutException
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
*/
@Test
void testAnySuccessfulResultOrThrow1() throws Exception {
Throwable ex = assertThrows(FailedException.class, scope::join);
assertTrue(ex.getCause() instanceof FooException);
}
}
+ /**
+ * Test Joiner.anySuccessfulResultOrThrow() with a timeout. One subtask fails
+ * with FooException and the other sleeps far longer than the 100ms timeout, so
+ * no subtask produces a result. The first join is expected to throw
+ * TimeoutException and a second join to throw IllegalStateException.
+ */
+ @Test
+ void anySuccessfulResultOrThrow6() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scope.fork(() -> { throw new FooException(); });
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1)); // far longer than the 100ms timeout
+ return "bar";
+ });
+ assertThrows(TimeoutException.class, scope::join);
+
+ // retry after join throws TimeoutException
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
*/
@Test
void testAwaitSuccessfulOrThrow1() throws Throwable {
assertTrue(e.getCause() instanceof FooException);
}
}
}
+ /**
+ * Test Joiner.awaitAllSuccessfulOrThrow() with a timeout. One subtask returns
+ * immediately and the other sleeps far longer than the 100ms timeout. The
+ * first join is expected to throw TimeoutException and a second join to throw
+ * IllegalStateException.
+ */
+ @Test
+ void testAwaitSuccessfulOrThrow4() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scope.fork(() -> "foo");
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1)); // far longer than the 100ms timeout
+ return "bar";
+ });
+ assertThrows(TimeoutException.class, scope::join);
+
+ // retry after join throws TimeoutException
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test Joiner.awaitAll() with no subtasks.
*/
@Test
void testAwaitAll1() throws Throwable {
assertEquals("foo", subtask1.get());
assertTrue(subtask2.exception() instanceof FooException);
}
}
+ /**
+ * Test Joiner.awaitAll() with a timeout. One subtask returns immediately and
+ * the other sleeps far longer than the 100ms timeout. The first join is
+ * expected to throw TimeoutException and a second join to throw
+ * IllegalStateException.
+ */
+ @Test
+ void testAwaitAll4() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scope.fork(() -> "foo");
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1)); // far longer than the 100ms timeout
+ return "bar";
+ });
+ assertThrows(TimeoutException.class, scope::join);
+
+ // retry after join throws TimeoutException
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test Joiner.allUntil(Predicate) with no subtasks.
*/
@Test
void testAllUntil1() throws Throwable {
var subtask1 = scope.fork(() -> "foo");
var subtask2 = scope.fork(() -> { throw new FooException(); });
var subtasks = scope.join().toList();
- assertEquals(2, subtasks.size());
+ assertEquals(List.of(subtask1, subtask2), subtasks);
- assertSame(subtask1, subtasks.get(0));
- assertSame(subtask2, subtasks.get(1));
assertEquals("foo", subtask1.get());
assertTrue(subtask2.exception() instanceof FooException);
}
}
Thread.sleep(Duration.ofDays(1));
return "bar";
});
var subtasks = scope.join().toList();
+ assertEquals(List.of(subtask1, subtask2), subtasks);
- assertEquals(2, subtasks.size());
- assertSame(subtask1, subtasks.get(0));
- assertSame(subtask2, subtasks.get(1));
assertEquals("foo", subtask1.get());
assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
}
}
scope.join();
assertInstanceOf(FooException.class, excRef.get());
}
}
+ /**
+ * Test Joiner.allUntil(Predicate) with a timeout. The predicate never asks for
+ * cancellation and the second subtask sleeps far longer than the 100ms timeout.
+ * Unlike the other timeout tests, join is expected to return the stream of
+ * subtasks rather than throw TimeoutException. A second join is expected to
+ * throw IllegalStateException.
+ */
+ @Test
+ void testAllUntil6() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ var subtask1 = scope.fork(() -> "foo");
+ var subtask2 = scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1)); // far longer than the 100ms timeout
+ return "bar";
+ });
+
+ // TimeoutException should not be thrown
+ var subtasks = scope.join().toList();
+
+ // stream should have two elements, subtask1 may or may not have completed
+ assertEquals(List.of(subtask1, subtask2), subtasks);
+ assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
+
+ // retry after join has completed
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test Joiner default methods.
*/
@Test
void testJoinerDefaultMethods() throws Exception {
assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
assertFalse(joiner.onFork(subtask2));
assertFalse(joiner.onComplete(subtask1));
assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
+ assertThrows(TimeoutException.class, joiner::onTimeout);
}
}
/**
* Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
/**
* Test Configuration equals/hashCode/toString
*/
@Test
void testConfigMethods() throws Exception {
- Function<Configuration, Configuration> testConfig = cf -> {
+ UnaryOperator<Configuration> configOperator = cf -> {
var name = "duke";
var threadFactory = Thread.ofPlatform().factory();
var timeout = Duration.ofSeconds(10);
assertEquals(cf, cf);
assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
return cf;
};
- try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
+ try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) {
// do nothing
}
}
/**
interruptThreadAt(target, location);
return null;
});
}
+ /**
+ * Calls a result returning task from another thread. The task is run in a newly
+ * started virtual thread and the caller waits for that thread to terminate. If
+ * the caller is interrupted while waiting, it keeps waiting and restores its
+ * interrupt status before returning.
+ *
+ * NOTE(review): only Exception is captured. If the task throws some other
+ * Throwable (an Error, for example) then nothing is recorded and this method
+ * returns null.
+ *
+ * @param task the task to call in the other thread
+ * @return the value returned by the task
+ * @throws Exception the exception thrown by the task, rethrown in the caller
+ */
+ private <V> V callInOtherThread(Callable<V> task) throws Exception {
+ var result = new AtomicReference<V>(); // value returned by the task
+ var exc = new AtomicReference<Exception>(); // exception thrown by the task, if any
+ Thread thread = Thread.ofVirtual().start(() -> {
+ try {
+ result.set(task.call());
+ } catch (Exception e) {
+ exc.set(e);
+ }
+ });
+ boolean interrupted = false; // set if interrupted while waiting
+ boolean terminated = false;
+ while (!terminated) {
+ try {
+ thread.join();
+ terminated = true;
+ } catch (InterruptedException e) {
+ interrupted = true; // remember the interrupt and retry the join
+ }
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt(); // restore the caller's interrupt status
+ }
+ Exception e = exc.get();
+ if (e != null) {
+ throw e;
+ } else {
+ return result.get();
+ }
+ }
+
/**
* Returns true if the given stack trace contains an element for the given class
* and method name.
*/
private boolean contains(StackTraceElement[] stack, String className, String methodName) {
< prev index next >