< prev index next > test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java
Print this page
/*
! * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
/*
! * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
* @enablePreview
* @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest
*/
import java.time.Duration;
- import java.io.IOException;
- import java.time.Instant;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
- import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
- import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
- import java.util.concurrent.StructuredTaskScope.ShutdownOnSuccess;
- import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure;
import java.util.concurrent.StructureViolationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
! import java.util.function.Supplier;
import java.util.stream.Stream;
import static java.lang.Thread.State.*;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeAll;
* @enablePreview
* @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest
*/
import java.time.Duration;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+ import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+ import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.StructuredTaskScope;
+ import java.util.concurrent.StructuredTaskScope.TimeoutException;
+ import java.util.concurrent.StructuredTaskScope.Config;
+ import java.util.concurrent.StructuredTaskScope.FailedException;
+ import java.util.concurrent.StructuredTaskScope.Joiner;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructureViolationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
! import java.util.function.Function;
+ import java.util.function.Predicate;
import java.util.stream.Stream;
import static java.lang.Thread.State.*;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeAll;
private static Stream<ThreadFactory> factories() {
return threadFactories.stream();
}
/**
! * Test that fork creates a new thread for each task.
*/
! @ParameterizedTest
! @MethodSource("factories")
! void testForkCreatesThread(ThreadFactory factory) throws Exception {
! Set<Long> tids = ConcurrentHashMap.newKeySet();
! try (var scope = new StructuredTaskScope<Object>(null, factory)) {
! for (int i = 0; i < 100; i++) {
scope.fork(() -> {
! tids.add(Thread.currentThread().threadId());
- return null;
});
- }
- scope.join();
- }
- assertEquals(100, tids.size());
- }
! /**
- * Test that fork creates a new virtual thread for each task.
- */
- @Test
- void testForkCreateVirtualThread() throws Exception {
- Set<Thread> threads = ConcurrentHashMap.newKeySet();
- try (var scope = new StructuredTaskScope<Object>()) {
- for (int i = 0; i < 100; i++) {
scope.fork(() -> {
threads.add(Thread.currentThread());
return null;
});
}
private static Stream<ThreadFactory> factories() {
return threadFactories.stream();
}
/**
! * Test that fork creates virtual threads when no ThreadFactory is configured.
*/
! @Test
! void testForkCreatesVirtualThread() throws Exception {
! Set<Thread> threads = ConcurrentHashMap.newKeySet();
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
! for (int i = 0; i < 50; i++) {
! // runnable
scope.fork(() -> {
! threads.add(Thread.currentThread());
});
! // callable
scope.fork(() -> {
threads.add(Thread.currentThread());
return null;
});
}
assertEquals(100, threads.size());
threads.forEach(t -> assertTrue(t.isVirtual()));
}
/**
! * Test that fork creates a new thread with the given thread factory.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkUsesFactory(ThreadFactory factory) throws Exception {
! var count = new AtomicInteger();
! ThreadFactory countingFactory = task -> {
! count.incrementAndGet();
! return factory.newThread(task);
! };
! try (var scope = new StructuredTaskScope<Object>(null, countingFactory)) {
! for (int i = 0; i < 100; i++) {
! scope.fork(() -> null);
}
scope.join();
}
! assertEquals(100, count.get());
}
/**
! * Test fork is confined to threads in the scope "tree".
*/
@ParameterizedTest
@MethodSource("factories")
void testForkConfined(ThreadFactory factory) throws Exception {
! try (var scope1 = new StructuredTaskScope<Boolean>();
! var scope2 = new StructuredTaskScope<Boolean>()) {
-
- // thread in scope1 cannot fork thread in scope2
- Subtask<Boolean> subtask1 = scope1.fork(() -> {
- assertThrows(WrongThreadException.class, () -> {
- scope2.fork(() -> null);
- });
- return true;
- });
-
- // thread in scope2 can fork thread in scope1
- Subtask<Boolean> subtask2 = scope2.fork(() -> {
- scope1.fork(() -> null);
- return true;
- });
-
- scope2.join();
- scope1.join();
-
- assertTrue(subtask1.get());
- assertTrue(subtask2.get());
// random thread cannot fork
try (var pool = Executors.newSingleThreadExecutor()) {
Future<Void> future = pool.submit(() -> {
assertThrows(WrongThreadException.class, () -> {
! scope1.fork(() -> null);
- });
- assertThrows(WrongThreadException.class, () -> {
- scope2.fork(() -> null);
});
return null;
});
future.get();
}
}
}
/**
! * Test fork after join completes.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoin(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<String>(null, factory)) {
! // round 1
- var subtask1 = scope.fork(() -> "foo");
- assertThrows(IllegalStateException.class, subtask1::get);
- scope.join();
- assertEquals("foo", subtask1.get());
-
- // round 2
- var subtask2 = scope.fork(() -> "bar");
- assertEquals("foo", subtask1.get());
- assertThrows(IllegalStateException.class, subtask2::get);
scope.join();
! assertEquals("foo", subtask1.get());
! assertEquals("bar", subtask2.get());
! // round 3
! var subtask3 = scope.fork(() -> "baz");
! assertEquals("foo", subtask1.get());
! assertEquals("bar", subtask2.get());
! assertThrows(IllegalStateException.class, subtask3::get);
scope.join();
! assertEquals("foo", subtask1.get());
- assertEquals("bar", subtask2.get());
- assertEquals("baz", subtask3.get());
}
}
/**
* Test fork after join throws.
*/
@ParameterizedTest
@MethodSource("factories")
void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<String>(null, factory)) {
var latch = new CountDownLatch(1);
var subtask1 = scope.fork(() -> {
latch.await();
return "foo";
});
// join throws
Thread.currentThread().interrupt();
assertThrows(InterruptedException.class, scope::join);
! // allow subtask1 to finish
! latch.countDown();
-
- // continue to fork
- var subtask2 = scope.fork(() -> "bar");
- assertThrows(IllegalStateException.class, subtask1::get);
- assertThrows(IllegalStateException.class, subtask2::get);
- scope.join();
- assertEquals("foo", subtask1.get());
- assertEquals("bar", subtask2.get());
}
}
/**
! * Test fork after scope is shutdown.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterShutdown(ThreadFactory factory) throws Exception {
! var executed = new AtomicBoolean();
! try (var scope = new StructuredTaskScope<Object>(null, factory)) {
! scope.shutdown();
! Subtask<String> subtask = scope.fork(() -> {
! executed.set(true);
! return null;
! });
scope.join();
! assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
! assertThrows(IllegalStateException.class, subtask::get);
! assertThrows(IllegalStateException.class, subtask::exception);
}
- assertFalse(executed.get());
}
/**
! * Test fork after scope is closed.
*/
! @ParameterizedTest
! @MethodSource("factories")
! void testForkAfterClose(ThreadFactory factory) throws Exception {
- try (var scope = new StructuredTaskScope<Object>(null, factory)) {
scope.close();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
}
}
/**
! * Test fork when the thread factory rejects creating a thread.
*/
@Test
! void testForkRejectedExecutionException() throws Exception {
ThreadFactory factory = task -> null;
! try (var scope = new StructuredTaskScope(null, factory)) {
assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
- scope.join();
}
}
/**
* Test join with no subtasks.
*/
@Test
void testJoinWithNoSubtasks() throws Exception {
! try (var scope = new StructuredTaskScope()) {
scope.join();
}
}
/**
! * Test join with unfinished subtasks.
*/
@ParameterizedTest
@MethodSource("factories")
! void testJoinWithSubtasks(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope(null, factory)) {
Subtask<String> subtask = scope.fork(() -> {
! Thread.sleep(Duration.ofMillis(50));
return "foo";
});
scope.join();
assertEquals("foo", subtask.get());
}
}
/**
! * Test join is owner confined.
*/
! @ParameterizedTest
! @MethodSource("factories")
! void testJoinConfined(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<Boolean>()) {
! // thread in scope cannot join
! Subtask<Boolean> subtask = scope.fork(() -> {
! assertThrows(WrongThreadException.class, () -> { scope.join(); });
! return true;
! });
! scope.join();
! assertTrue(subtask.get());
// random thread cannot join
try (var pool = Executors.newSingleThreadExecutor()) {
Future<Void> future = pool.submit(() -> {
assertThrows(WrongThreadException.class, scope::join);
return null;
});
future.get();
}
}
}
/**
* Test join with interrupt status set.
*/
@ParameterizedTest
@MethodSource("factories")
void testInterruptJoin1(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope(null, factory)) {
! var latch = new CountDownLatch(1);
Subtask<String> subtask = scope.fork(() -> {
! latch.await();
return "foo";
});
// join should throw
Thread.currentThread().interrupt();
try {
scope.join();
fail("join did not throw");
} catch (InterruptedException expected) {
! assertFalse(Thread.interrupted()); // interrupt status should be clear
- } finally {
- // let task continue
- latch.countDown();
}
-
- // join should complete
- scope.join();
- assertEquals("foo", subtask.get());
}
}
/**
* Test interrupt of thread blocked in join.
*/
@ParameterizedTest
@MethodSource("factories")
void testInterruptJoin2(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope(null, factory)) {
var latch = new CountDownLatch(1);
Subtask<String> subtask = scope.fork(() -> {
! latch.await();
return "foo";
});
! // join should throw
! scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.join");
try {
scope.join();
fail("join did not throw");
} catch (InterruptedException expected) {
assertFalse(Thread.interrupted()); // interrupt status should be clear
- } finally {
- // let task continue
- latch.countDown();
}
-
- // join should complete
- scope.join();
- assertEquals("foo", subtask.get());
}
}
/**
! * Test join when scope is shutdown.
*/
@ParameterizedTest
@MethodSource("factories")
! void testJoinWithShutdown1(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<String>(null, factory)) {
! var interrupted = new CountDownLatch(1);
- var finish = new CountDownLatch(1);
-
- Subtask<String> subtask = scope.fork(() -> {
- try {
- Thread.sleep(Duration.ofDays(1));
- } catch (InterruptedException e) {
- interrupted.countDown();
- }
- finish.await();
- return "foo";
- });
-
- scope.shutdown(); // should interrupt task
! interrupted.await();
!
- scope.join();
-
- // signal task to finish
- finish.countDown();
- }
- }
! /**
! * Test shutdown when owner is blocked in join.
! */
! @ParameterizedTest
- @MethodSource("factories")
- void testJoinWithShutdown2(ThreadFactory factory) throws Exception {
- class MyScope<T> extends StructuredTaskScope<T> {
- MyScope(ThreadFactory factory) {
- super(null, factory);
- }
- @Override
- protected void handleComplete(Subtask<? extends T> subtask) {
- shutdown();
}
- }
! try (var scope = new MyScope<String>(factory)) {
! Subtask<String> subtask1 = scope.fork(() -> {
- Thread.sleep(Duration.ofMillis(50));
- return "foo";
- });
- Subtask<String> subtask2 = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return "bar";
});
- // join should wakeup when shutdown is called
scope.join();
- // task1 should have completed successfully
- assertEquals(Subtask.State.SUCCESS, subtask1.state());
assertEquals("foo", subtask1.get());
- assertThrows(IllegalStateException.class, subtask1::exception);
-
- // task2 result/exception not available
assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
- assertThrows(IllegalStateException.class, subtask2::get);
- assertThrows(IllegalStateException.class, subtask2::exception);
}
}
/**
* Test join after scope is closed.
*/
@Test
void testJoinAfterClose() throws Exception {
! try (var scope = new StructuredTaskScope()) {
- scope.join();
scope.close();
assertThrows(IllegalStateException.class, () -> scope.join());
- assertThrows(IllegalStateException.class, () -> scope.joinUntil(Instant.now()));
}
}
/**
! * Test joinUntil, subtasks finish before deadline expires.
*/
@ParameterizedTest
@MethodSource("factories")
! void testJoinUntil1(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<String>(null, factory)) {
Subtask<String> subtask = scope.fork(() -> {
! try {
- Thread.sleep(Duration.ofSeconds(2));
- } catch (InterruptedException e) { }
return "foo";
});
! long startMillis = millisTime();
! scope.joinUntil(Instant.now().plusSeconds(30));
! expectDuration(startMillis, /*min*/1900, /*max*/20_000);
assertEquals("foo", subtask.get());
}
}
/**
! * Test joinUntil, deadline expires before subtasks finish.
*/
@ParameterizedTest
@MethodSource("factories")
! void testJoinUntil2(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<Object>(null, factory)) {
Subtask<Void> subtask = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return null;
});
! long startMillis = millisTime();
! try {
! scope.joinUntil(Instant.now().plusSeconds(2));
! } catch (TimeoutException e) {
- expectDuration(startMillis, /*min*/1900, /*max*/20_000);
- }
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
}
}
/**
! * Test joinUntil many times.
*/
@ParameterizedTest
@MethodSource("factories")
! void testJoinUntil3(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<String>(null, factory)) {
! Subtask<String> subtask = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return null;
});
! for (int i = 0; i < 3; i++) {
! try {
! scope.joinUntil(Instant.now().plusMillis(50));
! fail("joinUntil did not throw");
- } catch (TimeoutException expected) {
- assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
- }
- }
}
}
/**
! * Test joinUntil with a deadline that has already expired.
*/
@ParameterizedTest
@MethodSource("factories")
! void testJoinUntil4(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<Object>(null, factory)) {
! Subtask<Void> subtask = scope.fork(() -> {
! Thread.sleep(Duration.ofDays(1));
! return null;
});
! // now
! try {
! scope.joinUntil(Instant.now());
! fail("joinUntil did not throw");
- } catch (TimeoutException expected) {
- assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
}
! // in the past
! try {
! scope.joinUntil(Instant.now().minusSeconds(1));
! fail("joinUntil did not throw");
! } catch (TimeoutException expected) {
! assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
- }
}
}
/**
! * Test joinUntil with interrupt status set.
*/
@ParameterizedTest
@MethodSource("factories")
! void testInterruptJoinUntil1(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<String>(null, factory)) {
! var latch = new CountDownLatch(1);
! Subtask<String> subtask = scope.fork(() -> {
! latch.await();
! return "foo";
});
! // joinUntil should throw
! Thread.currentThread().interrupt();
- try {
- scope.joinUntil(Instant.now().plusSeconds(30));
- fail("joinUntil did not throw");
- } catch (InterruptedException expected) {
- assertFalse(Thread.interrupted()); // interrupt status should be clear
- } finally {
- // let task continue
- latch.countDown();
}
! // join should complete
! scope.join();
! assertEquals("foo", subtask.get());
}
}
/**
! * Test interrupt of thread blocked in joinUntil.
- */
- @ParameterizedTest
- @MethodSource("factories")
- void testInterruptJoinUntil2(ThreadFactory factory) throws Exception {
- try (var scope = new StructuredTaskScope(null, factory)) {
- var latch = new CountDownLatch(1);
-
- Subtask<String> subtask = scope.fork(() -> {
- latch.await();
- return "foo";
- });
-
- // joinUntil should throw
- scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.joinUntil");
- try {
- scope.joinUntil(Instant.now().plusSeconds(30));
- fail("joinUntil did not throw");
- } catch (InterruptedException expected) {
- assertFalse(Thread.interrupted()); // interrupt status should be clear
- } finally {
- // let task continue
- latch.countDown();
- }
-
- // join should complete
- scope.join();
- assertEquals("foo", subtask.get());
- }
- }
-
- /**
- * Test that shutdown interrupts unfinished subtasks.
- */
- @ParameterizedTest
- @MethodSource("factories")
- void testShutdownInterruptsThreads1(ThreadFactory factory) throws Exception {
- try (var scope = new StructuredTaskScope<Object>(null, factory)) {
- var interrupted = new AtomicBoolean();
- var latch = new CountDownLatch(1);
- var subtask = scope.fork(() -> {
- try {
- Thread.sleep(Duration.ofDays(1));
- } catch (InterruptedException e) {
- interrupted.set(true);
- } finally {
- latch.countDown();
- }
- return null;
- });
-
- scope.shutdown();
-
- // wait for task to complete
- latch.await();
- assertTrue(interrupted.get());
-
- scope.join();
-
- // subtask result/exception not available
- assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
- assertThrows(IllegalStateException.class, subtask::get);
- assertThrows(IllegalStateException.class, subtask::exception);
- }
- }
-
- /**
- * Test that shutdown does not interrupt current thread.
- */
- @ParameterizedTest
- @MethodSource("factories")
- void testShutdownInterruptsThreads2(ThreadFactory factory) throws Exception {
- try (var scope = new StructuredTaskScope<Object>(null, factory)) {
- var interrupted = new AtomicBoolean();
- var latch = new CountDownLatch(1);
- var subtask = scope.fork(() -> {
- try {
- scope.shutdown();
- interrupted.set(Thread.currentThread().isInterrupted());
- } finally {
- latch.countDown();
- }
- return null;
- });
-
- // wait for task to complete
- latch.await();
- assertFalse(interrupted.get());
-
- scope.join();
- }
- }
-
- /**
- * Test shutdown wakes join.
- */
- @ParameterizedTest
- @MethodSource("factories")
- void testShutdownWakesJoin(ThreadFactory factory) throws Exception {
- try (var scope = new StructuredTaskScope<Object>(null, factory)) {
- var latch = new CountDownLatch(1);
- scope.fork(() -> {
- Thread.sleep(Duration.ofMillis(100)); // give time for join to block
- scope.shutdown();
- latch.await();
- return null;
- });
-
- scope.join();
-
- // join woke up, allow task to complete
- latch.countDown();
- }
- }
-
- /**
- * Test shutdown after scope is closed.
- */
- @Test
- void testShutdownAfterClose() throws Exception {
- try (var scope = new StructuredTaskScope<Object>()) {
- scope.join();
- scope.close();
- assertThrows(IllegalStateException.class, scope::shutdown);
- }
- }
-
- /**
- * Test shutdown is confined to threads in the scope "tree".
- */
- @ParameterizedTest
- @MethodSource("factories")
- void testShutdownConfined(ThreadFactory factory) throws Exception {
- try (var scope1 = new StructuredTaskScope<Boolean>();
- var scope2 = new StructuredTaskScope<Boolean>()) {
-
- // thread in scope1 cannot shutdown scope2
- Subtask<Boolean> subtask1 = scope1.fork(() -> {
- assertThrows(WrongThreadException.class, scope2::shutdown);
- return true;
- });
-
- // wait for task in scope1 to complete to avoid racing with task in scope2
- while (subtask1.state() == Subtask.State.UNAVAILABLE) {
- Thread.sleep(10);
- }
-
- // thread in scope2 shutdown scope1
- Subtask<Boolean> subtask2 = scope2.fork(() -> {
- scope1.shutdown();
- return true;
- });
-
- scope2.join();
- scope1.join();
-
- assertTrue(subtask1.get());
- assertTrue(subtask1.get());
-
- // random thread cannot shutdown
- try (var pool = Executors.newSingleThreadExecutor()) {
- Future<Void> future = pool.submit(() -> {
- assertThrows(WrongThreadException.class, scope1::shutdown);
- assertThrows(WrongThreadException.class, scope2::shutdown);
- return null;
- });
- future.get();
- }
- }
- }
-
- /**
- * Test isShutdown.
- */
- @Test
- void testIsShutdown() {
- try (var scope = new StructuredTaskScope<Object>()) {
- assertFalse(scope.isShutdown()); // before shutdown
- scope.shutdown();
- assertTrue(scope.isShutdown()); // after shutdown
- scope.close();
- assertTrue(scope.isShutdown()); // after close
- }
- }
-
- /**
- * Test close without join, no subtasks forked.
*/
@Test
void testCloseWithoutJoin1() {
! try (var scope = new StructuredTaskScope<Object>()) {
// do nothing
}
}
/**
! * Test close without join, unfinished subtasks.
*/
@ParameterizedTest
@MethodSource("factories")
void testCloseWithoutJoin2(ThreadFactory factory) {
! try (var scope = new StructuredTaskScope<String>(null, factory)) {
Subtask<String> subtask = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return null;
});
- assertThrows(IllegalStateException.class, scope::close);
-
- // subtask result/exception not available
- assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
- assertThrows(IllegalStateException.class, subtask::get);
- assertThrows(IllegalStateException.class, subtask::exception);
- }
- }
! /**
- * Test close without join, unfinished subtasks forked after join.
- */
- @ParameterizedTest
- @MethodSource("factories")
- void testCloseWithoutJoin3(ThreadFactory factory) throws Exception {
- try (var scope = new StructuredTaskScope(null, factory)) {
- scope.fork(() -> "foo");
- scope.join();
-
- Subtask<String> subtask = scope.fork(() -> {
- Thread.sleep(Duration.ofDays(1));
- return null;
- });
assertThrows(IllegalStateException.class, scope::close);
// subtask result/exception not available
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
}
assertEquals(100, threads.size());
threads.forEach(t -> assertTrue(t.isVirtual()));
}
/**
! * Test that fork creates threads with the configured ThreadFactory.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkUsesThreadFactory(ThreadFactory factory) throws Exception {
! // ThreadFactory that keeps a reference to all threads it creates
! class RecordingThreadFactory implements ThreadFactory {
! final ThreadFactory delegate;
! final Set<Thread> threads = ConcurrentHashMap.newKeySet();
! RecordingThreadFactory(ThreadFactory delegate) {
! this.delegate = delegate;
! }
! @Override
+ public Thread newThread(Runnable task) {
+ Thread thread = delegate.newThread(task);
+ threads.add(thread);
+ return thread;
+ }
+ Set<Thread> threads() {
+ return threads;
+ }
+ }
+ var recordingThreadFactory = new RecordingThreadFactory(factory);
+ Set<Thread> threads = ConcurrentHashMap.newKeySet();
+ try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(recordingThreadFactory))) {
+
+ for (int i = 0; i < 50; i++) {
+ // runnable
+ scope.fork(() -> {
+ threads.add(Thread.currentThread());
+ });
+
+ // callable
+ scope.fork(() -> {
+ threads.add(Thread.currentThread());
+ return null;
+ });
}
scope.join();
}
! assertEquals(100, threads.size());
+ assertEquals(recordingThreadFactory.threads(), threads);
}
/**
! * Test fork method is owner confined.
*/
@ParameterizedTest
@MethodSource("factories")
void testForkConfined(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
! cf -> cf.withThreadFactory(factory))) {
// random thread cannot fork
try (var pool = Executors.newSingleThreadExecutor()) {
Future<Void> future = pool.submit(() -> {
assertThrows(WrongThreadException.class, () -> {
! scope.fork(() -> null);
});
return null;
});
future.get();
}
+
+ // subtask cannot fork
+ Subtask<Boolean> subtask = scope.fork(() -> {
+ assertThrows(WrongThreadException.class, () -> {
+ scope.fork(() -> null);
+ });
+ return true;
+ });
+ scope.join();
+ assertTrue(subtask.get());
}
}
/**
! * Test fork after join, no subtasks forked before join.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterJoin1(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
! cf -> cf.withThreadFactory(factory))) {
scope.join();
! assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
! }
+ }
! /**
! * Test fork after join, subtasks forked before join.
! */
! @ParameterizedTest
! @MethodSource("factories")
+ void testForkAfterJoin2(ThreadFactory factory) throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory))) {
+ scope.fork(() -> "foo");
scope.join();
! assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
}
/**
* Test fork after join throws.
*/
@ParameterizedTest
@MethodSource("factories")
void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory))) {
var latch = new CountDownLatch(1);
var subtask1 = scope.fork(() -> {
latch.await();
return "foo";
});
// join throws
Thread.currentThread().interrupt();
assertThrows(InterruptedException.class, scope::join);
! // fork should throw
! assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
}
}
/**
! * Test fork after task scope is cancelled. This test uses a custom Joiner to
+ * cancel execution.
*/
@ParameterizedTest
@MethodSource("factories")
! void testForkAfterCancel2(ThreadFactory factory) throws Exception {
! var countingThreadFactory = new CountingThreadFactory(factory);
! var testJoiner = new CancelAfterOneJoiner<String>();
!
! try (var scope = StructuredTaskScope.open(testJoiner,
! cf -> cf.withThreadFactory(countingThreadFactory))) {
!
! // fork subtask, the scope should be cancelled when the subtask completes
+ var subtask1 = scope.fork(() -> "foo");
+ while (!scope.isCancelled()) {
+ Thread.sleep(20);
+ }
+
+ assertEquals(1, countingThreadFactory.threadCount());
+ assertEquals(1, testJoiner.onForkCount());
+ assertEquals(1, testJoiner.onCompleteCount());
+
+ // fork second subtask, it should not run
+ var subtask2 = scope.fork(() -> "bar");
+
+ // onFork should be invoked, newThread and onComplete should not be invoked
+ assertEquals(1, countingThreadFactory.threadCount());
+ assertEquals(2, testJoiner.onForkCount());
+ assertEquals(1, testJoiner.onCompleteCount());
+
scope.join();
!
! assertEquals(1, countingThreadFactory.threadCount());
! assertEquals(2, testJoiner.onForkCount());
+ assertEquals(1, testJoiner.onCompleteCount());
+ assertEquals("foo", subtask1.get());
+ assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
}
}
/**
! * Test fork after task scope is closed.
*/
! @Test
! void testForkAfterClose() {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
scope.close();
assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
}
}
/**
! * Test fork with a ThreadFactory that rejects creating a thread.
*/
@Test
! void testForkRejectedExecutionException() {
ThreadFactory factory = task -> null;
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory))) {
assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null));
}
}
/**
* Test join with no subtasks.
*/
@Test
void testJoinWithNoSubtasks() throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
scope.join();
}
}
/**
! * Test join with a remaining subtask.
*/
@ParameterizedTest
@MethodSource("factories")
! void testJoinWithRemainingSubtasks(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory))) {
Subtask<String> subtask = scope.fork(() -> {
! Thread.sleep(Duration.ofMillis(100));
return "foo";
});
scope.join();
assertEquals("foo", subtask.get());
}
}
/**
! * Test join after join completed with a result.
*/
! @Test
! void testJoinAfterJoin1() throws Exception {
! var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz"));
! Joiner<Object, String> joiner = results::take;
+ try (var scope = StructuredTaskScope.open(joiner)) {
+ scope.fork(() -> "foo");
+ assertEquals("foo", scope.join());
! // join already called
! for (int i = 0 ; i < 3; i++) {
! assertThrows(IllegalStateException.class, scope::join);
! }
! }
+ }
! /**
+ * Test join after join completed with an exception.
+ */
+ @Test
+ void testJoinAfterJoin2() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
+ scope.fork(() -> { throw new FooException(); });
+ Throwable ex = assertThrows(FailedException.class, scope::join);
+ assertTrue(ex.getCause() instanceof FooException);
! // join already called
+ for (int i = 0 ; i < 3; i++) {
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+ }
+
+ /**
+ * Test join after join completed with a timeout.
+ */
+ @Test
+ void testJoinAfterJoin3() throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
+ cf -> cf.withTimeout(Duration.ofMillis(100)))) {
+ // wait for scope to be cancelled by timeout
+ while (!scope.isCancelled()) {
+ Thread.sleep(20);
+ }
+ assertThrows(TimeoutException.class, scope::join);
+
+ // join already called
+ for (int i = 0 ; i < 3; i++) {
+ assertThrows(IllegalStateException.class, scope::join);
+ }
+ }
+ }
+
+ /**
+ * Test join method is owner confined.
* A thread outside the scope and a subtask forked in the scope are both expected
* to get WrongThreadException from join. The owner's own join then completes.
+ */
+ @ParameterizedTest
+ @MethodSource("factories")
+ void testJoinConfined(ThreadFactory factory) throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
+ cf -> cf.withThreadFactory(factory))) {
// random thread cannot join
try (var pool = Executors.newSingleThreadExecutor()) {
Future<Void> future = pool.submit(() -> {
assertThrows(WrongThreadException.class, scope::join);
return null;
});
// get() surfaces any assertion failure raised on the pool thread
future.get();
}
+
+ // subtask cannot join
+ Subtask<Boolean> subtask = scope.fork(() -> {
+ assertThrows(WrongThreadException.class, () -> { scope.join(); });
+ return true;
+ });
+ scope.join();
// a result of true shows the assertion inside the subtask passed
+ assertTrue(subtask.get());
}
}
/**
* Test join with interrupt status set.
* The subtask sleeps for 60 seconds so it cannot complete first. join is expected to
* throw InterruptedException and leave the caller's interrupt status cleared.
*/
@ParameterizedTest
@MethodSource("factories")
void testInterruptJoin1(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
! cf -> cf.withThreadFactory(factory))) {
// long-running subtask; its result is never read by this test
Subtask<String> subtask = scope.fork(() -> {
! Thread.sleep(60_000);
return "foo";
});
// join should throw
Thread.currentThread().interrupt();
try {
scope.join();
fail("join did not throw");
} catch (InterruptedException expected) {
// Thread.interrupted() also clears the status, so nothing leaks into later tests
! assertFalse(Thread.interrupted()); // interrupt status should be cleared
}
}
}
/**
* Test interrupt of thread blocked in join.
* The subtask sleeps for 60 seconds so join has to block. An interrupt is scheduled
* via scheduleInterruptAt for the StructuredTaskScopeImpl.join frame, and join is
* expected to throw InterruptedException with the interrupt status cleared.
*/
@ParameterizedTest
@MethodSource("factories")
void testInterruptJoin2(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory))) {
+
// long-running subtask; its result is never read by this test
Subtask<String> subtask = scope.fork(() -> {
! Thread.sleep(60_000);
return "foo";
});
! // interrupt main thread when it blocks in join
! scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join");
try {
scope.join();
fail("join did not throw");
} catch (InterruptedException expected) {
assertFalse(Thread.interrupted()); // interrupt status should be clear
}
}
}
/**
! * Test join when scope is cancelled.
* A first subtask completes and the test polls until the scope reports cancelled.
* A second subtask forked after that is expected to stay UNAVAILABLE, while join
* still completes and the first subtask's result remains readable.
*/
@ParameterizedTest
@MethodSource("factories")
! void testJoinWhenCancelled(ThreadFactory factory) throws Exception {
// NOTE(review): the counting factory is only handed to the scope here; no thread
// count is asserted anywhere in this method
! var countingThreadFactory = new CountingThreadFactory(factory);
// CancelAfterOneJoiner is defined elsewhere in this file; presumably it cancels the
// scope once one subtask has completed
! var testJoiner = new CancelAfterOneJoiner<String>();
! try (var scope = StructuredTaskScope.open(testJoiner,
! cf -> cf.withThreadFactory(countingThreadFactory))) {
! // fork subtask, the scope should be cancelled when the subtask completes
! var subtask1 = scope.fork(() -> "foo");
! while (!scope.isCancelled()) {
! Thread.sleep(20);
}
! // fork second subtask, it should not run
! var subtask2 = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return "bar";
});
scope.join();
assertEquals("foo", subtask1.get());
// forked after cancellation, so it never produces a result or exception
assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
}
}
/**
* Test join after scope is closed.
* The scope is closed explicitly inside the try block; join is then expected to
* throw IllegalStateException.
*/
@Test
void testJoinAfterClose() throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
// explicit close; the try-with-resources statement closes the scope again on exit
scope.close();
assertThrows(IllegalStateException.class, () -> scope.join());
}
}
/**
! * Test join with timeout, subtasks finish before timeout expires.
* The timeout is one day and the subtask sleeps for one second, so join is expected
* to return normally with the scope not cancelled and the subtask result available.
*/
@ParameterizedTest
@MethodSource("factories")
! void testJoinWithTimeout1(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory)
+ .withTimeout(Duration.ofDays(1)))) {
+
Subtask<String> subtask = scope.fork(() -> {
! Thread.sleep(Duration.ofSeconds(1));
return "foo";
});
! scope.join();
!
// the timeout never fired, so the scope must not report cancellation
! assertFalse(scope.isCancelled());
assertEquals("foo", subtask.get());
}
}
/**
! * Test join with timeout, timeout expires before subtasks finish.
* The timeout is two seconds and the subtask sleeps for a day, so join is expected
* to throw TimeoutException, the scope to be cancelled, and the subtask to remain
* UNAVAILABLE.
*/
@ParameterizedTest
@MethodSource("factories")
! void testJoinWithTimeout2(ThreadFactory factory) throws Exception {
// start time is taken before the scope is opened; millisTime is a helper defined
// elsewhere in this file
! long startMillis = millisTime();
+ try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory)
+ .withTimeout(Duration.ofSeconds(2)))) {
+
Subtask<Void> subtask = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return null;
});
! assertThrows(TimeoutException.class, scope::join);
// expectDuration is defined elsewhere; presumably it checks the elapsed time lies
// between min and max milliseconds (min sits slightly under the 2s timeout)
! expectDuration(startMillis, /*min*/1900, /*max*/20_000);
!
! assertTrue(scope.isCancelled());
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
}
}
/**
! * Test join with timeout that has already expired.
* The scope is opened with a timeout of minus one second. join is expected to throw
* TimeoutException, the scope to be cancelled, and the subtask to remain UNAVAILABLE.
*/
@ParameterizedTest
@MethodSource("factories")
! void testJoinWithTimeout3(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
! cf -> cf.withThreadFactory(factory)
+ .withTimeout(Duration.ofSeconds(-1)))) {
+
+ Subtask<Void> subtask = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return null;
});
! assertThrows(TimeoutException.class, scope::join);
!
! assertTrue(scope.isCancelled());
! assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
}
}
/**
! * Test that cancelling execution interrupts unfinished threads. This test uses
+ * a custom Joiner to cancel execution.
* The first subtask signals that it has started and then sleeps; the second completes
* at once. After the scope reports cancelled, the first subtask is expected to have
* been interrupted and to remain UNAVAILABLE.
*/
@ParameterizedTest
@MethodSource("factories")
! void testCancelInterruptsThreads2(ThreadFactory factory) throws Exception {
// CancelAfterOneJoiner is defined elsewhere in this file; presumably it cancels the
// scope once one subtask has completed
! var testJoiner = new CancelAfterOneJoiner<String>();
!
! try (var scope = StructuredTaskScope.open(testJoiner,
! cf -> cf.withThreadFactory(factory))) {
+
+ // fork subtask1 that runs for a long time
+ var started = new CountDownLatch(1);
+ var interrupted = new CountDownLatch(1);
+ var subtask1 = scope.fork(() -> {
+ started.countDown();
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
// record the interrupt so the owner thread can wait for it
+ interrupted.countDown();
+ }
});
// make sure subtask1 is running before subtask2 is forked
+ started.await();
! // fork subtask2, the scope should be cancelled when the subtask completes
! var subtask2 = scope.fork(() -> "bar");
! while (!scope.isCancelled()) {
! Thread.sleep(20);
}
! // subtask1 should be interrupted
// blocks indefinitely (no await timeout) if the interrupt never arrives
! interrupted.await();
!
! scope.join();
! assertEquals(Subtask.State.UNAVAILABLE, subtask1.state());
! assertEquals("bar", subtask2.get());
}
}
/**
! * Test that timeout interrupts unfinished threads.
* The scope has a two second timeout and the subtask sleeps for a day. Once the scope
* reports cancelled, a subtask that had started is expected to be interrupted, join
* to throw TimeoutException, and the subtask to remain UNAVAILABLE.
*/
@ParameterizedTest
@MethodSource("factories")
! void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
! cf -> cf.withThreadFactory(factory)
+ .withTimeout(Duration.ofSeconds(2)))) {
! var started = new AtomicBoolean();
! var interrupted = new CountDownLatch(1);
! Subtask<Void> subtask = scope.fork(() -> {
+ started.set(true);
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
// record the interrupt so the owner thread can wait for it
+ interrupted.countDown();
+ }
+ return null;
});
// poll until the timeout has cancelled the scope
! while (!scope.isCancelled()) {
! Thread.sleep(50);
}
! // if subtask started then it should be interrupted
// the guard avoids waiting forever when the subtask never got to run
! if (started.get()) {
! interrupted.await();
+ }
+
+ assertThrows(TimeoutException.class, scope::join);
+
+ assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
}
}
/**
! * Test close without join, no subtasks forked.
* The scope is opened and closed by try-with-resources with nothing in between; the
* test passes if close does not throw.
*/
@Test
void testCloseWithoutJoin1() {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
// do nothing
}
}
/**
! * Test close without join, subtasks forked.
*/
@ParameterizedTest
@MethodSource("factories")
void testCloseWithoutJoin2(ThreadFactory factory) {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory))) {
Subtask<String> subtask = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return null;
});
! // first call to close should throw
assertThrows(IllegalStateException.class, scope::close);
+ // subsequent calls to close should not throw
+ for (int i = 0; i < 3; i++) {
+ scope.close();
+ }
+
// subtask result/exception not available
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
}
* Test close after join throws. Close should not throw as join attempted.
*/
@ParameterizedTest
@MethodSource("factories")
void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<Object>()) {
var subtask = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return null;
});
// join throws
Thread.currentThread().interrupt();
assertThrows(InterruptedException.class, scope::join);
assertThrows(IllegalStateException.class, subtask::get);
- }
- }
-
- /**
- * Test close after joinUntil throws. Close should not throw as join attempted.
- */
- @ParameterizedTest
- @MethodSource("factories")
- void testCloseAfterJoinUntilThrows(ThreadFactory factory) throws Exception {
- try (var scope = new StructuredTaskScope<Object>()) {
- var subtask = scope.fork(() -> {
- Thread.sleep(Duration.ofDays(1));
- return null;
- });
! // joinUntil throws
- assertThrows(TimeoutException.class, () -> scope.joinUntil(Instant.now()));
- assertThrows(IllegalStateException.class, subtask::get);
- }
}
/**
! * Test close is owner confined.
*/
@ParameterizedTest
@MethodSource("factories")
void testCloseConfined(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<Boolean>()) {
!
- // attempt to close from thread in scope
- Subtask<Boolean> subtask = scope.fork(() -> {
- assertThrows(WrongThreadException.class, scope::close);
- return true;
- });
-
- scope.join();
- assertTrue(subtask.get());
// random thread cannot close scope
try (var pool = Executors.newCachedThreadPool(factory)) {
Future<Boolean> future = pool.submit(() -> {
assertThrows(WrongThreadException.class, scope::close);
return null;
});
future.get();
}
}
}
/**
* Test close with interrupt status set.
*/
@ParameterizedTest
@MethodSource("factories")
void testInterruptClose1(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<Object>(null, factory)) {
var done = new AtomicBoolean();
scope.fork(() -> {
try {
Thread.sleep(Duration.ofDays(1));
} catch (InterruptedException e) {
! // interrupted by shutdown, expected
}
Thread.sleep(Duration.ofMillis(100)); // force close to wait
done.set(true);
return null;
});
- scope.shutdown();
scope.join();
// invoke close with interrupt status set
Thread.currentThread().interrupt();
try {
* Test close after join throws. Close should not throw as join attempted.
*/
@ParameterizedTest
@MethodSource("factories")
void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> cf.withThreadFactory(factory))) {
// subtask sleeps for a day, so it cannot complete before join is interrupted
var subtask = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return null;
});
// join throws
// the interrupt status is set first so that join fails with InterruptedException
Thread.currentThread().interrupt();
assertThrows(InterruptedException.class, scope::join);
// the subtask never completed, so no result is available
assertThrows(IllegalStateException.class, subtask::get);
! } // close should not throw
}
/**
! * Test close method is owner confined.
* A thread outside the scope and a subtask forked in the scope are both expected
* to get WrongThreadException from close.
*/
@ParameterizedTest
@MethodSource("factories")
void testCloseConfined(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(),
! cf -> cf.withThreadFactory(factory))) {
// random thread cannot close scope
try (var pool = Executors.newCachedThreadPool(factory)) {
Future<Boolean> future = pool.submit(() -> {
assertThrows(WrongThreadException.class, scope::close);
return null;
});
// get() surfaces any assertion failure raised on the pool thread
future.get();
}
+
+ // subtask cannot close
+ Subtask<Boolean> subtask = scope.fork(() -> {
+ assertThrows(WrongThreadException.class, scope::close);
+ return true;
+ });
+ scope.join();
// a result of true shows the assertion inside the subtask passed
+ assertTrue(subtask.get());
}
}
/**
* Test close with interrupt status set.
*/
@ParameterizedTest
@MethodSource("factories")
void testInterruptClose1(ThreadFactory factory) throws Exception {
! var testJoiner = new CancelAfterOneJoiner<String>();
+ try (var scope = StructuredTaskScope.open(testJoiner,
+ cf -> cf.withThreadFactory(factory))) {
+
+ // fork first subtask, a straggler as it continues after being interrupted
+ var started = new CountDownLatch(1);
var done = new AtomicBoolean();
scope.fork(() -> {
+ started.countDown();
try {
Thread.sleep(Duration.ofDays(1));
} catch (InterruptedException e) {
! // interrupted by cancel, expected
}
Thread.sleep(Duration.ofMillis(100)); // force close to wait
done.set(true);
return null;
});
+ started.await();
+
+ // fork second subtask, the scope should be cancelled when this subtask completes
+ scope.fork(() -> "bar");
+ while (!scope.isCancelled()) {
+ Thread.sleep(20);
+ }
scope.join();
// invoke close with interrupt status set
Thread.currentThread().interrupt();
try {
* Test interrupting thread waiting in close.
*/
@ParameterizedTest
@MethodSource("factories")
void testInterruptClose2(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<Object>(null, factory)) {
! var done = new AtomicBoolean();
Thread mainThread = Thread.currentThread();
scope.fork(() -> {
try {
Thread.sleep(Duration.ofDays(1));
} catch (InterruptedException e) {
! // interrupted by shutdown, expected
}
// interrupt main thread when it blocks in close
! interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScope.close");
Thread.sleep(Duration.ofMillis(100)); // force close to wait
done.set(true);
return null;
});
- scope.shutdown(); // interrupts task
scope.join();
try {
scope.close();
} finally {
! assertTrue(Thread.interrupted()); // clear interrupt status
assertTrue(done.get());
}
}
}
/**
* Test that closing an enclosing scope closes the thread flock of a nested scope.
*/
@Test
void testCloseThrowsStructureViolation() throws Exception {
! try (var scope1 = new StructuredTaskScope<Object>()) {
! try (var scope2 = new StructuredTaskScope<Object>()) {
! // join + close enclosing scope
- scope1.join();
try {
scope1.close();
fail("close did not throw");
} catch (StructureViolationException expected) { }
! // underlying flock should be closed, fork should return a cancelled task
var executed = new AtomicBoolean();
! Subtask<Void> subtask = scope2.fork(() -> {
- executed.set(true);
- return null;
- });
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
scope2.join();
assertFalse(executed.get());
}
}
}
/**
! * A StructuredTaskScope that collects the subtasks notified to the handleComplete method.
- */
- private static class CollectAll<T> extends StructuredTaskScope<T> {
- private final Set<Subtask<? extends T>> subtasks = ConcurrentHashMap.newKeySet();
-
- CollectAll(ThreadFactory factory) {
- super(null, factory);
- }
-
- @Override
- protected void handleComplete(Subtask<? extends T> subtask) {
- subtasks.add(subtask);
- }
-
- Set<Subtask<? extends T>> subtasks() {
- return subtasks;
- }
-
- Subtask<? extends T> find(Callable<T> task) {
- return subtasks.stream()
- .filter(h -> task.equals(h.task()))
- .findAny()
- .orElseThrow();
- }
- }
-
- /**
- * Test that handleComplete method is invoked for tasks that complete before shutdown.
*/
! @ParameterizedTest
! @MethodSource("factories")
! void testHandleCompleteBeforeShutdown(ThreadFactory factory) throws Exception {
! try (var scope = new CollectAll<String>(factory)) {
! Callable<String> task1 = () -> "foo";
! Callable<String> task2 = () -> { throw new FooException(); };
- scope.fork(task1);
- scope.fork(task2);
- scope.join();
-
- var subtask1 = scope.find(task1);
- assertEquals("foo", subtask1.get());
-
- var subtask2 = scope.find(task2);
- assertTrue(subtask2.exception() instanceof FooException);
- }
- }
-
- /**
- * Test that handleComplete method is not invoked for tasks that finish after shutdown
- * or are forked after shutdown.
- */
- @ParameterizedTest
- @MethodSource("factories")
- void testHandleCompleteAfterShutdown(ThreadFactory factory) throws Exception {
- try (var scope = new CollectAll<String>(factory)) {
- Callable<String> task1 = () -> {
- try {
- Thread.sleep(Duration.ofDays(1));
- } catch (InterruptedException ignore) { }
- return "foo";
- };
- Callable<String> task2 = () -> {
- Thread.sleep(Duration.ofDays(1));
- return "bar";
- };
- Callable<String> task3 = () -> "baz";
-
- // forked before shutdown, will complete after shutdown
- var subtask1 = scope.fork(task1);
- var subtask2 = scope.fork(task2);
-
- scope.shutdown();
-
- // forked after shutdown
- var subtask3 = scope.fork(task3);
-
- scope.join();
-
- // handleComplete should not be called
- for (int i = 0; i < 3; i++) {
- assertEquals(0, scope.subtasks().size());
- Thread.sleep(20);
- }
-
- assertEquals(Subtask.State.UNAVAILABLE, subtask1.state());
- assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
- assertEquals(Subtask.State.UNAVAILABLE, subtask3.state());
}
}
/**
! * Test that the default handleComplete throws IllegalArgumentException if called
- * with a running task.
*/
@Test
! void testHandleCompleteThrows() throws Exception {
! class TestScope<T> extends StructuredTaskScope<T> {
! protected void handleComplete(Subtask<? extends T> subtask) {
! super.handleComplete(subtask);
}
! }
!
! try (var scope = new TestScope<String>()) {
! var subtask = scope.fork(() -> {
! Thread.sleep(Duration.ofDays(1));
! return "foo";
! });
-
- // running task
- assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
- assertThrows(IllegalArgumentException.class, () -> scope.handleComplete(subtask));
- scope.shutdown();
-
- // null task
- assertThrows(NullPointerException.class, () -> scope.handleComplete(null));
-
- scope.join();
}
}
/**
! * Test ensureOwnerAndJoined.
*/
! @ParameterizedTest
! @MethodSource("factories")
! void testEnsureOwnerAndJoined(ThreadFactory factory) throws Exception {
! class MyScope<T> extends StructuredTaskScope<T> {
! MyScope(ThreadFactory factory) {
! super(null, factory);
! }
! void invokeEnsureOwnerAndJoined() {
! super.ensureOwnerAndJoined();
! }
- }
-
- try (var scope = new MyScope<Boolean>(factory)) {
- // owner thread, before join
- scope.fork(() -> true);
- assertThrows(IllegalStateException.class, () -> {
- scope.invokeEnsureOwnerAndJoined();
- });
-
- // owner thread, after join
- scope.join();
- scope.invokeEnsureOwnerAndJoined();
-
- // thread in scope cannot invoke ensureOwnerAndJoined
- Subtask<Boolean> subtask = scope.fork(() -> {
- assertThrows(WrongThreadException.class, () -> {
- scope.invokeEnsureOwnerAndJoined();
- });
- return true;
- });
- scope.join();
- assertTrue(subtask.get());
-
- // random thread cannot invoke ensureOwnerAndJoined
- try (var pool = Executors.newSingleThreadExecutor()) {
- Future<Void> future = pool.submit(() -> {
- assertThrows(WrongThreadException.class, () -> {
- scope.invokeEnsureOwnerAndJoined();
- });
- return null;
- });
- future.get();
}
}
}
/**
! * Test ensureOwnerAndJoined after the task scope has been closed.
*/
! @ParameterizedTest
! @MethodSource("factories")
! void testEnsureOwnerAndJoinedAfterClose(ThreadFactory factory) throws Exception {
! class MyScope<T> extends StructuredTaskScope<T> {
! MyScope(ThreadFactory factory) {
! super(null, factory);
}
! public void invokeEnsureOwnerAndJoined() {
! super.ensureOwnerAndJoined();
}
! }
!
! // ensureOwnerAndJoined after close, join invoked
! try (var scope = new MyScope<String>(factory)) {
scope.fork(() -> "foo");
scope.join();
! scope.close();
- scope.invokeEnsureOwnerAndJoined(); // should not throw
}
! // ensureOwnerAndJoined after close, join not invoked
! try (var scope = new MyScope<String>(factory)) {
scope.fork(() -> "foo");
! assertThrows(IllegalStateException.class, scope::close);
! scope.invokeEnsureOwnerAndJoined(); // should not throw
}
}
-
/**
* Test toString.
*/
@Test
void testToString() throws Exception {
! ThreadFactory factory = Thread.ofVirtual().factory();
! try (var scope = new StructuredTaskScope<Object>("duke", factory)) {
- // open
- assertTrue(scope.toString().contains("duke"));
! // shutdown
- scope.shutdown();
assertTrue(scope.toString().contains("duke"));
// closed
- scope.join();
scope.close();
assertTrue(scope.toString().contains("duke"));
}
}
* Test interrupting thread waiting in close.
*/
@ParameterizedTest
@MethodSource("factories")
void testInterruptClose2(ThreadFactory factory) throws Exception {
// CancelAfterOneJoiner is defined elsewhere in this file; presumably it cancels the
// scope once one subtask has completed
! var testJoiner = new CancelAfterOneJoiner<String>();
! try (var scope = StructuredTaskScope.open(testJoiner,
+ cf -> cf.withThreadFactory(factory))) {
+
// captured so the straggler subtask can target the owner thread
Thread mainThread = Thread.currentThread();
+
+ // fork first subtask, a straggler as it continues after being interrupted
+ var started = new CountDownLatch(1);
+ var done = new AtomicBoolean();
scope.fork(() -> {
+ started.countDown();
try {
Thread.sleep(Duration.ofDays(1));
} catch (InterruptedException e) {
! // interrupted by cancel, expected
}
// interrupt main thread when it blocks in close
// interruptThreadAt is a helper defined elsewhere in this file
! interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScopeImpl.close");
Thread.sleep(Duration.ofMillis(100)); // force close to wait
done.set(true);
return null;
});
// make sure the straggler is running before the second subtask is forked
+ started.await();
+
+ // fork second subtask, the scope should be cancelled when this subtask completes
+ scope.fork(() -> "bar");
+ while (!scope.isCancelled()) {
+ Thread.sleep(20);
+ }
scope.join();
+
+ // main thread will be interrupted while blocked in close
try {
scope.close();
} finally {
// close is expected to return with the interrupt status set, and only after
// the straggler has finished
! assertTrue(Thread.interrupted()); // clear interrupt status
assertTrue(done.get());
}
}
}
/**
* Test that closing an enclosing scope closes the thread flock of a nested scope.
* Closing the outer scope while the inner scope is still open is expected to throw
* StructureViolationException. A subtask then forked in the inner scope is expected
* never to run and to stay UNAVAILABLE before and after join.
*/
@Test
void testCloseThrowsStructureViolation() throws Exception {
! try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll())) {
! try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll())) {
! // close enclosing scope
try {
scope1.close();
fail("close did not throw");
} catch (StructureViolationException expected) { }
! // underlying flock should be closed
var executed = new AtomicBoolean();
! Subtask<?> subtask = scope2.fork(() -> executed.set(true));
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
scope2.join();
// the flag staying false shows the task body was never executed
assertFalse(executed.get());
+ assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
}
}
}
/**
! * Test that isCancelled returns true after close.
* isCancelled is expected to be false on a freshly opened scope and true once the
* scope has been closed explicitly.
*/
! @Test
! void testIsCancelledAfterClose() throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
! assertFalse(scope.isCancelled());
! scope.close();
! assertTrue(scope.isCancelled());
}
}
/**
! * Test Joiner.onFork throwing exception.
* The joiner's onFork always throws FooException; fork is expected to propagate that
* exception to the caller.
*/
@Test
! void testOnForkThrows() throws Exception {
! var joiner = new Joiner<String, Void>() {
! @Override
! public boolean onFork(Subtask<? extends String> subtask) {
+ throw new FooException();
}
! @Override
! public Void result() {
! return null;
! }
! };
! try (var scope = StructuredTaskScope.open(joiner)) {
! assertThrows(FooException.class, () -> scope.fork(() -> "foo"));
}
}
/**
! * Test Joiner.onFork returning true to cancel execution.
* The joiner's onFork always returns true; the scope is expected to be not cancelled
* before the first fork and cancelled immediately after it.
*/
! @Test
! void testOnForkCancelsExecution() throws Exception {
! var joiner = new Joiner<String, Void>() {
! @Override
! public boolean onFork(Subtask<? extends String> subtask) {
! return true;
! }
! @Override
! public Void result() {
! return null;
}
+ };
+ try (var scope = StructuredTaskScope.open(joiner)) {
+ assertFalse(scope.isCancelled());
+ scope.fork(() -> "foo");
// checked without polling: cancellation must be visible as soon as fork returns
+ assertTrue(scope.isCancelled());
+ scope.join();
}
}
/**
! * Test Joiner.onComplete throwing exception causes UHE to be invoked.
* The joiner's onComplete always throws FooException. Subtask threads are created by
* a virtual-thread factory whose uncaught exception handler records the exception,
* and the test expects a FooException to have been recorded once join returns.
*/
! @Test
! void testOnCompleteThrows() throws Exception {
! var joiner = new Joiner<String, Void>() {
! @Override
! public boolean onComplete(Subtask<? extends String> subtask) {
! throw new FooException();
}
! @Override
! public Void result() {
+ return null;
}
! };
// the handler stores whatever exception reaches it
! var excRef = new AtomicReference<Throwable>();
! Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
! ThreadFactory factory = Thread.ofVirtual()
+ .uncaughtExceptionHandler(uhe)
+ .factory();
+ try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> "foo");
scope.join();
// NOTE(review): relies on the handler having run by the time join returns — confirm
! assertInstanceOf(FooException.class, excRef.get());
}
+ }
! /**
! * Test Joiner.onComplete returning true to cancel execution.
* The joiner's onComplete always returns true. The scope is expected to be not
* cancelled before the fork and to become cancelled after the subtask completes.
+ */
+ @Test
+ void testOnCompleteCancelsExecution() throws Exception {
+ var joiner = new Joiner<String, Void>() {
+ @Override
+ public boolean onComplete(Subtask<? extends String> subtask) {
+ return true;
+ }
+ @Override
+ public Void result() {
+ return null;
+ }
+ };
+ try (var scope = StructuredTaskScope.open(joiner)) {
+ assertFalse(scope.isCancelled());
scope.fork(() -> "foo");
// polled because completion happens on the subtask's thread; loops until cancelled
! while (!scope.isCancelled()) {
! Thread.sleep(10);
+ }
+ scope.join();
}
}
/**
* Test toString.
* The scope is opened with the name "duke"; toString is expected to contain that
* name both while the scope is open and after it has been closed.
*/
@Test
void testToString() throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
! cf -> cf.withName("duke"))) {
! // open
assertTrue(scope.toString().contains("duke"));
// closed
scope.close();
assertTrue(scope.toString().contains("duke"));
}
}
* Test Subtask with task that completes successfully.
*/
@ParameterizedTest
@MethodSource("factories")
void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<String>(null, factory)) {
! Callable<String> task = () -> "foo";
- Subtask<String> subtask = scope.fork(task);
! // before join, owner thread
! assertEquals(task, subtask.task());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
scope.join();
// after join
- assertEquals(task, subtask.task());
assertEquals(Subtask.State.SUCCESS, subtask.state());
assertEquals("foo", subtask.get());
assertThrows(IllegalStateException.class, subtask::exception);
}
}
* Test Subtask with task that completes successfully.
*/
@ParameterizedTest
@MethodSource("factories")
void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
! cf -> cf.withThreadFactory(factory))) {
! Subtask<String> subtask = scope.fork(() -> "foo");
!
+ // before join
// neither get nor exception may be called by the owner until it has joined
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
scope.join();
// after join
assertEquals(Subtask.State.SUCCESS, subtask.state());
assertEquals("foo", subtask.get());
// a successful subtask has no exception to report
assertThrows(IllegalStateException.class, subtask::exception);
}
}
* Test Subtask with task that fails.
*/
@ParameterizedTest
@MethodSource("factories")
void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<String>(null, factory)) {
! Callable<String> task = () -> { throw new FooException(); };
- Subtask<String> subtask = scope.fork(task);
! // before join, owner thread
! assertEquals(task, subtask.task());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
scope.join();
// after join
- assertEquals(task, subtask.task());
assertEquals(Subtask.State.FAILED, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertTrue(subtask.exception() instanceof FooException);
}
}
* Test Subtask with task that fails.
*/
@ParameterizedTest
@MethodSource("factories")
void testSubtaskWhenFailed(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
! cf -> cf.withThreadFactory(factory))) {
! Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
!
+ // before join
// neither get nor exception may be called by the owner until it has joined
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
scope.join();
// after join
assertEquals(Subtask.State.FAILED, subtask.state());
// a failed subtask has no result; its exception is the one the task threw
assertThrows(IllegalStateException.class, subtask::get);
assertTrue(subtask.exception() instanceof FooException);
}
}
* Test Subtask with a task that has not completed.
*/
@ParameterizedTest
@MethodSource("factories")
void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<Object>(null, factory)) {
! Callable<Void> task = () -> {
Thread.sleep(Duration.ofDays(1));
return null;
! };
- Subtask<Void> subtask = scope.fork(task);
// before join
- assertEquals(task, subtask.task());
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
// attempt join, join throws
Thread.currentThread().interrupt();
assertThrows(InterruptedException.class, scope::join);
// after join
- assertEquals(task, subtask.task());
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
}
}
/**
! * Test Subtask when forked after shutdown.
*/
@ParameterizedTest
@MethodSource("factories")
! void testSubtaskWhenShutdown(ThreadFactory factory) throws Exception {
! try (var scope = new StructuredTaskScope<Object>(null, factory)) {
! Callable<Void> task = () -> {
! Thread.sleep(Duration.ofDays(1));
! return null;
! };
! scope.shutdown();
- // fork after shutdown
- Subtask<Void> subtask = scope.fork(task);
scope.join();
! assertEquals(task, subtask.task());
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
}
}
* Test Subtask with a task that has not completed.
*/
@ParameterizedTest
@MethodSource("factories")
void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
! cf -> cf.withThreadFactory(factory))) {
// subtask sleeps for a day, so it stays incomplete for the whole test
+ Subtask<Void> subtask = scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return null;
! });
// before join
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
// attempt join, join throws
// the interrupt status is set first so that join fails with InterruptedException
Thread.currentThread().interrupt();
assertThrows(InterruptedException.class, scope::join);
// after join
// the failed join changes nothing: the subtask is still UNAVAILABLE
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
}
}
/**
! * Test Subtask forked after execution cancelled.
* A first subtask completes and the test polls until the scope reports cancelled.
* A subtask forked after that is expected to be UNAVAILABLE, with get and exception
* throwing IllegalStateException, both before and after join. The scope is opened
* with the parameterized thread factory so each factory variant is exercised.
*/
@ParameterizedTest
@MethodSource("factories")
! void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>(),
! cf -> cf.withThreadFactory(factory))) {
! scope.fork(() -> "foo");
! while (!scope.isCancelled()) {
! Thread.sleep(20);
! }
+
// forked after cancellation
+ var subtask = scope.fork(() -> "foo");
! // before join
+ assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
+ assertThrows(IllegalStateException.class, subtask::get);
+ assertThrows(IllegalStateException.class, subtask::exception);
scope.join();
!
+ // after join
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
assertThrows(IllegalStateException.class, subtask::get);
assertThrows(IllegalStateException.class, subtask::exception);
}
}
/**
* Test Subtask::toString.
*/
@Test
void testSubtaskToString() throws Exception {
! try (var scope = new StructuredTaskScope<Object>()) {
! // success
! var subtask1 = scope.fork(() -> "foo");
! scope.join();
! assertTrue(subtask1.toString().contains("Completed successfully"));
!
- // failed
var subtask2 = scope.fork(() -> { throw new FooException(); });
scope.join();
- assertTrue(subtask2.toString().contains("Failed"));
! // not completed
! Callable<Void> sleepForDay = () -> {
! Thread.sleep(Duration.ofDays(1));
! return null;
- };
- var subtask3 = scope.fork(sleepForDay);
- assertTrue(subtask3.toString().contains("Unavailable"));
! scope.shutdown();
! // forked after shutdown
! var subtask4 = scope.fork(sleepForDay);
! assertTrue(subtask4.toString().contains("Unavailable"));
! scope.join();
}
}
/**
! * Test ShutdownOnSuccess with no completed tasks.
*/
@Test
! void testShutdownOnSuccess1() throws Exception {
! try (var scope = new ShutdownOnSuccess<Object>()) {
! assertThrows(IllegalStateException.class, () -> scope.result());
! assertThrows(IllegalStateException.class, () -> scope.result(e -> null));
}
}
/**
! * Test ShutdownOnSuccess with tasks that complete successfully.
*/
@ParameterizedTest
@MethodSource("factories")
! void testShutdownOnSuccess2(ThreadFactory factory) throws Exception {
! try (var scope = new ShutdownOnSuccess<String>(null, factory)) {
scope.fork(() -> "foo");
! scope.join(); // ensures foo completes first
! scope.fork(() -> "bar");
- scope.join();
- assertEquals("foo", scope.result());
- assertEquals("foo", scope.result(e -> null));
}
}
/**
! * Test ShutdownOnSuccess with a task that completes successfully with a null result.
*/
@ParameterizedTest
@MethodSource("factories")
! void testShutdownOnSuccess3(ThreadFactory factory) throws Exception {
! try (var scope = new ShutdownOnSuccess<Object>(null, factory)) {
scope.fork(() -> null);
! scope.join();
! assertNull(scope.result());
- assertNull(scope.result(e -> null));
}
}
/**
! * Test ShutdownOnSuccess with tasks that complete successfully and tasks that fail.
*/
@ParameterizedTest
@MethodSource("factories")
! void testShutdownOnSuccess4(ThreadFactory factory) throws Exception {
! try (var scope = new ShutdownOnSuccess<String>(null, factory)) {
scope.fork(() -> "foo");
! scope.fork(() -> { throw new ArithmeticException(); });
! scope.join();
! assertEquals("foo", scope.result());
- assertEquals("foo", scope.result(e -> null));
}
}
/**
! * Test ShutdownOnSuccess with a task that fails.
*/
@ParameterizedTest
@MethodSource("factories")
! void testShutdownOnSuccess5(ThreadFactory factory) throws Exception {
! try (var scope = new ShutdownOnSuccess<Object>(null, factory)) {
! scope.fork(() -> { throw new ArithmeticException(); });
! scope.join();
! Throwable ex = assertThrows(ExecutionException.class, () -> scope.result());
! assertTrue(ex.getCause() instanceof ArithmeticException);
! ex = assertThrows(FooException.class, () -> scope.result(e -> new FooException(e)));
! assertTrue(ex.getCause() instanceof ArithmeticException);
}
}
/**
! * Test ShutdownOnSuccess methods are confined to the owner.
*/
@ParameterizedTest
@MethodSource("factories")
! void testShutdownOnSuccessConfined(ThreadFactory factory) throws Exception {
! // owner before join
! try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) {
! scope.fork(() -> { throw new FooException(); });
! assertThrows(IllegalStateException.class, scope::result);
! assertThrows(IllegalStateException.class, () -> {
! scope.result(e -> new RuntimeException(e));
! });
! scope.join();
}
! // non-owner
! try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) {
! Subtask<Boolean> subtask = scope.fork(() -> {
! assertThrows(WrongThreadException.class, scope::result);
! assertThrows(WrongThreadException.class, () -> {
! scope.result(e -> new RuntimeException(e));
! });
! return true;
! });
! scope.join();
! assertTrue(subtask.get());
}
}
/**
! * Test ShutdownOnFailure with no completed tasks.
*/
@Test
! void testShutdownOnFailure1() throws Throwable {
! try (var scope = new ShutdownOnFailure()) {
! assertTrue(scope.exception().isEmpty());
! scope.throwIfFailed();
- scope.throwIfFailed(e -> new FooException(e));
}
}
/**
! * Test ShutdownOnFailure with tasks that complete successfully.
*/
@ParameterizedTest
@MethodSource("factories")
! void testShutdownOnFailure2(ThreadFactory factory) throws Throwable {
! try (var scope = new ShutdownOnFailure(null, factory)) {
! scope.fork(() -> "foo");
! scope.fork(() -> "bar");
! scope.join();
! // no exception
! assertTrue(scope.exception().isEmpty());
! scope.throwIfFailed();
! scope.throwIfFailed(e -> new FooException(e));
}
}
/**
! * Test ShutdownOnFailure with tasks that complete successfully and tasks that fail.
*/
@ParameterizedTest
@MethodSource("factories")
! void testShutdownOnFailure3(ThreadFactory factory) throws Throwable {
! try (var scope = new ShutdownOnFailure(null, factory)) {
! // one task completes successfully, the other fails
! scope.fork(() -> "foo");
! scope.fork(() -> { throw new ArithmeticException(); });
! scope.join();
! Throwable ex = scope.exception().orElse(null);
! assertTrue(ex instanceof ArithmeticException);
! ex = assertThrows(ExecutionException.class, () -> scope.throwIfFailed());
- assertTrue(ex.getCause() instanceof ArithmeticException);
! ex = assertThrows(FooException.class,
! () -> scope.throwIfFailed(e -> new FooException(e)));
! assertTrue(ex.getCause() instanceof ArithmeticException);
}
}
/**
! * Test ShutdownOnFailure methods are confined to the owner.
*/
@ParameterizedTest
@MethodSource("factories")
! void testShutdownOnFailureConfined(ThreadFactory factory) throws Exception {
! // owner before join
! try (var scope = new ShutdownOnFailure(null, factory)) {
scope.fork(() -> "foo");
- assertThrows(IllegalStateException.class, scope::exception);
- assertThrows(IllegalStateException.class, scope::throwIfFailed);
- assertThrows(IllegalStateException.class, () -> {
- scope.throwIfFailed(e -> new RuntimeException(e));
- });
scope.join();
}
! // non-owner
! try (var scope = new ShutdownOnFailure(null, factory)) {
! Subtask<Boolean> subtask = scope.fork(() -> {
! assertThrows(WrongThreadException.class, scope::exception);
! assertThrows(WrongThreadException.class, scope::throwIfFailed);
! assertThrows(WrongThreadException.class, () -> {
! scope.throwIfFailed(e -> new RuntimeException(e));
! });
! return true;
! });
scope.join();
! assertTrue(subtask.get());
}
}
/**
* Test for NullPointerException.
*/
@Test
void testNulls() throws Exception {
! assertThrows(NullPointerException.class, () -> new StructuredTaskScope("", null));
! try (var scope = new StructuredTaskScope<Object>()) {
! assertThrows(NullPointerException.class, () -> scope.fork(null));
! assertThrows(NullPointerException.class, () -> scope.joinUntil(null));
}
! assertThrows(NullPointerException.class, () -> new ShutdownOnSuccess<Object>("", null));
! try (var scope = new ShutdownOnSuccess<Object>()) {
! assertThrows(NullPointerException.class, () -> scope.fork(null));
! assertThrows(NullPointerException.class, () -> scope.joinUntil(null));
! assertThrows(NullPointerException.class, () -> scope.result(null));
}
! assertThrows(NullPointerException.class, () -> new ShutdownOnFailure("", null));
! try (var scope = new ShutdownOnFailure()) {
! assertThrows(NullPointerException.class, () -> scope.fork(null));
! assertThrows(NullPointerException.class, () -> scope.joinUntil(null));
! assertThrows(NullPointerException.class, () -> scope.throwIfFailed(null));
}
}
/**
* A runtime exception for tests.
/**
* Test Subtask::toString.
*/
@Test
void testSubtaskToString() throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
! var latch = new CountDownLatch(1);
! var subtask1 = scope.fork(() -> {
! latch.await();
! return "foo";
! });
var subtask2 = scope.fork(() -> { throw new FooException(); });
+
+ // subtask1 result is unavailable
+ assertTrue(subtask1.toString().contains("Unavailable"));
+ latch.countDown();
+
scope.join();
! assertTrue(subtask1.toString().contains("Completed successfully"));
! assertTrue(subtask2.toString().contains("Failed"));
! }
! }
! /**
+ * Test Joiner.allSuccessfulOrThrow() with no subtasks.
+ */
+ @Test
+ void testAllSuccessfulOrThrow1() throws Throwable {
+ try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
+ var subtasks = scope.join().toList();
+ assertTrue(subtasks.isEmpty());
+ }
+ }
! /**
! * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully.
! */
+ @ParameterizedTest
+ @MethodSource("factories")
+ void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
+ cf -> cf.withThreadFactory(factory))) {
+ var subtask1 = scope.fork(() -> "foo");
+ var subtask2 = scope.fork(() -> "bar");
+ var subtasks = scope.join().toList();
+ assertEquals(List.of(subtask1, subtask2), subtasks);
+ assertEquals("foo", subtask1.get());
+ assertEquals("bar", subtask2.get());
+ }
+ }
! /**
+ * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and
+ * a subtask that fails.
+ */
+ @ParameterizedTest
+ @MethodSource("factories")
+ void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
+ cf -> cf.withThreadFactory(factory))) {
+ scope.fork(() -> "foo");
+ scope.fork(() -> { throw new FooException(); });
+ try {
+ scope.join();
+ } catch (FailedException e) {
+ assertTrue(e.getCause() instanceof FooException);
+ }
}
}
/**
! * Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
*/
@Test
! void testAnySuccessfulResultOrThrow1() throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
! try {
! scope.join();
+ } catch (FailedException e) {
+ assertTrue(e.getCause() instanceof NoSuchElementException);
+ }
}
}
/**
! * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
+ cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> "foo");
! String result = scope.join();
! assertEquals("foo", result);
}
}
/**
! * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully
+ * with a null result.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
+ cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> null);
! String result = scope.join();
! assertNull(result);
}
}
/**
! * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully
+ * and a subtask that fails.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
+ cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> "foo");
! scope.fork(() -> { throw new FooException(); });
! String first = scope.join();
! assertEquals("foo", first);
}
}
/**
! * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
! cf -> cf.withThreadFactory(factory))) {
! scope.fork(() -> { throw new FooException(); });
! Throwable ex = assertThrows(FailedException.class, scope::join);
! assertTrue(ex.getCause() instanceof FooException);
! }
! }
+
+ /**
+ * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
+ */
+ @Test
+ void testAwaitSuccessfulOrThrow1() throws Throwable {
+ try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
+ var result = scope.join();
+ assertNull(result);
}
}
/**
! * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
! try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
! cf -> cf.withThreadFactory(factory))) {
! var subtask1 = scope.fork(() -> "foo");
! var subtask2 = scope.fork(() -> "bar");
! var result = scope.join();
! assertNull(result);
! assertEquals("foo", subtask1.get());
! assertEquals("bar", subtask2.get());
}
+ }
! /**
! * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and
! * a subtask that fails.
! */
! @ParameterizedTest
! @MethodSource("factories")
! void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable {
! try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
! cf -> cf.withThreadFactory(factory))) {
! scope.fork(() -> "foo");
! scope.fork(() -> { throw new FooException(); });
+ try {
+ scope.join();
+ } catch (FailedException e) {
+ assertTrue(e.getCause() instanceof FooException);
+ }
}
}
/**
! * Test Joiner.awaitAll() with no subtasks.
*/
@Test
! void testAwaitAll1() throws Throwable {
! try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
! var result = scope.join();
! assertNull(result);
}
}
/**
! * Test Joiner.awaitAll() with subtasks that complete successfully.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAwaitAll2(ThreadFactory factory) throws Throwable {
! try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
! cf -> cf.withThreadFactory(factory))) {
! var subtask1 = scope.fork(() -> "foo");
! var subtask2 = scope.fork(() -> "bar");
+ var result = scope.join();
+ assertNull(result);
+ assertEquals("foo", subtask1.get());
+ assertEquals("bar", subtask2.get());
+ }
+ }
+
+ /**
+ * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask
+ * that fails.
+ */
+ @ParameterizedTest
+ @MethodSource("factories")
+ void testAwaitAll3(ThreadFactory factory) throws Throwable {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
+ cf -> cf.withThreadFactory(factory))) {
+ var subtask1 = scope.fork(() -> "foo");
+ var subtask2 = scope.fork(() -> { throw new FooException(); });
+ var result = scope.join();
+ assertNull(result);
+ assertEquals("foo", subtask1.get());
+ assertTrue(subtask2.exception() instanceof FooException);
+ }
+ }
! /**
! * Test Joiner.allUntil(Predicate) with no subtasks.
! */
! @Test
+ void testAllUntil1() throws Throwable {
+ try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
+ var subtasks = scope.join();
+ assertEquals(0, subtasks.count());
}
}
/**
! * Test Joiner.allUntil(Predicate) with no cancellation.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAllUntil2(ThreadFactory factory) throws Exception {
! try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
+ cf -> cf.withThreadFactory(factory))) {
! var subtask1 = scope.fork(() -> "foo");
! var subtask2 = scope.fork(() -> { throw new FooException(); });
!
! var subtasks = scope.join().toList();
+ assertEquals(2, subtasks.size());
+
+ assertTrue(subtasks.get(0) == subtask1);
+ assertTrue(subtasks.get(1) == subtask2);
+ assertEquals("foo", subtask1.get());
+ assertTrue(subtask2.exception() instanceof FooException);
+ }
+ }
+
+ /**
+ * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes.
+ */
+ @ParameterizedTest
+ @MethodSource("factories")
+ void testAllUntil3(ThreadFactory factory) throws Exception {
+ try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true),
+ cf -> cf.withThreadFactory(factory))) {
! var subtask1 = scope.fork(() -> "foo");
! var subtask2 = scope.fork(() -> {
+ Thread.sleep(Duration.ofDays(1));
+ return "bar";
+ });
! var subtasks = scope.join().toList();
! assertEquals(2, subtasks.size());
! assertTrue(subtasks.get(0) == subtask1);
! assertTrue(subtasks.get(1) == subtask2);
+ assertEquals("foo", subtask1.get());
+ assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
}
}
/**
! * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete.
*/
@ParameterizedTest
@MethodSource("factories")
! void testAllUntil4(ThreadFactory factory) throws Exception {
!
! // cancel execution after two or more failures
+ class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
+ final AtomicInteger failedCount = new AtomicInteger();
+ @Override
+ public boolean test(Subtask<? extends T> subtask) {
+ return subtask.state() == Subtask.State.FAILED
+ && failedCount.incrementAndGet() >= 2;
+ }
+ }
+ var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
+
+ try (var scope = StructuredTaskScope.open(joiner)) {
+ int forkCount = 0;
+
+ // fork subtasks until execution cancelled
+ while (!scope.isCancelled()) {
+ scope.fork(() -> "foo");
+ scope.fork(() -> { throw new FooException(); });
+ forkCount += 2;
+ Thread.sleep(Duration.ofMillis(10));
+ }
+
+ var subtasks = scope.join().toList();
+ assertEquals(forkCount, subtasks.size());
+
+ long failedCount = subtasks.stream()
+ .filter(s -> s.state() == Subtask.State.FAILED)
+ .count();
+ assertTrue(failedCount >= 2);
+ }
+ }
+
+ /**
+ * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws.
+ */
+ @Test
+ void testAllUntil5() throws Exception {
+ var joiner = Joiner.allUntil(_ -> { throw new FooException(); });
+ var excRef = new AtomicReference<Throwable>();
+ Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e);
+ ThreadFactory factory = Thread.ofVirtual()
+ .uncaughtExceptionHandler(uhe)
+ .factory();
+ try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
scope.fork(() -> "foo");
scope.join();
+ assertInstanceOf(FooException.class, excRef.get());
}
+ }
! /**
! * Test Joiner default methods.
! */
! @Test
! void testJoinerDefaultMethods() throws Exception {
! try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) {
!
! // need subtasks to test default methods
! var subtask1 = scope.fork(() -> "foo");
! while (!scope.isCancelled()) {
+ Thread.sleep(20);
+ }
+ var subtask2 = scope.fork(() -> "bar");
scope.join();
!
+ assertEquals(Subtask.State.SUCCESS, subtask1.state());
+ assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
+
+ // Joiner that does not override default methods
+ Joiner<Object, Void> joiner = () -> null;
+ assertThrows(NullPointerException.class, () -> joiner.onFork(null));
+ assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
+ assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
+ assertFalse(joiner.onFork(subtask2));
+ assertFalse(joiner.onComplete(subtask1));
+ assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
+ }
+ }
+
+ /**
+ * Test the Config function apply method throwing an exception.
+ */
+ @Test
+ void testConfigFunctionThrows() throws Exception {
+ assertThrows(FooException.class,
+ () -> StructuredTaskScope.open(Joiner.awaitAll(),
+ cf -> { throw new FooException(); }));
+ }
+
+ /**
+ * Test Config equals/hashCode/toString
+ */
+ @Test
+ void testConfigMethods() throws Exception {
+ Function<Config, Config> testConfig = cf -> {
+ var name = "duke";
+ var threadFactory = Thread.ofPlatform().factory();
+ var timeout = Duration.ofSeconds(10);
+
+ assertEquals(cf, cf);
+ assertEquals(cf.withName(name), cf.withName(name));
+ assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory));
+ assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout));
+
+ assertNotEquals(cf, cf.withName(name));
+ assertNotEquals(cf, cf.withThreadFactory(threadFactory));
+ assertNotEquals(cf, cf.withTimeout(timeout));
+
+ assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode());
+ assertEquals(cf.withThreadFactory(threadFactory).hashCode(),
+ cf.withThreadFactory(threadFactory).hashCode());
+ assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode());
+
+ assertTrue(cf.withName(name).toString().contains(name));
+ assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString()));
+ assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString()));
+
+ return cf;
+ };
+ try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
+ // do nothing
}
}
/**
* Test for NullPointerException.
*/
@Test
void testNulls() throws Exception {
! assertThrows(NullPointerException.class,
! () -> StructuredTaskScope.open(null));
! assertThrows(NullPointerException.class,
! () -> StructuredTaskScope.open(null, cf -> cf));
+ assertThrows(NullPointerException.class,
+ () -> StructuredTaskScope.open(Joiner.awaitAll(), null));
+
+ assertThrows(NullPointerException.class, () -> Joiner.allUntil(null));
+
+ // fork
+ try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
+ assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null));
+ assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null));
+ }
+
+ // Config and withXXX methods
+ assertThrows(NullPointerException.class,
+ () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> null));
+ assertThrows(NullPointerException.class,
+ () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null)));
+ assertThrows(NullPointerException.class,
+ () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null)));
+ assertThrows(NullPointerException.class,
+ () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null)));
+
+ // Joiner.onFork/onComplete
+ assertThrows(NullPointerException.class,
+ () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null));
+ assertThrows(NullPointerException.class,
+ () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null));
+ assertThrows(NullPointerException.class,
+ () -> Joiner.awaitAll().onFork(null));
+ assertThrows(NullPointerException.class,
+ () -> Joiner.awaitAll().onComplete(null));
+ assertThrows(NullPointerException.class,
+ () -> Joiner.allSuccessfulOrThrow().onFork(null));
+ assertThrows(NullPointerException.class,
+ () -> Joiner.allSuccessfulOrThrow().onComplete(null));
+ assertThrows(NullPointerException.class,
+ () -> Joiner.anySuccessfulResultOrThrow().onFork(null));
+ assertThrows(NullPointerException.class,
+ () -> Joiner.anySuccessfulResultOrThrow().onComplete(null));
+ }
+
+ /**
+ * ThreadFactory that counts usage.
+ */
+ private static class CountingThreadFactory implements ThreadFactory {
+ final ThreadFactory delegate;
+ final AtomicInteger threadCount = new AtomicInteger();
+ CountingThreadFactory(ThreadFactory delegate) {
+ this.delegate = delegate;
+ }
+ @Override
+ public Thread newThread(Runnable task) {
+ threadCount.incrementAndGet();
+ return delegate.newThread(task);
+ }
+ int threadCount() {
+ return threadCount.get();
}
+ }
! /**
! * A joiner that counts that counts the number of subtasks that are forked and the
! * number of subtasks that complete.
! */
! private static class CountingJoiner<T> implements Joiner<T, Void> {
+ final AtomicInteger onForkCount = new AtomicInteger();
+ final AtomicInteger onCompleteCount = new AtomicInteger();
+ @Override
+ public boolean onFork(Subtask<? extends T> subtask) {
+ onForkCount.incrementAndGet();
+ return false;
}
+ @Override
+ public boolean onComplete(Subtask<? extends T> subtask) {
+ onCompleteCount.incrementAndGet();
+ return false;
+ }
+ @Override
+ public Void result() {
+ return null;
+ }
+ int onForkCount() {
+ return onForkCount.get();
+ }
+ int onCompleteCount() {
+ return onCompleteCount.get();
+ }
+ }
! /**
! * A joiner that cancels execution when a subtask completes. It also keeps a count
! * of the number of subtasks that are forked and the number of subtasks that complete.
! */
! private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> {
+ final AtomicInteger onForkCount = new AtomicInteger();
+ final AtomicInteger onCompleteCount = new AtomicInteger();
+ @Override
+ public boolean onFork(Subtask<? extends T> subtask) {
+ onForkCount.incrementAndGet();
+ return false;
+ }
+ @Override
+ public boolean onComplete(Subtask<? extends T> subtask) {
+ onCompleteCount.incrementAndGet();
+ return true;
+ }
+ @Override
+ public Void result() {
+ return null;
+ }
+ int onForkCount() {
+ return onForkCount.get();
+ }
+ int onCompleteCount() {
+ return onCompleteCount.get();
}
}
/**
* A runtime exception for tests.
< prev index next >