< prev index next > test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java
Print this page
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructureViolationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
- import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import static java.lang.Thread.State.*;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeAll;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructureViolationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
+ import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import static java.lang.Thread.State.*;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeAll;
/**
* Test fork after join, no subtasks forked before join.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoin1(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
scope.join();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
/**
* Test fork after join, no subtasks forked before join.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
scope.join();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
/**
* Test fork after join, subtasks forked before join.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoin2(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> "foo");
scope.join();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
}
/**
! * Test fork after join throws.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
- var latch = new CountDownLatch(1);
var subtask1 = scope.fork(() -> {
! latch.await();
return "foo";
});
// join throws
Thread.currentThread().interrupt();
/**
* Test fork after join, subtasks forked before join.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> "foo");
scope.join();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
}
/**
! * Test fork after join interrupted.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
var subtask1 = scope.fork(() -> {
! Thread.sleep(Duration.ofDays(1));
return "foo";
});
// join throws
Thread.currentThread().interrupt();
// fork should throw
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
}
+ /**
+ * Test fork after join timeout. Once join has thrown TimeoutException, a
+ * further fork is expected to throw IllegalStateException.
+ */
+ @ParameterizedTest
+ @MethodSource("factories")
+ void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory)
+ .withTimeout(Duration.ofMillis(100)))) {
+ // wait for scope to be cancelled by timeout
+ awaitCancelled(scope);
+
+ // join throws
+ assertThrows(TimeoutException.class, scope::join);
+
+ // fork should throw
+ assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
+ }
+ }
+
/**
* Test fork after task scope is cancelled. This test uses a custom Joiner to
* cancel execution.
*/
@ParameterizedTest
}
/**
* Test fork after task scope is closed.
*/
! @Test
! void testForkAfterClose() {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
scope.close();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
}
}
}
/**
* Test fork after task scope is closed.
*/
! @ParameterizedTest
! @MethodSource("factories")
! void testForkAfterClose(ThreadFactory factory) {
+ try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory))) {
scope.close();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
}
}
/**
* Test join after join completed with an exception.
*/
@Test
void testJoinAfterJoin2() throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
scope.fork(() -> { throw new FooException(); });
Throwable ex = assertThrows(FailedException.class, scope::join);
assertTrue(ex.getCause() instanceof FooException);
// join already called
/**
* Test join after join completed with an exception.
*/
@Test
void testJoinAfterJoin2() throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow())) {
scope.fork(() -> { throw new FooException(); });
Throwable ex = assertThrows(FailedException.class, scope::join);
assertTrue(ex.getCause() instanceof FooException);
// join already called
assertThrows(IllegalStateException.class, scope::join);
}
}
}
/**
* Test join after join completed with a timeout.
*/
@Test
! void testJoinAfterJoin3() throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
// wait for scope to be cancelled by timeout
awaitCancelled(scope);
assertThrows(TimeoutException.class, scope::join);
assertThrows(IllegalStateException.class, scope::join);
}
}
}
+ /**
+ * Test join after join interrupted. An interrupted join may be retried; once a
+ * join has returned the result, another join throws IllegalStateException.
+ */
+ @Test
+ void testJoinAfterJoinInterrupted() throws Exception {
+ try (var scope = StructuredTaskScope.open()) {
+ // the latch keeps the subtask from completing until after the interrupted join
+ var latch = new CountDownLatch(1);
+ var subtask = scope.fork(() -> {
+ latch.await();
+ return "foo";
+ });
+
+ // join throws InterruptedException
+ Thread.currentThread().interrupt();
+ assertThrows(InterruptedException.class, scope::join);
+
+ // allow the subtask to complete
+ latch.countDown();
+
+ // retry join to get result
+ scope.join();
+ assertEquals("foo", subtask.get());
+
+ // retry after obtaining result
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test join after join completed with a timeout.
*/
@Test
! void testJoinAfterJoinTimeout() throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(),
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
// wait for scope to be cancelled by timeout
awaitCancelled(scope);
assertThrows(TimeoutException.class, scope::join);
assertThrows(IllegalStateException.class, scope::join);
}
}
}
+ /**
+ * Test join invoked from Joiner.onTimeout. The nested join is expected to
+ * throw IllegalStateException.
+ */
+ @Test
+ void testJoinInOnTimeout() throws Exception {
+ Thread owner = Thread.currentThread();
+ // the joiner is created before the scope, so the scope is passed in via this reference
+ var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
+
+ var joiner = new Joiner<String, Void>() {
+ @Override
+ public void onTimeout() {
+ // onTimeout should run on the thread that calls join
+ assertTrue(Thread.currentThread() == owner);
+ var scope = scopeRef.get();
+ // join from within onTimeout should throw
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ @Override
+ public Void result() {
+ return null;
+ }
+ };
+
+ try (var scope = StructuredTaskScope.open(joiner,
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ // wait for scope to be cancelled by timeout
+ awaitCancelled(scope);
+ // publish the scope before join so that onTimeout can reach it
+ scopeRef.set(scope);
+ scope.join(); // invokes onTimeout
+ }
+ }
+
/**
* Test join method is owner confined.
*/
@ParameterizedTest
@MethodSource("factories")
void testInterruptJoin1(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
Subtask<String> subtask = scope.fork(() -> {
! Thread.sleep(60_000);
return "foo";
});
// join should throw
Thread.currentThread().interrupt();
void testInterruptJoin1(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
Subtask<String> subtask = scope.fork(() -> {
! Thread.sleep(Duration.ofDays(1));
return "foo";
});
// join should throw
Thread.currentThread().interrupt();
@ParameterizedTest
@MethodSource("factories")
void testInterruptJoin2(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
-
- var latch = new CountDownLatch(1);
Subtask<String> subtask = scope.fork(() -> {
! Thread.sleep(60_000);
return "foo";
});
// interrupt main thread when it blocks in join
scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
@ParameterizedTest
@MethodSource("factories")
void testInterruptJoin2(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
cf -> cf.withThreadFactory(factory))) {
Subtask<String> subtask = scope.fork(() -> {
! Thread.sleep(Duration.ofDays(1));
return "foo";
});
// interrupt main thread when it blocks in join
scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
*/
@Test
void testOnForkThrows() throws Exception {
var joiner = new Joiner<String, Void>() {
@Override
! public boolean onFork(Subtask<? extends String> subtask) {
throw new FooException();
}
@Override
public Void result() {
return null;
*/
@Test
void testOnForkThrows() throws Exception {
var joiner = new Joiner<String, Void>() {
@Override
! public boolean onFork(Subtask<String> subtask) {
throw new FooException();
}
@Override
public Void result() {
return null;
*/
@Test
void testOnForkCancelsExecution() throws Exception {
var joiner = new Joiner<String, Void>() {
@Override
! public boolean onFork(Subtask<? extends String> subtask) {
return true;
}
@Override
public Void result() {
return null;
*/
@Test
void testOnForkCancelsExecution() throws Exception {
var joiner = new Joiner<String, Void>() {
@Override
! public boolean onFork(Subtask<String> subtask) {
return true;
}
@Override
public Void result() {
return null;
*/
@Test
void testOnCompleteThrows() throws Exception {
var joiner = new Joiner<String, Void>() {
@Override
! public boolean onComplete(Subtask<? extends String> subtask) {
throw new FooException();
}
@Override
public Void result() {
return null;
*/
@Test
void testOnCompleteThrows() throws Exception {
var joiner = new Joiner<String, Void>() {
@Override
! public boolean onComplete(Subtask<String> subtask) {
throw new FooException();
}
@Override
public Void result() {
return null;
*/
@Test
void testOnCompleteCancelsExecution() throws Exception {
var joiner = new Joiner<String, Void>() {
@Override
! public boolean onComplete(Subtask<? extends String> subtask) {
return true;
}
@Override
public Void result() {
return null;
*/
@Test
void testOnCompleteCancelsExecution() throws Exception {
var joiner = new Joiner<String, Void>() {
@Override
! public boolean onComplete(Subtask<String> subtask) {
return true;
}
@Override
public Void result() {
return null;
awaitCancelled(scope);
scope.join();
}
}
+ /**
+ * Test Joiner.onTimeout invoked by owner thread when timeout expires. Also
+ * checks that the scope is cancelled when onTimeout runs and that onTimeout
+ * is invoked exactly once.
+ */
+ @Test
+ void testOnTimeoutInvoked() throws Exception {
+ // the joiner is created before the scope, so the scope is passed in via this reference
+ var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
+ Thread owner = Thread.currentThread();
+ var invokeCount = new AtomicInteger();
+ var joiner = new Joiner<String, Void>() {
+ @Override
+ public void onTimeout() {
+ assertTrue(Thread.currentThread() == owner);
+ assertTrue(scopeRef.get().isCancelled());
+ invokeCount.incrementAndGet();
+ }
+ @Override
+ public Void result() {
+ return null;
+ }
+ };
+ try (var scope = StructuredTaskScope.open(joiner,
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scopeRef.set(scope);
+ // subtask sleeps far longer than the 100ms timeout
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1));
+ return null;
+ });
+ // this onTimeout does not throw, so join is expected to return normally
+ scope.join();
+ assertEquals(1, invokeCount.get());
+ }
+ }
+
+ /**
+ * Test Joiner.onTimeout throwing an exception. The exception is expected to
+ * propagate from join, and a subsequent join throws IllegalStateException.
+ */
+ @Test
+ void testOnTimeoutThrows() throws Exception {
+ var joiner = new Joiner<String, Void>() {
+ @Override
+ public void onTimeout() {
+ throw new FooException();
+ }
+ @Override
+ public Void result() {
+ return null;
+ }
+ };
+ try (var scope = StructuredTaskScope.open(joiner,
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ // wait for scope to be cancelled by timeout
+ awaitCancelled(scope);
+
+ // join should throw FooException on first usage
+ assertThrows(FooException.class, scope::join);
+
+ // retry after onTimeout fails
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test toString.
*/
@Test
void testToString() throws Exception {
@ParameterizedTest
@MethodSource("factories")
void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
cf -> cf.withThreadFactory(factory))) {
-
Subtask<String> subtask = scope.fork(() -> "foo");
! // before join
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
scope.join();
- // after join
assertEquals(Subtask.State.SUCCESS, subtask.state());
assertEquals("foo", subtask.get());
assertThrows(IllegalStateException.class, subtask::exception);
}
}
/**
* Test Subtask with task that fails.
@ParameterizedTest
@MethodSource("factories")
void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
cf -> cf.withThreadFactory(factory))) {
Subtask<String> subtask = scope.fork(() -> "foo");
! // before join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
+ // before join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
+
scope.join();
assertEquals(Subtask.State.SUCCESS, subtask.state());
+
+ // after join, owner thread
assertEquals("foo", subtask.get());
assertThrows(IllegalStateException.class, subtask::exception);
+
+ // after join, another thread
+ assertEquals("foo", callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
}
}
/**
* Test Subtask with task that fails.
try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
cf -> cf.withThreadFactory(factory))) {
Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
! // before join
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
scope.join();
- // after join
assertEquals(Subtask.State.FAILED, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertTrue(subtask.exception() instanceof FooException);
}
}
/**
* Test Subtask with a task that has not completed.
try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
cf -> cf.withThreadFactory(factory))) {
Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
! // before join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
+ // before join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
+
scope.join();
assertEquals(Subtask.State.FAILED, subtask.state());
+
+ // after join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertTrue(subtask.exception() instanceof FooException);
+
+ // after join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertTrue(callInOtherThread(subtask::exception) instanceof FooException);
}
}
/**
* Test Subtask with a task that has not completed.
cf -> cf.withThreadFactory(factory))) {
Subtask<Void> subtask = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return null;
});
-
- // before join
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
// attempt join, join throws
Thread.currentThread().interrupt();
assertThrows(InterruptedException.class, scope::join);
- // after join
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
}
}
/**
* Test Subtask forked after execution cancelled.
cf -> cf.withThreadFactory(factory))) {
Subtask<Void> subtask = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return null;
});
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
+
+ // before join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
+ // before join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
+
// attempt join, join throws
Thread.currentThread().interrupt();
assertThrows(InterruptedException.class, scope::join);
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
+
+ // after join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
+
+ // after join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
}
}
/**
* Test Subtask forked after execution cancelled.
scope.fork(() -> "foo");
awaitCancelled(scope);
var subtask = scope.fork(() -> "foo");
! // before join
- assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
scope.join();
- // after join
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
}
}
/**
* Test Subtask::toString.
scope.fork(() -> "foo");
awaitCancelled(scope);
var subtask = scope.fork(() -> "foo");
! // before join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
+ // before join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
+
scope.join();
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
+
+ // after join, owner thread
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
+
+ // after join, another thread
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
+ assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
}
}
/**
* Test Subtask::toString.
* Test Joiner.allSuccessfulOrThrow() with no subtasks.
*/
@Test
void testAllSuccessfulOrThrow1() throws Throwable {
try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
! var subtasks = scope.join().toList();
! assertTrue(subtasks.isEmpty());
}
}
/**
* Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully.
* Test Joiner.allSuccessfulOrThrow() with no subtasks.
*/
@Test
void testAllSuccessfulOrThrow1() throws Throwable {
try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
! var results = scope.join();
! assertTrue(results.isEmpty());
}
}
/**
* Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully.
@ParameterizedTest
@MethodSource("factories")
void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
cf -> cf.withThreadFactory(factory))) {
! var subtask1 = scope.fork(() -> "foo");
! var subtask2 = scope.fork(() -> "bar");
! var subtasks = scope.join().toList();
! assertEquals(List.of(subtask1, subtask2), subtasks);
- assertEquals("foo", subtask1.get());
- assertEquals("bar", subtask2.get());
}
}
/**
* Test Joiner.allSuccessfulOrThrow() with a subtask that completes successfully and
@ParameterizedTest
@MethodSource("factories")
void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
cf -> cf.withThreadFactory(factory))) {
! scope.fork(() -> "foo");
! scope.fork(() -> "bar");
! var results = scope.join();
! assertEquals(List.of("foo", "bar"), results);
}
}
/**
* Test Joiner.allSuccessfulOrThrow() with a subtask that completes successfully and
}
}
}
/**
! * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
*/
@Test
! void testAnySuccessfulResultOrThrow1() throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
try {
scope.join();
} catch (FailedException e) {
assertTrue(e.getCause() instanceof NoSuchElementException);
}
}
}
/**
! * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> "foo");
String result = scope.join();
assertEquals("foo", result);
}
}
/**
! * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully
* with a null result.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> null);
String result = scope.join();
assertNull(result);
}
}
/**
! * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully
* and a subtask that fails.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> "foo");
scope.fork(() -> { throw new FooException(); });
String first = scope.join();
assertEquals("foo", first);
}
}
/**
! * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> { throw new FooException(); });
Throwable ex = assertThrows(FailedException.class, scope::join);
assertTrue(ex.getCause() instanceof FooException);
}
}
/**
* Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
*/
@Test
void testAwaitSuccessfulOrThrow1() throws Throwable {
}
}
}
/**
! * Test Joiner.allSuccessfulOrThrow() with a timeout.
+ */
+ @Test
+ void testAllSuccessfulOrThrow4() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scope.fork(() -> "foo");
+ // second subtask sleeps far longer than the 100ms timeout
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1));
+ return "bar";
+ });
+ // join should throw TimeoutException
+ assertThrows(TimeoutException.class, scope::join);
+
+ // retry after join throws TimeoutException
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
+ /**
+ * Test Joiner.allSuccessfulOrThrow() yields an unmodifiable list. Checked for
+ * both an empty result (no subtasks forked) and a one-element result.
+ */
+ @Test
+ void testAllSuccessfulOrThrow5() throws Exception {
+ // empty list
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow())) {
+ var results = scope.join();
+ assertEquals(0, results.size());
+ assertThrows(UnsupportedOperationException.class, () -> results.add("foo"));
+ }
+
+ // non-empty list
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow())) {
+ scope.fork(() -> "foo");
+ var results = scope.join();
+ assertEquals(1, results.size());
+ // add should fail for an element already in the list and for a new one
+ assertThrows(UnsupportedOperationException.class, () -> results.add("foo"));
+ assertThrows(UnsupportedOperationException.class, () -> results.add("bar"));
+ }
+ }
+
+ /**
+ * Test Joiner.anySuccessfulOrThrow() with no subtasks.
*/
@Test
! void testAnySuccessfulOrThrow1() throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow())) {
try {
scope.join();
} catch (FailedException e) {
assertTrue(e.getCause() instanceof NoSuchElementException);
}
}
}
/**
! * Test Joiner.anySuccessfulOrThrow() with a subtask that completes successfully.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAnySuccessfulOrThrow2(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> "foo");
String result = scope.join();
assertEquals("foo", result);
}
}
/**
! * Test Joiner.anySuccessfulOrThrow() with a subtask that completes successfully
* with a null result.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAnySuccessfulOrThrow3(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> null);
String result = scope.join();
assertNull(result);
}
}
/**
! * Test Joiner.anySuccessfulOrThrow() with a subtask that completes successfully
* and a subtask that fails.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAnySuccessfulOrThrow4(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> "foo");
scope.fork(() -> { throw new FooException(); });
String first = scope.join();
assertEquals("foo", first);
}
}
/**
! * Test Joiner.anySuccessfulOrThrow() with a subtask that fails.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAnySuccessfulOrThrow5(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(),
cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> { throw new FooException(); });
Throwable ex = assertThrows(FailedException.class, scope::join);
assertTrue(ex.getCause() instanceof FooException);
}
}
+ /**
+ * Test Joiner.anySuccessfulOrThrow() with a timeout.
+ */
+ @Test
+ void anySuccessfulOrThrow6() throws Exception {
+ // NOTE(review): the sibling tests are named testAnySuccessfulOrThrow1..5; this
+ // method lacks the "test" prefix - consider renaming for consistency
+ try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ // first subtask fails, second sleeps far longer than the 100ms timeout,
+ // so no subtask is expected to succeed before the timeout
+ scope.fork(() -> { throw new FooException(); });
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1));
+ return "bar";
+ });
+ assertThrows(TimeoutException.class, scope::join);
+
+ // retry after join throws TimeoutException
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
*/
@Test
void testAwaitSuccessfulOrThrow1() throws Throwable {
assertTrue(e.getCause() instanceof FooException);
}
}
}
+ /**
+ * Test Joiner.awaitAllSuccessfulOrThrow() with a timeout.
+ */
+ @Test
+ void testAwaitSuccessfulOrThrow4() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scope.fork(() -> "foo");
+ // second subtask sleeps far longer than the 100ms timeout
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1));
+ return "bar";
+ });
+ // join should throw TimeoutException
+ assertThrows(TimeoutException.class, scope::join);
+
+ // retry after join throws TimeoutException
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test Joiner.awaitAll() with no subtasks.
*/
@Test
void testAwaitAll1() throws Throwable {
assertEquals("foo", subtask1.get());
assertTrue(subtask2.exception() instanceof FooException);
}
}
/**
* Test Joiner.allUntil(Predicate) with no subtasks.
*/
@Test
void testAllUntil1() throws Throwable {
try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
var subtasks = scope.join();
! assertEquals(0, subtasks.count());
}
}
/**
* Test Joiner.allUntil(Predicate) with no cancellation.
assertEquals("foo", subtask1.get());
assertTrue(subtask2.exception() instanceof FooException);
}
}
+ /**
+ * Test Joiner.awaitAll() with a timeout.
+ */
+ @Test
+ void testAwaitAll4() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ scope.fork(() -> "foo");
+ // second subtask sleeps far longer than the 100ms timeout
+ scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1));
+ return "bar";
+ });
+ // join should throw TimeoutException
+ assertThrows(TimeoutException.class, scope::join);
+
+ // retry after join throws TimeoutException
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
/**
* Test Joiner.allUntil(Predicate) with no subtasks.
*/
@Test
void testAllUntil1() throws Throwable {
try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
var subtasks = scope.join();
! assertEquals(0, subtasks.size());
}
}
/**
* Test Joiner.allUntil(Predicate) with no cancellation.
cf -> cf.withThreadFactory(factory))) {
var subtask1 = scope.fork(() -> "foo");
var subtask2 = scope.fork(() -> { throw new FooException(); });
! var subtasks = scope.join().toList();
! assertEquals(2, subtasks.size());
- assertSame(subtask1, subtasks.get(0));
- assertSame(subtask2, subtasks.get(1));
assertEquals("foo", subtask1.get());
assertTrue(subtask2.exception() instanceof FooException);
}
}
cf -> cf.withThreadFactory(factory))) {
var subtask1 = scope.fork(() -> "foo");
var subtask2 = scope.fork(() -> { throw new FooException(); });
! var subtasks = scope.join();
! assertEquals(List.of(subtask1, subtask2), subtasks);
assertEquals("foo", subtask1.get());
assertTrue(subtask2.exception() instanceof FooException);
}
}
var subtask2 = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return "bar";
});
! var subtasks = scope.join().toList();
- assertEquals(2, subtasks.size());
- assertSame(subtask1, subtasks.get(0));
- assertSame(subtask2, subtasks.get(1));
assertEquals("foo", subtask1.get());
assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
}
}
var subtask2 = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return "bar";
});
! var subtasks = scope.join();
+ assertEquals(List.of(subtask1, subtask2), subtasks);
assertEquals("foo", subtask1.get());
assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
}
}
@ParameterizedTest
@MethodSource("factories")
void testAllUntil4(ThreadFactory factory) throws Exception {
// cancel execution after two or more failures
! class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
final AtomicInteger failedCount = new AtomicInteger();
@Override
! public boolean test(Subtask<? extends T> subtask) {
return subtask.state() == Subtask.State.FAILED
&& failedCount.incrementAndGet() >= 2;
}
}
var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
@ParameterizedTest
@MethodSource("factories")
void testAllUntil4(ThreadFactory factory) throws Exception {
// cancel execution after two or more failures
! class CancelAfterTwoFailures<T> implements Predicate<Subtask<T>> {
final AtomicInteger failedCount = new AtomicInteger();
@Override
! public boolean test(Subtask<T> subtask) {
return subtask.state() == Subtask.State.FAILED
&& failedCount.incrementAndGet() >= 2;
}
}
var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
scope.fork(() -> { throw new FooException(); });
forkCount += 2;
Thread.sleep(Duration.ofMillis(20));
}
! var subtasks = scope.join().toList();
assertEquals(forkCount, subtasks.size());
long failedCount = subtasks.stream()
.filter(s -> s.state() == Subtask.State.FAILED)
.count();
scope.fork(() -> { throw new FooException(); });
forkCount += 2;
Thread.sleep(Duration.ofMillis(20));
}
! var subtasks = scope.join();
assertEquals(forkCount, subtasks.size());
long failedCount = subtasks.stream()
.filter(s -> s.state() == Subtask.State.FAILED)
.count();
scope.join();
assertInstanceOf(FooException.class, excRef.get());
}
}
+ /**
+ * Test Joiner.allUntil(Predicate) with a timeout. The predicate always returns false
+ * and the second subtask sleeps far longer than the 100ms timeout configured for the
+ * scope. join is expected to return the forked subtasks rather than throw
+ * TimeoutException, and a second call to join is expected to fail.
+ */
+ @Test
+ void testAllUntil6() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ var subtask1 = scope.fork(() -> "foo");
+ var subtask2 = scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1));
+ return "bar";
+ });
+
+ // TimeoutException should not be thrown
+ var subtasks = scope.join();
+
+ // list should have two elements in fork order, subtask1 may or may not have completed
+ assertEquals(List.of(subtask1, subtask2), subtasks);
+ assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
+
+ // retry after join throws IllegalStateException, not TimeoutException
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+
+ /**
+ * Test Joiner.allUntil(Predicate) yields an unmodifiable list. Both the empty list
+ * (join with no subtasks forked) and a non-empty list are expected to reject add
+ * with UnsupportedOperationException. A subtask forked in a separate scope, which
+ * has been joined and closed, is used as one of the elements to add.
+ */
+ @Test
+ void testAllUntil7() throws Exception {
+ Subtask<String> subtask1;
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
+ subtask1 = scope.fork(() -> "?");
+ scope.join();
+ }
+
+ // empty list, no subtasks forked in this scope
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
+ var subtasks = scope.join();
+ assertEquals(0, subtasks.size());
+ assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask1));
+ }
+
+ // non-empty list, one subtask forked in this scope
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
+ var subtask2 = scope.fork(() -> "foo");
+ var subtasks = scope.join();
+ assertEquals(1, subtasks.size());
+ assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask1));
+ assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask2));
+ }
+ }
+
/**
* Test Joiner default methods.
*/
@Test
void testJoinerDefaultMethods() throws Exception {
assertEquals(Subtask.State.SUCCESS, subtask1.state());
assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
// Joiner that does not override default methods
! Joiner<Object, Void> joiner = () -> null;
assertThrows(NullPointerException.class, () -> joiner.onFork(null));
assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
assertFalse(joiner.onFork(subtask2));
assertFalse(joiner.onComplete(subtask1));
assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
}
}
/**
* Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
assertEquals(Subtask.State.SUCCESS, subtask1.state());
assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
// Joiner that does not override default methods
! Joiner<String, Void> joiner = () -> null;
assertThrows(NullPointerException.class, () -> joiner.onFork(null));
assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
assertFalse(joiner.onFork(subtask2));
assertFalse(joiner.onComplete(subtask1));
assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
+ assertThrows(TimeoutException.class, joiner::onTimeout);
}
}
/**
* Test Joiners onFork/onComplete methods with a subtask in an unexpected state.
// onComplete with uncompleted task should throw IAE
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalArgumentException.class,
() -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
assertThrows(IllegalArgumentException.class,
! () -> Joiner.anySuccessfulResultOrThrow().onComplete(subtask));
assertThrows(IllegalArgumentException.class,
() -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask));
assertThrows(IllegalArgumentException.class,
() -> Joiner.awaitAll().onComplete(subtask));
assertThrows(IllegalArgumentException.class,
// onComplete with uncompleted task should throw IAE
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalArgumentException.class,
() -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
assertThrows(IllegalArgumentException.class,
! () -> Joiner.anySuccessfulOrThrow().onComplete(subtask));
assertThrows(IllegalArgumentException.class,
() -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask));
assertThrows(IllegalArgumentException.class,
() -> Joiner.awaitAll().onComplete(subtask));
assertThrows(IllegalArgumentException.class,
// onFork with completed task should throw IAE
assertEquals(Subtask.State.SUCCESS, subtask.state());
assertThrows(IllegalArgumentException.class,
() -> Joiner.allSuccessfulOrThrow().onFork(subtask));
assertThrows(IllegalArgumentException.class,
! () -> Joiner.anySuccessfulResultOrThrow().onFork(subtask));
assertThrows(IllegalArgumentException.class,
() -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask));
assertThrows(IllegalArgumentException.class,
() -> Joiner.awaitAll().onFork(subtask));
assertThrows(IllegalArgumentException.class,
// onFork with completed task should throw IAE
assertEquals(Subtask.State.SUCCESS, subtask.state());
assertThrows(IllegalArgumentException.class,
() -> Joiner.allSuccessfulOrThrow().onFork(subtask));
assertThrows(IllegalArgumentException.class,
! () -> Joiner.anySuccessfulOrThrow().onFork(subtask));
assertThrows(IllegalArgumentException.class,
() -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask));
assertThrows(IllegalArgumentException.class,
() -> Joiner.awaitAll().onFork(subtask));
assertThrows(IllegalArgumentException.class,
/**
* Test Configuration equals/hashCode/toString
*/
@Test
void testConfigMethods() throws Exception {
! Function<Configuration, Configuration> testConfig = cf -> {
var name = "duke";
var threadFactory = Thread.ofPlatform().factory();
var timeout = Duration.ofSeconds(10);
assertEquals(cf, cf);
/**
* Test Configuration equals/hashCode/toString
*/
@Test
void testConfigMethods() throws Exception {
! UnaryOperator<Configuration> configOperator = cf -> {
var name = "duke";
var threadFactory = Thread.ofPlatform().factory();
var timeout = Duration.ofSeconds(10);
assertEquals(cf, cf);
assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
return cf;
};
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
// do nothing
}
}
/**
assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
return cf;
};
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) {
// do nothing
}
}
/**
assertThrows(NullPointerException.class,
() -> Joiner.allSuccessfulOrThrow().onFork(null));
assertThrows(NullPointerException.class,
() -> Joiner.allSuccessfulOrThrow().onComplete(null));
assertThrows(NullPointerException.class,
! () -> Joiner.anySuccessfulResultOrThrow().onFork(null));
assertThrows(NullPointerException.class,
! () -> Joiner.anySuccessfulResultOrThrow().onComplete(null));
}
/**
* ThreadFactory that counts usage.
*/
assertThrows(NullPointerException.class,
() -> Joiner.allSuccessfulOrThrow().onFork(null));
assertThrows(NullPointerException.class,
() -> Joiner.allSuccessfulOrThrow().onComplete(null));
assertThrows(NullPointerException.class,
! () -> Joiner.anySuccessfulOrThrow().onFork(null));
assertThrows(NullPointerException.class,
! () -> Joiner.anySuccessfulOrThrow().onComplete(null));
}
/**
* ThreadFactory that counts usage.
*/
*/
private static class CountingJoiner<T> implements Joiner<T, Void> {
final AtomicInteger onForkCount = new AtomicInteger();
final AtomicInteger onCompleteCount = new AtomicInteger();
@Override
! public boolean onFork(Subtask<? extends T> subtask) {
onForkCount.incrementAndGet();
return false;
}
@Override
! public boolean onComplete(Subtask<? extends T> subtask) {
onCompleteCount.incrementAndGet();
return false;
}
@Override
public Void result() {
*/
private static class CountingJoiner<T> implements Joiner<T, Void> {
final AtomicInteger onForkCount = new AtomicInteger();
final AtomicInteger onCompleteCount = new AtomicInteger();
@Override
! public boolean onFork(Subtask<T> subtask) {
onForkCount.incrementAndGet();
return false;
}
@Override
! public boolean onComplete(Subtask<T> subtask) {
onCompleteCount.incrementAndGet();
return false;
}
@Override
public Void result() {
*/
private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> {
final AtomicInteger onForkCount = new AtomicInteger();
final AtomicInteger onCompleteCount = new AtomicInteger();
@Override
! public boolean onFork(Subtask<? extends T> subtask) {
onForkCount.incrementAndGet();
return false;
}
@Override
! public boolean onComplete(Subtask<? extends T> subtask) {
onCompleteCount.incrementAndGet();
return true;
}
@Override
public Void result() {
*/
private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> {
final AtomicInteger onForkCount = new AtomicInteger();
final AtomicInteger onCompleteCount = new AtomicInteger();
@Override
! public boolean onFork(Subtask<T> subtask) {
onForkCount.incrementAndGet();
return false;
}
@Override
! public boolean onComplete(Subtask<T> subtask) {
onCompleteCount.incrementAndGet();
return true;
}
@Override
public Void result() {
interruptThreadAt(target, location);
return null;
});
}
+ /**
+ * Calls a result returning task from another thread.
+ *
+ * The task is run in a newly started virtual thread and the caller waits for that
+ * thread to terminate. If the caller is interrupted while waiting then it continues
+ * to wait, and its interrupt status is set again before this method completes.
+ *
+ * @param task the task to call in the other thread
+ * @return the value returned by the task
+ * @throws Exception the exception thrown by the task, if any, rethrown in the
+ * calling thread
+ */
+ private <V> V callInOtherThread(Callable<V> task) throws Exception {
+ var result = new AtomicReference<V>();
+ var exc = new AtomicReference<Exception>();
+ Thread thread = Thread.ofVirtual().start(() -> {
+ try {
+ result.set(task.call());
+ } catch (Exception e) {
+ exc.set(e);
+ }
+ });
+ boolean interrupted = false;
+ boolean terminated = false;
+ while (!terminated) {
+ try {
+ thread.join();
+ terminated = true;
+ } catch (InterruptedException e) {
+ interrupted = true; // remember the interrupt and keep waiting for the thread
+ }
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ Exception e = exc.get();
+ if (e != null) {
+ throw e;
+ } else {
+ return result.get();
+ }
+ }
+
/**
* Returns true if the given stack trace contains an element for the given class
* and method name.
*/
private boolean contains(StackTraceElement[] stack, String className, String methodName) {
< prev index next >