1 /* 2 * Copyright (c) 2021, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=platform 26 * @bug 8284199 8296779 8306647 27 * @summary Basic tests for StructuredTaskScope 28 * @enablePreview 29 * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest 30 */ 31 32 /* 33 * @test id=virtual 34 * @enablePreview 35 * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest 36 */ 37 38 import java.time.Duration; 39 import java.util.Arrays; 40 import java.util.ArrayList; 41 import java.util.List; 42 import java.util.Set; 43 import java.util.NoSuchElementException; 44 import java.util.concurrent.Callable; 45 import java.util.concurrent.ConcurrentHashMap; 46 import java.util.concurrent.CountDownLatch; 47 import java.util.concurrent.Executors; 48 import java.util.concurrent.Future; 49 import java.util.concurrent.LinkedTransferQueue; 50 import java.util.concurrent.ThreadFactory; 51 import java.util.concurrent.TimeUnit; 52 import java.util.concurrent.RejectedExecutionException; 53 import java.util.concurrent.ScheduledExecutorService; 54 import java.util.concurrent.StructuredTaskScope; 55 import java.util.concurrent.StructuredTaskScope.TimeoutException; 56 import java.util.concurrent.StructuredTaskScope.Configuration; 57 import java.util.concurrent.StructuredTaskScope.FailedException; 58 import java.util.concurrent.StructuredTaskScope.Joiner; 59 import java.util.concurrent.StructuredTaskScope.Subtask; 60 import java.util.concurrent.StructureViolationException; 61 import java.util.concurrent.atomic.AtomicBoolean; 62 import java.util.concurrent.atomic.AtomicInteger; 63 import java.util.concurrent.atomic.AtomicReference; 64 import java.util.function.Predicate; 65 import java.util.function.UnaryOperator; 66 import java.util.stream.Stream; 67 import static java.lang.Thread.State.*; 68 69 import org.junit.jupiter.api.Test; 70 import org.junit.jupiter.api.BeforeAll; 71 import org.junit.jupiter.api.AfterAll; 72 import org.junit.jupiter.params.ParameterizedTest; 73 import org.junit.jupiter.params.provider.MethodSource; 74 import static org.junit.jupiter.api.Assertions.*; 75 76 class StructuredTaskScopeTest { 77 private static ScheduledExecutorService scheduler; 78 private static List<ThreadFactory> threadFactories; 79 80 @BeforeAll 81 static void setup() throws Exception { 82 scheduler = Executors.newSingleThreadScheduledExecutor(); 83 84 // thread factories 85 String value = System.getProperty("threadFactory"); 86 List<ThreadFactory> list = new ArrayList<>(); 87 if (value == null || value.equals("platform")) 88 list.add(Thread.ofPlatform().factory()); 89 if (value == null || value.equals("virtual")) 90 list.add(Thread.ofVirtual().factory()); 91 assertTrue(list.size() > 0, "No thread factories for tests"); 92 threadFactories = list; 93 } 94 95 @AfterAll 96 static void shutdown() { 97 scheduler.shutdown(); 98 } 99 100 private static Stream<ThreadFactory> factories() { 101 return threadFactories.stream(); 102 } 103 104 /** 105 * Test that fork creates virtual threads when no ThreadFactory is configured. 106 */ 107 @Test 108 void testForkCreatesVirtualThread() throws Exception { 109 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 110 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 111 for (int i = 0; i < 50; i++) { 112 // runnable 113 scope.fork(() -> { 114 threads.add(Thread.currentThread()); 115 }); 116 117 // callable 118 scope.fork(() -> { 119 threads.add(Thread.currentThread()); 120 return null; 121 }); 122 } 123 scope.join(); 124 } 125 assertEquals(100, threads.size()); 126 threads.forEach(t -> assertTrue(t.isVirtual())); 127 } 128 129 /** 130 * Test that fork create threads with the configured ThreadFactory. 131 */ 132 @ParameterizedTest 133 @MethodSource("factories") 134 void testForkUsesThreadFactory(ThreadFactory factory) throws Exception { 135 // TheadFactory that keeps reference to all threads it creates 136 class RecordingThreadFactory implements ThreadFactory { 137 final ThreadFactory delegate; 138 final Set<Thread> threads = ConcurrentHashMap.newKeySet(); 139 RecordingThreadFactory(ThreadFactory delegate) { 140 this.delegate = delegate; 141 } 142 @Override 143 public Thread newThread(Runnable task) { 144 Thread thread = delegate.newThread(task); 145 threads.add(thread); 146 return thread; 147 } 148 Set<Thread> threads() { 149 return threads; 150 } 151 } 152 var recordingThreadFactory = new RecordingThreadFactory(factory); 153 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 154 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 155 cf -> cf.withThreadFactory(recordingThreadFactory))) { 156 157 for (int i = 0; i < 50; i++) { 158 // runnable 159 scope.fork(() -> { 160 threads.add(Thread.currentThread()); 161 }); 162 163 // callable 164 scope.fork(() -> { 165 threads.add(Thread.currentThread()); 166 return null; 167 }); 168 } 169 scope.join(); 170 } 171 assertEquals(100, threads.size()); 172 assertEquals(recordingThreadFactory.threads(), threads); 173 } 174 175 /** 176 * Test fork method is owner confined. 177 */ 178 @ParameterizedTest 179 @MethodSource("factories") 180 void testForkConfined(ThreadFactory factory) throws Exception { 181 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 182 cf -> cf.withThreadFactory(factory))) { 183 184 // random thread cannot fork 185 try (var pool = Executors.newSingleThreadExecutor()) { 186 Future<Void> future = pool.submit(() -> { 187 assertThrows(WrongThreadException.class, () -> { 188 scope.fork(() -> null); 189 }); 190 return null; 191 }); 192 future.get(); 193 } 194 195 // subtask cannot fork 196 Subtask<Boolean> subtask = scope.fork(() -> { 197 assertThrows(WrongThreadException.class, () -> { 198 scope.fork(() -> null); 199 }); 200 return true; 201 }); 202 scope.join(); 203 assertTrue(subtask.get()); 204 } 205 } 206 207 /** 208 * Test fork after join, no subtasks forked before join. 209 */ 210 @ParameterizedTest 211 @MethodSource("factories") 212 void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception { 213 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 214 cf -> cf.withThreadFactory(factory))) { 215 scope.join(); 216 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 217 } 218 } 219 220 /** 221 * Test fork after join, subtasks forked before join. 222 */ 223 @ParameterizedTest 224 @MethodSource("factories") 225 void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception { 226 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 227 cf -> cf.withThreadFactory(factory))) { 228 scope.fork(() -> "foo"); 229 scope.join(); 230 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 231 } 232 } 233 234 /** 235 * Test fork after join interrupted. 236 */ 237 @ParameterizedTest 238 @MethodSource("factories") 239 void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception { 240 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 241 cf -> cf.withThreadFactory(factory))) { 242 var subtask1 = scope.fork(() -> { 243 Thread.sleep(Duration.ofDays(1)); 244 return "foo"; 245 }); 246 247 // join throws 248 Thread.currentThread().interrupt(); 249 assertThrows(InterruptedException.class, scope::join); 250 251 // fork should throw 252 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 253 } 254 } 255 256 /** 257 * Test fork after join timeout. 258 */ 259 @ParameterizedTest 260 @MethodSource("factories") 261 void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception { 262 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 263 cf -> cf.withThreadFactory(factory) 264 .withTimeout(Duration.ofMillis(100)))) { 265 awaitCancelled(scope); 266 267 // join throws 268 assertThrows(TimeoutException.class, scope::join); 269 270 // fork should throw 271 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 272 } 273 } 274 275 /** 276 * Test fork after task scope is cancelled. This test uses a custom Joiner to 277 * cancel execution. 278 */ 279 @ParameterizedTest 280 @MethodSource("factories") 281 void testForkAfterCancel2(ThreadFactory factory) throws Exception { 282 var countingThreadFactory = new CountingThreadFactory(factory); 283 var testJoiner = new CancelAfterOneJoiner<String>(); 284 285 try (var scope = StructuredTaskScope.open(testJoiner, 286 cf -> cf.withThreadFactory(countingThreadFactory))) { 287 288 // fork subtask, the scope should be cancelled when the subtask completes 289 var subtask1 = scope.fork(() -> "foo"); 290 awaitCancelled(scope); 291 292 assertEquals(1, countingThreadFactory.threadCount()); 293 assertEquals(1, testJoiner.onForkCount()); 294 assertEquals(1, testJoiner.onCompleteCount()); 295 296 // fork second subtask, it should not run 297 var subtask2 = scope.fork(() -> "bar"); 298 299 // onFork should be invoked, newThread and onComplete should not be invoked 300 assertEquals(1, countingThreadFactory.threadCount()); 301 assertEquals(2, testJoiner.onForkCount()); 302 assertEquals(1, testJoiner.onCompleteCount()); 303 304 scope.join(); 305 306 assertEquals(1, countingThreadFactory.threadCount()); 307 assertEquals(2, testJoiner.onForkCount()); 308 assertEquals(1, testJoiner.onCompleteCount()); 309 assertEquals("foo", subtask1.get()); 310 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 311 } 312 } 313 314 /** 315 * Test fork after task scope is closed. 316 */ 317 @ParameterizedTest 318 @MethodSource("factories") 319 void testForkAfterClose(ThreadFactory factory) { 320 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 321 cf -> cf.withThreadFactory(factory))) { 322 scope.close(); 323 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null)); 324 } 325 } 326 327 /** 328 * Test fork with a ThreadFactory that rejects creating a thread. 329 */ 330 @Test 331 void testForkRejectedExecutionException() { 332 ThreadFactory factory = task -> null; 333 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 334 cf -> cf.withThreadFactory(factory))) { 335 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null)); 336 } 337 } 338 339 /** 340 * Test join with no subtasks. 341 */ 342 @Test 343 void testJoinWithNoSubtasks() throws Exception { 344 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 345 scope.join(); 346 } 347 } 348 349 /** 350 * Test join with a remaining subtask. 351 */ 352 @ParameterizedTest 353 @MethodSource("factories") 354 void testJoinWithRemainingSubtasks(ThreadFactory factory) throws Exception { 355 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 356 cf -> cf.withThreadFactory(factory))) { 357 Subtask<String> subtask = scope.fork(() -> { 358 Thread.sleep(Duration.ofMillis(100)); 359 return "foo"; 360 }); 361 scope.join(); 362 assertEquals("foo", subtask.get()); 363 } 364 } 365 366 /** 367 * Test join after join completed with a result. 368 */ 369 @Test 370 void testJoinAfterJoin1() throws Exception { 371 var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz")); 372 Joiner<Object, String> joiner = results::take; 373 try (var scope = StructuredTaskScope.open(joiner)) { 374 scope.fork(() -> "foo"); 375 assertEquals("foo", scope.join()); 376 377 // join already called 378 for (int i = 0 ; i < 3; i++) { 379 assertThrows(IllegalStateException.class, scope::join); 380 } 381 } 382 } 383 384 /** 385 * Test join after join completed with an exception. 386 */ 387 @Test 388 void testJoinAfterJoin2() throws Exception { 389 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { 390 scope.fork(() -> { throw new FooException(); }); 391 Throwable ex = assertThrows(FailedException.class, scope::join); 392 assertTrue(ex.getCause() instanceof FooException); 393 394 // join already called 395 for (int i = 0 ; i < 3; i++) { 396 assertThrows(IllegalStateException.class, scope::join); 397 } 398 } 399 } 400 401 /** 402 * Test join after join completed with a timeout. 403 */ 404 @Test 405 void testJoinAfterJoinInterrupted() throws Exception { 406 try (var scope = StructuredTaskScope.open()) { 407 var latch = new CountDownLatch(1); 408 var subtask = scope.fork(() -> { 409 latch.await(); 410 return "foo"; 411 }); 412 413 // join throws InterruptedException 414 Thread.currentThread().interrupt(); 415 assertThrows(InterruptedException.class, scope::join); 416 417 latch.countDown(); 418 419 // retry join to get result 420 scope.join(); 421 assertEquals("foo", subtask.get()); 422 423 // retry after otbaining result 424 assertThrows(IllegalStateException.class, scope::join); 425 } 426 } 427 428 /** 429 * Test join after join completed with a timeout. 430 */ 431 @Test 432 void testJoinAfterJoinTimeout() throws Exception { 433 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), 434 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 435 // wait for scope to be cancelled by timeout 436 awaitCancelled(scope); 437 assertThrows(TimeoutException.class, scope::join); 438 439 // join already called 440 for (int i = 0 ; i < 3; i++) { 441 assertThrows(IllegalStateException.class, scope::join); 442 } 443 } 444 } 445 446 /** 447 * Test join invoked from Joiner.onTimeout. 448 */ 449 @Test 450 void testJoinInOnTimeout() throws Exception { 451 Thread owner = Thread.currentThread(); 452 var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>(); 453 454 var joiner = new Joiner<String, Void>() { 455 @Override 456 public void onTimeout() { 457 assertTrue(Thread.currentThread() == owner); 458 var scope = scopeRef.get(); 459 assertThrows(IllegalStateException.class, scope::join); 460 } 461 @Override 462 public Void result() { 463 return null; 464 } 465 }; 466 467 try (var scope = StructuredTaskScope.open(joiner, 468 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 469 awaitCancelled(scope); 470 scopeRef.set(scope); 471 scope.join(); // invokes onTimeout 472 } 473 } 474 475 /** 476 * Test join method is owner confined. 477 */ 478 @ParameterizedTest 479 @MethodSource("factories") 480 void testJoinConfined(ThreadFactory factory) throws Exception { 481 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 482 cf -> cf.withThreadFactory(factory))) { 483 484 // random thread cannot join 485 try (var pool = Executors.newSingleThreadExecutor()) { 486 Future<Void> future = pool.submit(() -> { 487 assertThrows(WrongThreadException.class, scope::join); 488 return null; 489 }); 490 future.get(); 491 } 492 493 // subtask cannot join 494 Subtask<Boolean> subtask = scope.fork(() -> { 495 assertThrows(WrongThreadException.class, () -> { scope.join(); }); 496 return true; 497 }); 498 scope.join(); 499 assertTrue(subtask.get()); 500 } 501 } 502 503 /** 504 * Test join with interrupt status set. 505 */ 506 @ParameterizedTest 507 @MethodSource("factories") 508 void testInterruptJoin1(ThreadFactory factory) throws Exception { 509 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 510 cf -> cf.withThreadFactory(factory))) { 511 512 Subtask<String> subtask = scope.fork(() -> { 513 Thread.sleep(Duration.ofDays(1)); 514 return "foo"; 515 }); 516 517 // join should throw 518 Thread.currentThread().interrupt(); 519 try { 520 scope.join(); 521 fail("join did not throw"); 522 } catch (InterruptedException expected) { 523 assertFalse(Thread.interrupted()); // interrupt status should be cleared 524 } 525 } 526 } 527 528 /** 529 * Test interrupt of thread blocked in join. 530 */ 531 @ParameterizedTest 532 @MethodSource("factories") 533 void testInterruptJoin2(ThreadFactory factory) throws Exception { 534 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 535 cf -> cf.withThreadFactory(factory))) { 536 Subtask<String> subtask = scope.fork(() -> { 537 Thread.sleep(Duration.ofDays(1)); 538 return "foo"; 539 }); 540 541 // interrupt main thread when it blocks in join 542 scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join"); 543 try { 544 scope.join(); 545 fail("join did not throw"); 546 } catch (InterruptedException expected) { 547 assertFalse(Thread.interrupted()); // interrupt status should be clear 548 } 549 } 550 } 551 552 /** 553 * Test join when scope is cancelled. 554 */ 555 @ParameterizedTest 556 @MethodSource("factories") 557 void testJoinWhenCancelled(ThreadFactory factory) throws Exception { 558 var countingThreadFactory = new CountingThreadFactory(factory); 559 var testJoiner = new CancelAfterOneJoiner<String>(); 560 561 try (var scope = StructuredTaskScope.open(testJoiner, 562 cf -> cf.withThreadFactory(countingThreadFactory))) { 563 564 // fork subtask, the scope should be cancelled when the subtask completes 565 var subtask1 = scope.fork(() -> "foo"); 566 awaitCancelled(scope); 567 568 // fork second subtask, it should not run 569 var subtask2 = scope.fork(() -> { 570 Thread.sleep(Duration.ofDays(1)); 571 return "bar"; 572 }); 573 574 scope.join(); 575 576 assertEquals("foo", subtask1.get()); 577 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 578 } 579 } 580 581 /** 582 * Test join after scope is closed. 583 */ 584 @Test 585 void testJoinAfterClose() throws Exception { 586 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 587 scope.close(); 588 assertThrows(IllegalStateException.class, () -> scope.join()); 589 } 590 } 591 592 /** 593 * Test join with timeout, subtasks finish before timeout expires. 594 */ 595 @ParameterizedTest 596 @MethodSource("factories") 597 void testJoinWithTimeout1(ThreadFactory factory) throws Exception { 598 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 599 cf -> cf.withThreadFactory(factory) 600 .withTimeout(Duration.ofDays(1)))) { 601 602 Subtask<String> subtask = scope.fork(() -> { 603 Thread.sleep(Duration.ofSeconds(1)); 604 return "foo"; 605 }); 606 607 scope.join(); 608 609 assertFalse(scope.isCancelled()); 610 assertEquals("foo", subtask.get()); 611 } 612 } 613 614 /** 615 * Test join with timeout, timeout expires before subtasks finish. 616 */ 617 @ParameterizedTest 618 @MethodSource("factories") 619 void testJoinWithTimeout2(ThreadFactory factory) throws Exception { 620 long startMillis = millisTime(); 621 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 622 cf -> cf.withThreadFactory(factory) 623 .withTimeout(Duration.ofSeconds(2)))) { 624 625 Subtask<Void> subtask = scope.fork(() -> { 626 Thread.sleep(Duration.ofDays(1)); 627 return null; 628 }); 629 630 assertThrows(TimeoutException.class, scope::join); 631 expectDuration(startMillis, /*min*/1900, /*max*/20_000); 632 633 assertTrue(scope.isCancelled()); 634 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 635 } 636 } 637 638 /** 639 * Test join with timeout that has already expired. 640 */ 641 @ParameterizedTest 642 @MethodSource("factories") 643 void testJoinWithTimeout3(ThreadFactory factory) throws Exception { 644 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 645 cf -> cf.withThreadFactory(factory) 646 .withTimeout(Duration.ofSeconds(-1)))) { 647 648 Subtask<Void> subtask = scope.fork(() -> { 649 Thread.sleep(Duration.ofDays(1)); 650 return null; 651 }); 652 653 assertThrows(TimeoutException.class, scope::join); 654 655 assertTrue(scope.isCancelled()); 656 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 657 } 658 } 659 660 /** 661 * Test that cancelling execution interrupts unfinished threads. This test uses 662 * a custom Joiner to cancel execution. 663 */ 664 @ParameterizedTest 665 @MethodSource("factories") 666 void testCancelInterruptsThreads2(ThreadFactory factory) throws Exception { 667 var testJoiner = new CancelAfterOneJoiner<String>(); 668 669 try (var scope = StructuredTaskScope.open(testJoiner, 670 cf -> cf.withThreadFactory(factory))) { 671 672 // fork subtask1 that runs for a long time 673 var started = new CountDownLatch(1); 674 var interrupted = new CountDownLatch(1); 675 var subtask1 = scope.fork(() -> { 676 started.countDown(); 677 try { 678 Thread.sleep(Duration.ofDays(1)); 679 } catch (InterruptedException e) { 680 interrupted.countDown(); 681 } 682 }); 683 started.await(); 684 685 // fork subtask2, the scope should be cancelled when the subtask completes 686 var subtask2 = scope.fork(() -> "bar"); 687 awaitCancelled(scope); 688 689 // subtask1 should be interrupted 690 interrupted.await(); 691 692 scope.join(); 693 assertEquals(Subtask.State.UNAVAILABLE, subtask1.state()); 694 assertEquals("bar", subtask2.get()); 695 } 696 } 697 698 /** 699 * Test that timeout interrupts unfinished threads. 700 */ 701 @ParameterizedTest 702 @MethodSource("factories") 703 void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception { 704 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 705 cf -> cf.withThreadFactory(factory) 706 .withTimeout(Duration.ofSeconds(2)))) { 707 708 var started = new AtomicBoolean(); 709 var interrupted = new CountDownLatch(1); 710 Subtask<Void> subtask = scope.fork(() -> { 711 started.set(true); 712 try { 713 Thread.sleep(Duration.ofDays(1)); 714 } catch (InterruptedException e) { 715 interrupted.countDown(); 716 } 717 return null; 718 }); 719 720 // wait for scope to be cancelled by timeout 721 awaitCancelled(scope); 722 723 // if subtask started then it should be interrupted 724 if (started.get()) { 725 interrupted.await(); 726 } 727 728 assertThrows(TimeoutException.class, scope::join); 729 730 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 731 } 732 } 733 734 /** 735 * Test close without join, no subtasks forked. 736 */ 737 @Test 738 void testCloseWithoutJoin1() { 739 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 740 // do nothing 741 } 742 } 743 744 /** 745 * Test close without join, subtasks forked. 746 */ 747 @ParameterizedTest 748 @MethodSource("factories") 749 void testCloseWithoutJoin2(ThreadFactory factory) { 750 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 751 cf -> cf.withThreadFactory(factory))) { 752 Subtask<String> subtask = scope.fork(() -> { 753 Thread.sleep(Duration.ofDays(1)); 754 return null; 755 }); 756 757 // first call to close should throw 758 assertThrows(IllegalStateException.class, scope::close); 759 760 // subsequent calls to close should not throw 761 for (int i = 0; i < 3; i++) { 762 scope.close(); 763 } 764 765 // subtask result/exception not available 766 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 767 assertThrows(IllegalStateException.class, subtask::get); 768 assertThrows(IllegalStateException.class, subtask::exception); 769 } 770 } 771 772 /** 773 * Test close after join throws. Close should not throw as join attempted. 774 */ 775 @ParameterizedTest 776 @MethodSource("factories") 777 void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception { 778 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 779 cf -> cf.withThreadFactory(factory))) { 780 var subtask = scope.fork(() -> { 781 Thread.sleep(Duration.ofDays(1)); 782 return null; 783 }); 784 785 // join throws 786 Thread.currentThread().interrupt(); 787 assertThrows(InterruptedException.class, scope::join); 788 assertThrows(IllegalStateException.class, subtask::get); 789 790 } // close should not throw 791 } 792 793 /** 794 * Test close method is owner confined. 795 */ 796 @ParameterizedTest 797 @MethodSource("factories") 798 void testCloseConfined(ThreadFactory factory) throws Exception { 799 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 800 cf -> cf.withThreadFactory(factory))) { 801 802 // random thread cannot close scope 803 try (var pool = Executors.newCachedThreadPool(factory)) { 804 Future<Boolean> future = pool.submit(() -> { 805 assertThrows(WrongThreadException.class, scope::close); 806 return null; 807 }); 808 future.get(); 809 } 810 811 // subtask cannot close 812 Subtask<Boolean> subtask = scope.fork(() -> { 813 assertThrows(WrongThreadException.class, scope::close); 814 return true; 815 }); 816 scope.join(); 817 assertTrue(subtask.get()); 818 } 819 } 820 821 /** 822 * Test close with interrupt status set. 823 */ 824 @ParameterizedTest 825 @MethodSource("factories") 826 void testInterruptClose1(ThreadFactory factory) throws Exception { 827 var testJoiner = new CancelAfterOneJoiner<String>(); 828 try (var scope = StructuredTaskScope.open(testJoiner, 829 cf -> cf.withThreadFactory(factory))) { 830 831 // fork first subtask, a straggler as it continues after being interrupted 832 var started = new CountDownLatch(1); 833 var done = new AtomicBoolean(); 834 scope.fork(() -> { 835 started.countDown(); 836 try { 837 Thread.sleep(Duration.ofDays(1)); 838 } catch (InterruptedException e) { 839 // interrupted by cancel, expected 840 } 841 Thread.sleep(Duration.ofMillis(100)); // force close to wait 842 done.set(true); 843 return null; 844 }); 845 started.await(); 846 847 // fork second subtask, the scope should be cancelled when this subtask completes 848 scope.fork(() -> "bar"); 849 awaitCancelled(scope); 850 851 scope.join(); 852 853 // invoke close with interrupt status set 854 Thread.currentThread().interrupt(); 855 try { 856 scope.close(); 857 } finally { 858 assertTrue(Thread.interrupted()); // clear interrupt status 859 assertTrue(done.get()); 860 } 861 } 862 } 863 864 /** 865 * Test interrupting thread waiting in close. 866 */ 867 @ParameterizedTest 868 @MethodSource("factories") 869 void testInterruptClose2(ThreadFactory factory) throws Exception { 870 var testJoiner = new CancelAfterOneJoiner<String>(); 871 try (var scope = StructuredTaskScope.open(testJoiner, 872 cf -> cf.withThreadFactory(factory))) { 873 874 Thread mainThread = Thread.currentThread(); 875 876 // fork first subtask, a straggler as it continues after being interrupted 877 var started = new CountDownLatch(1); 878 var done = new AtomicBoolean(); 879 scope.fork(() -> { 880 started.countDown(); 881 try { 882 Thread.sleep(Duration.ofDays(1)); 883 } catch (InterruptedException e) { 884 // interrupted by cancel, expected 885 } 886 887 // interrupt main thread when it blocks in close 888 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScopeImpl.close"); 889 890 Thread.sleep(Duration.ofMillis(100)); // force close to wait 891 done.set(true); 892 return null; 893 }); 894 started.await(); 895 896 // fork second subtask, the scope should be cancelled when this subtask completes 897 scope.fork(() -> "bar"); 898 awaitCancelled(scope); 899 900 scope.join(); 901 902 // main thread will be interrupted while blocked in close 903 try { 904 scope.close(); 905 } finally { 906 assertTrue(Thread.interrupted()); // clear interrupt status 907 assertTrue(done.get()); 908 } 909 } 910 } 911 912 /** 913 * Test that closing an enclosing scope closes the thread flock of a nested scope. 914 */ 915 @Test 916 void testCloseThrowsStructureViolation() throws Exception { 917 try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll())) { 918 try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll())) { 919 920 // close enclosing scope 921 try { 922 scope1.close(); 923 fail("close did not throw"); 924 } catch (StructureViolationException expected) { } 925 926 // underlying flock should be closed 927 var executed = new AtomicBoolean(); 928 Subtask<?> subtask = scope2.fork(() -> executed.set(true)); 929 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 930 scope2.join(); 931 assertFalse(executed.get()); 932 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 933 } 934 } 935 } 936 937 /** 938 * Test that isCancelled returns true after close. 939 */ 940 @Test 941 void testIsCancelledAfterClose() throws Exception { 942 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 943 assertFalse(scope.isCancelled()); 944 scope.close(); 945 assertTrue(scope.isCancelled()); 946 } 947 } 948 949 /** 950 * Test Joiner.onFork throwing exception. 951 */ 952 @Test 953 void testOnForkThrows() throws Exception { 954 var joiner = new Joiner<String, Void>() { 955 @Override 956 public boolean onFork(Subtask<? extends String> subtask) { 957 throw new FooException(); 958 } 959 @Override 960 public Void result() { 961 return null; 962 } 963 }; 964 try (var scope = StructuredTaskScope.open(joiner)) { 965 assertThrows(FooException.class, () -> scope.fork(() -> "foo")); 966 } 967 } 968 969 /** 970 * Test Joiner.onFork returning true to cancel execution. 971 */ 972 @Test 973 void testOnForkCancelsExecution() throws Exception { 974 var joiner = new Joiner<String, Void>() { 975 @Override 976 public boolean onFork(Subtask<? extends String> subtask) { 977 return true; 978 } 979 @Override 980 public Void result() { 981 return null; 982 } 983 }; 984 try (var scope = StructuredTaskScope.open(joiner)) { 985 assertFalse(scope.isCancelled()); 986 scope.fork(() -> "foo"); 987 assertTrue(scope.isCancelled()); 988 scope.join(); 989 } 990 } 991 992 /** 993 * Test Joiner.onComplete throwing exception causes UHE to be invoked. 994 */ 995 @Test 996 void testOnCompleteThrows() throws Exception { 997 var joiner = new Joiner<String, Void>() { 998 @Override 999 public boolean onComplete(Subtask<? extends String> subtask) { 1000 throw new FooException(); 1001 } 1002 @Override 1003 public Void result() { 1004 return null; 1005 } 1006 }; 1007 var excRef = new AtomicReference<Throwable>(); 1008 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e); 1009 ThreadFactory factory = Thread.ofVirtual() 1010 .uncaughtExceptionHandler(uhe) 1011 .factory(); 1012 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) { 1013 scope.fork(() -> "foo"); 1014 scope.join(); 1015 assertInstanceOf(FooException.class, excRef.get()); 1016 } 1017 } 1018 1019 /** 1020 * Test Joiner.onComplete returning true to cancel execution. 1021 */ 1022 @Test 1023 void testOnCompleteCancelsExecution() throws Exception { 1024 var joiner = new Joiner<String, Void>() { 1025 @Override 1026 public boolean onComplete(Subtask<? extends String> subtask) { 1027 return true; 1028 } 1029 @Override 1030 public Void result() { 1031 return null; 1032 } 1033 }; 1034 try (var scope = StructuredTaskScope.open(joiner)) { 1035 assertFalse(scope.isCancelled()); 1036 scope.fork(() -> "foo"); 1037 awaitCancelled(scope); 1038 scope.join(); 1039 } 1040 } 1041 1042 /** 1043 * Test Joiner.onTimeout invoked by owner thread when timeout expires. 1044 */ 1045 @Test 1046 void testOnTimeoutInvoked() throws Exception { 1047 var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>(); 1048 Thread owner = Thread.currentThread(); 1049 var invokeCount = new AtomicInteger(); 1050 var joiner = new Joiner<String, Void>() { 1051 @Override 1052 public void onTimeout() { 1053 assertTrue(Thread.currentThread() == owner); 1054 assertTrue(scopeRef.get().isCancelled()); 1055 invokeCount.incrementAndGet(); 1056 } 1057 @Override 1058 public Void result() { 1059 return null; 1060 } 1061 }; 1062 try (var scope = StructuredTaskScope.open(joiner, 1063 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1064 scopeRef.set(scope); 1065 scope.fork(() -> { 1066 Thread.sleep(Duration.ofDays(1)); 1067 return null; 1068 }); 1069 scope.join(); 1070 assertEquals(1, invokeCount.get()); 1071 } 1072 } 1073 1074 /** 1075 * Test Joiner.onTimeout throwing an excepiton. 1076 */ 1077 @Test 1078 void testOnTimeoutThrows() throws Exception { 1079 var joiner = new Joiner<String, Void>() { 1080 @Override 1081 public void onTimeout() { 1082 throw new FooException(); 1083 } 1084 @Override 1085 public Void result() { 1086 return null; 1087 } 1088 }; 1089 try (var scope = StructuredTaskScope.open(joiner, 1090 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1091 // wait for scope to be cancelled by timeout 1092 awaitCancelled(scope); 1093 1094 // join should throw FooException on first usage 1095 assertThrows(FooException.class, scope::join); 1096 1097 // retry after onTimeout fails 1098 assertThrows(IllegalStateException.class, scope::join); 1099 } 1100 } 1101 1102 /** 1103 * Test toString. 1104 */ 1105 @Test 1106 void testToString() throws Exception { 1107 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 1108 cf -> cf.withName("duke"))) { 1109 1110 // open 1111 assertTrue(scope.toString().contains("duke")); 1112 1113 // closed 1114 scope.close(); 1115 assertTrue(scope.toString().contains("duke")); 1116 } 1117 } 1118 1119 /** 1120 * Test Subtask with task that completes successfully. 1121 */ 1122 @ParameterizedTest 1123 @MethodSource("factories") 1124 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception { 1125 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1126 cf -> cf.withThreadFactory(factory))) { 1127 1128 Subtask<String> subtask = scope.fork(() -> "foo"); 1129 1130 // before join 1131 assertThrows(IllegalStateException.class, subtask::get); 1132 assertThrows(IllegalStateException.class, subtask::exception); 1133 1134 scope.join(); 1135 1136 // after join 1137 assertEquals(Subtask.State.SUCCESS, subtask.state()); 1138 assertEquals("foo", subtask.get()); 1139 assertThrows(IllegalStateException.class, subtask::exception); 1140 } 1141 } 1142 1143 /** 1144 * Test Subtask with task that fails. 1145 */ 1146 @ParameterizedTest 1147 @MethodSource("factories") 1148 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception { 1149 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1150 cf -> cf.withThreadFactory(factory))) { 1151 1152 Subtask<String> subtask = scope.fork(() -> { throw new FooException(); }); 1153 1154 // before join 1155 assertThrows(IllegalStateException.class, subtask::get); 1156 assertThrows(IllegalStateException.class, subtask::exception); 1157 1158 scope.join(); 1159 1160 // after join 1161 assertEquals(Subtask.State.FAILED, subtask.state()); 1162 assertThrows(IllegalStateException.class, subtask::get); 1163 assertTrue(subtask.exception() instanceof FooException); 1164 } 1165 } 1166 1167 /** 1168 * Test Subtask with a task that has not completed. 1169 */ 1170 @ParameterizedTest 1171 @MethodSource("factories") 1172 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception { 1173 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 1174 cf -> cf.withThreadFactory(factory))) { 1175 Subtask<Void> subtask = scope.fork(() -> { 1176 Thread.sleep(Duration.ofDays(1)); 1177 return null; 1178 }); 1179 1180 // before join 1181 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1182 assertThrows(IllegalStateException.class, subtask::get); 1183 assertThrows(IllegalStateException.class, subtask::exception); 1184 1185 // attempt join, join throws 1186 Thread.currentThread().interrupt(); 1187 assertThrows(InterruptedException.class, scope::join); 1188 1189 // after join 1190 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1191 assertThrows(IllegalStateException.class, subtask::get); 1192 assertThrows(IllegalStateException.class, subtask::exception); 1193 } 1194 } 1195 1196 /** 1197 * Test Subtask forked after execution cancelled. 1198 */ 1199 @ParameterizedTest 1200 @MethodSource("factories") 1201 void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception { 1202 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) { 1203 scope.fork(() -> "foo"); 1204 awaitCancelled(scope); 1205 1206 var subtask = scope.fork(() -> "foo"); 1207 1208 // before join 1209 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1210 assertThrows(IllegalStateException.class, subtask::get); 1211 assertThrows(IllegalStateException.class, subtask::exception); 1212 1213 scope.join(); 1214 1215 // after join 1216 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1217 assertThrows(IllegalStateException.class, subtask::get); 1218 assertThrows(IllegalStateException.class, subtask::exception); 1219 } 1220 } 1221 1222 /** 1223 * Test Subtask::toString. 1224 */ 1225 @Test 1226 void testSubtaskToString() throws Exception { 1227 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1228 var latch = new CountDownLatch(1); 1229 var subtask1 = scope.fork(() -> { 1230 latch.await(); 1231 return "foo"; 1232 }); 1233 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1234 1235 // subtask1 result is unavailable 1236 assertTrue(subtask1.toString().contains("Unavailable")); 1237 latch.countDown(); 1238 1239 scope.join(); 1240 1241 assertTrue(subtask1.toString().contains("Completed successfully")); 1242 assertTrue(subtask2.toString().contains("Failed")); 1243 } 1244 } 1245 1246 /** 1247 * Test Joiner.allSuccessfulOrThrow() with no subtasks. 1248 */ 1249 @Test 1250 void testAllSuccessfulOrThrow1() throws Throwable { 1251 try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) { 1252 var subtasks = scope.join().toList(); 1253 assertTrue(subtasks.isEmpty()); 1254 } 1255 } 1256 1257 /** 1258 * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully. 1259 */ 1260 @ParameterizedTest 1261 @MethodSource("factories") 1262 void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable { 1263 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1264 cf -> cf.withThreadFactory(factory))) { 1265 var subtask1 = scope.fork(() -> "foo"); 1266 var subtask2 = scope.fork(() -> "bar"); 1267 var subtasks = scope.join().toList(); 1268 assertEquals(List.of(subtask1, subtask2), subtasks); 1269 assertEquals("foo", subtask1.get()); 1270 assertEquals("bar", subtask2.get()); 1271 } 1272 } 1273 1274 /** 1275 * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and 1276 * a subtask that fails. 1277 */ 1278 @ParameterizedTest 1279 @MethodSource("factories") 1280 void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { 1281 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1282 cf -> cf.withThreadFactory(factory))) { 1283 scope.fork(() -> "foo"); 1284 scope.fork(() -> { throw new FooException(); }); 1285 try { 1286 scope.join(); 1287 } catch (FailedException e) { 1288 assertTrue(e.getCause() instanceof FooException); 1289 } 1290 } 1291 } 1292 1293 /** 1294 * Test Joiner.allSuccessfulOrThrow() with a timeout. 1295 */ 1296 @Test 1297 void testAllSuccessfulOrThrow4() throws Exception { 1298 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1299 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1300 scope.fork(() -> "foo"); 1301 scope.fork(() -> { 1302 Thread.sleep(Duration.ofDays(1)); 1303 return "bar"; 1304 }); 1305 assertThrows(TimeoutException.class, scope::join); 1306 1307 // retry after join throws TimeoutException 1308 assertThrows(IllegalStateException.class, scope::join); 1309 } 1310 } 1311 1312 /** 1313 * Test Joiner.anySuccessfulResultOrThrow() with no subtasks. 1314 */ 1315 @Test 1316 void testAnySuccessfulResultOrThrow1() throws Exception { 1317 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { 1318 try { 1319 scope.join(); 1320 } catch (FailedException e) { 1321 assertTrue(e.getCause() instanceof NoSuchElementException); 1322 } 1323 } 1324 } 1325 1326 /** 1327 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully. 1328 */ 1329 @ParameterizedTest 1330 @MethodSource("factories") 1331 void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception { 1332 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1333 cf -> cf.withThreadFactory(factory))) { 1334 scope.fork(() -> "foo"); 1335 String result = scope.join(); 1336 assertEquals("foo", result); 1337 } 1338 } 1339 1340 /** 1341 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully 1342 * with a null result. 1343 */ 1344 @ParameterizedTest 1345 @MethodSource("factories") 1346 void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception { 1347 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1348 cf -> cf.withThreadFactory(factory))) { 1349 scope.fork(() -> null); 1350 String result = scope.join(); 1351 assertNull(result); 1352 } 1353 } 1354 1355 /** 1356 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully 1357 * and a subtask that fails. 1358 */ 1359 @ParameterizedTest 1360 @MethodSource("factories") 1361 void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception { 1362 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1363 cf -> cf.withThreadFactory(factory))) { 1364 scope.fork(() -> "foo"); 1365 scope.fork(() -> { throw new FooException(); }); 1366 String first = scope.join(); 1367 assertEquals("foo", first); 1368 } 1369 } 1370 1371 /** 1372 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails. 1373 */ 1374 @ParameterizedTest 1375 @MethodSource("factories") 1376 void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception { 1377 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), 1378 cf -> cf.withThreadFactory(factory))) { 1379 scope.fork(() -> { throw new FooException(); }); 1380 Throwable ex = assertThrows(FailedException.class, scope::join); 1381 assertTrue(ex.getCause() instanceof FooException); 1382 } 1383 } 1384 1385 /** 1386 * Test Joiner.allSuccessfulOrThrow() with a timeout. 1387 */ 1388 @Test 1389 void anySuccessfulResultOrThrow6() throws Exception { 1390 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1391 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1392 scope.fork(() -> { throw new FooException(); }); 1393 scope.fork(() -> { 1394 Thread.sleep(Duration.ofDays(1)); 1395 return "bar"; 1396 }); 1397 assertThrows(TimeoutException.class, scope::join); 1398 1399 // retry after join throws TimeoutException 1400 assertThrows(IllegalStateException.class, scope::join); 1401 } 1402 } 1403 1404 /** 1405 * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks. 1406 */ 1407 @Test 1408 void testAwaitSuccessfulOrThrow1() throws Throwable { 1409 try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) { 1410 var result = scope.join(); 1411 assertNull(result); 1412 } 1413 } 1414 1415 /** 1416 * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully. 1417 */ 1418 @ParameterizedTest 1419 @MethodSource("factories") 1420 void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable { 1421 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1422 cf -> cf.withThreadFactory(factory))) { 1423 var subtask1 = scope.fork(() -> "foo"); 1424 var subtask2 = scope.fork(() -> "bar"); 1425 var result = scope.join(); 1426 assertNull(result); 1427 assertEquals("foo", subtask1.get()); 1428 assertEquals("bar", subtask2.get()); 1429 } 1430 } 1431 1432 /** 1433 * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and 1434 * a subtask that fails. 1435 */ 1436 @ParameterizedTest 1437 @MethodSource("factories") 1438 void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { 1439 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1440 cf -> cf.withThreadFactory(factory))) { 1441 scope.fork(() -> "foo"); 1442 scope.fork(() -> { throw new FooException(); }); 1443 try { 1444 scope.join(); 1445 } catch (FailedException e) { 1446 assertTrue(e.getCause() instanceof FooException); 1447 } 1448 } 1449 } 1450 1451 /** 1452 * Test Joiner.awaitAllSuccessfulOrThrow() with a timeout. 1453 */ 1454 @Test 1455 void testAwaitSuccessfulOrThrow4() throws Exception { 1456 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1457 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1458 scope.fork(() -> "foo"); 1459 scope.fork(() -> { 1460 Thread.sleep(Duration.ofDays(1)); 1461 return "bar"; 1462 }); 1463 assertThrows(TimeoutException.class, scope::join); 1464 1465 // retry after join throws TimeoutException 1466 assertThrows(IllegalStateException.class, scope::join); 1467 } 1468 } 1469 1470 /** 1471 * Test Joiner.awaitAll() with no subtasks. 1472 */ 1473 @Test 1474 void testAwaitAll1() throws Throwable { 1475 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1476 var result = scope.join(); 1477 assertNull(result); 1478 } 1479 } 1480 1481 /** 1482 * Test Joiner.awaitAll() with subtasks that complete successfully. 1483 */ 1484 @ParameterizedTest 1485 @MethodSource("factories") 1486 void testAwaitAll2(ThreadFactory factory) throws Throwable { 1487 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1488 cf -> cf.withThreadFactory(factory))) { 1489 var subtask1 = scope.fork(() -> "foo"); 1490 var subtask2 = scope.fork(() -> "bar"); 1491 var result = scope.join(); 1492 assertNull(result); 1493 assertEquals("foo", subtask1.get()); 1494 assertEquals("bar", subtask2.get()); 1495 } 1496 } 1497 1498 /** 1499 * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask 1500 * that fails. 1501 */ 1502 @ParameterizedTest 1503 @MethodSource("factories") 1504 void testAwaitAll3(ThreadFactory factory) throws Throwable { 1505 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1506 cf -> cf.withThreadFactory(factory))) { 1507 var subtask1 = scope.fork(() -> "foo"); 1508 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1509 var result = scope.join(); 1510 assertNull(result); 1511 assertEquals("foo", subtask1.get()); 1512 assertTrue(subtask2.exception() instanceof FooException); 1513 } 1514 } 1515 1516 /** 1517 * Test Joiner.awaitAll() with a timeout. 1518 */ 1519 @Test 1520 void testAwaitAll4() throws Exception { 1521 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1522 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1523 scope.fork(() -> "foo"); 1524 scope.fork(() -> { 1525 Thread.sleep(Duration.ofDays(1)); 1526 return "bar"; 1527 }); 1528 assertThrows(TimeoutException.class, scope::join); 1529 1530 // retry after join throws TimeoutException 1531 assertThrows(IllegalStateException.class, scope::join); 1532 } 1533 } 1534 1535 /** 1536 * Test Joiner.allUntil(Predicate) with no subtasks. 1537 */ 1538 @Test 1539 void testAllUntil1() throws Throwable { 1540 try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) { 1541 var subtasks = scope.join(); 1542 assertEquals(0, subtasks.count()); 1543 } 1544 } 1545 1546 /** 1547 * Test Joiner.allUntil(Predicate) with no cancellation. 1548 */ 1549 @ParameterizedTest 1550 @MethodSource("factories") 1551 void testAllUntil2(ThreadFactory factory) throws Exception { 1552 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false), 1553 cf -> cf.withThreadFactory(factory))) { 1554 1555 var subtask1 = scope.fork(() -> "foo"); 1556 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1557 1558 var subtasks = scope.join().toList(); 1559 assertEquals(2, subtasks.size()); 1560 1561 assertSame(subtask1, subtasks.get(0)); 1562 assertSame(subtask2, subtasks.get(1)); 1563 assertEquals("foo", subtask1.get()); 1564 assertTrue(subtask2.exception() instanceof FooException); 1565 } 1566 } 1567 1568 /** 1569 * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes. 1570 */ 1571 @ParameterizedTest 1572 @MethodSource("factories") 1573 void testAllUntil3(ThreadFactory factory) throws Exception { 1574 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true), 1575 cf -> cf.withThreadFactory(factory))) { 1576 1577 var subtask1 = scope.fork(() -> "foo"); 1578 var subtask2 = scope.fork(() -> { 1579 Thread.sleep(Duration.ofDays(1)); 1580 return "bar"; 1581 }); 1582 1583 var subtasks = scope.join().toList(); 1584 1585 assertEquals(2, subtasks.size()); 1586 assertSame(subtask1, subtasks.get(0)); 1587 assertSame(subtask2, subtasks.get(1)); 1588 assertEquals("foo", subtask1.get()); 1589 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1590 } 1591 } 1592 1593 /** 1594 * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete. 1595 */ 1596 @ParameterizedTest 1597 @MethodSource("factories") 1598 void testAllUntil4(ThreadFactory factory) throws Exception { 1599 1600 // cancel execution after two or more failures 1601 class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> { 1602 final AtomicInteger failedCount = new AtomicInteger(); 1603 @Override 1604 public boolean test(Subtask<? extends T> subtask) { 1605 return subtask.state() == Subtask.State.FAILED 1606 && failedCount.incrementAndGet() >= 2; 1607 } 1608 } 1609 var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>()); 1610 1611 try (var scope = StructuredTaskScope.open(joiner)) { 1612 int forkCount = 0; 1613 1614 // fork subtasks until execution cancelled 1615 while (!scope.isCancelled()) { 1616 scope.fork(() -> "foo"); 1617 scope.fork(() -> { throw new FooException(); }); 1618 forkCount += 2; 1619 Thread.sleep(Duration.ofMillis(20)); 1620 } 1621 1622 var subtasks = scope.join().toList(); 1623 assertEquals(forkCount, subtasks.size()); 1624 1625 long failedCount = subtasks.stream() 1626 .filter(s -> s.state() == Subtask.State.FAILED) 1627 .count(); 1628 assertTrue(failedCount >= 2); 1629 } 1630 } 1631 1632 /** 1633 * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws. 1634 */ 1635 @Test 1636 void testAllUntil5() throws Exception { 1637 var joiner = Joiner.allUntil(_ -> { throw new FooException(); }); 1638 var excRef = new AtomicReference<Throwable>(); 1639 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e); 1640 ThreadFactory factory = Thread.ofVirtual() 1641 .uncaughtExceptionHandler(uhe) 1642 .factory(); 1643 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) { 1644 scope.fork(() -> "foo"); 1645 scope.join(); 1646 assertInstanceOf(FooException.class, excRef.get()); 1647 } 1648 } 1649 1650 /** 1651 * Test Joiner.allUntil(Predicate) with a timeout. 1652 */ 1653 @Test 1654 void testAllUntil6() throws Exception { 1655 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false), 1656 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1657 var subtask1 = scope.fork(() -> "foo"); 1658 var subtask2 = scope.fork(() -> { 1659 Thread.sleep(Duration.ofDays(1)); 1660 return "bar"; 1661 }); 1662 1663 // TimeoutException should not be thrown 1664 var subtasks = scope.join().toList(); 1665 1666 // stream should have two elements, subtask1 may or may not have completed 1667 assertEquals(2, subtasks.size()); 1668 assertSame(subtask1, subtasks.get(0)); 1669 assertSame(subtask2, subtasks.get(1)); 1670 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1671 1672 // retry after join throws TimeoutException 1673 assertThrows(IllegalStateException.class, scope::join); 1674 } 1675 } 1676 1677 /** 1678 * Test Joiner default methods. 1679 */ 1680 @Test 1681 void testJoinerDefaultMethods() throws Exception { 1682 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) { 1683 1684 // need subtasks to test default methods 1685 var subtask1 = scope.fork(() -> "foo"); 1686 awaitCancelled(scope); 1687 var subtask2 = scope.fork(() -> "bar"); 1688 scope.join(); 1689 1690 assertEquals(Subtask.State.SUCCESS, subtask1.state()); 1691 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1692 1693 // Joiner that does not override default methods 1694 Joiner<Object, Void> joiner = () -> null; 1695 assertThrows(NullPointerException.class, () -> joiner.onFork(null)); 1696 assertThrows(NullPointerException.class, () -> joiner.onComplete(null)); 1697 assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1)); 1698 assertFalse(joiner.onFork(subtask2)); 1699 assertFalse(joiner.onComplete(subtask1)); 1700 assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2)); 1701 assertThrows(TimeoutException.class, joiner::onTimeout); 1702 } 1703 } 1704 1705 /** 1706 * Test Joiners onFork/onComplete methods with a subtask in an unexpected state. 1707 */ 1708 @Test 1709 void testJoinersWithUnavailableResult() throws Exception { 1710 try (var scope = StructuredTaskScope.open()) { 1711 var done = new CountDownLatch(1); 1712 var subtask = scope.fork(() -> { 1713 done.await(); 1714 return null; 1715 }); 1716 1717 // onComplete with uncompleted task should throw IAE 1718 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1719 assertThrows(IllegalArgumentException.class, 1720 () -> Joiner.allSuccessfulOrThrow().onComplete(subtask)); 1721 assertThrows(IllegalArgumentException.class, 1722 () -> Joiner.anySuccessfulResultOrThrow().onComplete(subtask)); 1723 assertThrows(IllegalArgumentException.class, 1724 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask)); 1725 assertThrows(IllegalArgumentException.class, 1726 () -> Joiner.awaitAll().onComplete(subtask)); 1727 assertThrows(IllegalArgumentException.class, 1728 () -> Joiner.allUntil(_ -> false).onComplete(subtask)); 1729 1730 done.countDown(); 1731 scope.join(); 1732 1733 // onFork with completed task should throw IAE 1734 assertEquals(Subtask.State.SUCCESS, subtask.state()); 1735 assertThrows(IllegalArgumentException.class, 1736 () -> Joiner.allSuccessfulOrThrow().onFork(subtask)); 1737 assertThrows(IllegalArgumentException.class, 1738 () -> Joiner.anySuccessfulResultOrThrow().onFork(subtask)); 1739 assertThrows(IllegalArgumentException.class, 1740 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask)); 1741 assertThrows(IllegalArgumentException.class, 1742 () -> Joiner.awaitAll().onFork(subtask)); 1743 assertThrows(IllegalArgumentException.class, 1744 () -> Joiner.allUntil(_ -> false).onFork(subtask)); 1745 } 1746 1747 } 1748 1749 /** 1750 * Test the Configuration function apply method throwing an exception. 1751 */ 1752 @Test 1753 void testConfigFunctionThrows() throws Exception { 1754 assertThrows(FooException.class, 1755 () -> StructuredTaskScope.open(Joiner.awaitAll(), 1756 cf -> { throw new FooException(); })); 1757 } 1758 1759 /** 1760 * Test Configuration equals/hashCode/toString 1761 */ 1762 @Test 1763 void testConfigMethods() throws Exception { 1764 UnaryOperator<Configuration> configOperator = cf -> { 1765 var name = "duke"; 1766 var threadFactory = Thread.ofPlatform().factory(); 1767 var timeout = Duration.ofSeconds(10); 1768 1769 assertEquals(cf, cf); 1770 assertEquals(cf.withName(name), cf.withName(name)); 1771 assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory)); 1772 assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout)); 1773 1774 assertNotEquals(cf, cf.withName(name)); 1775 assertNotEquals(cf, cf.withThreadFactory(threadFactory)); 1776 assertNotEquals(cf, cf.withTimeout(timeout)); 1777 1778 assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode()); 1779 assertEquals(cf.withThreadFactory(threadFactory).hashCode(), 1780 cf.withThreadFactory(threadFactory).hashCode()); 1781 assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode()); 1782 1783 assertTrue(cf.withName(name).toString().contains(name)); 1784 assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString())); 1785 assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString())); 1786 1787 return cf; 1788 }; 1789 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) { 1790 // do nothing 1791 } 1792 } 1793 1794 /** 1795 * Test for NullPointerException. 1796 */ 1797 @Test 1798 void testNulls() throws Exception { 1799 assertThrows(NullPointerException.class, 1800 () -> StructuredTaskScope.open(null)); 1801 assertThrows(NullPointerException.class, 1802 () -> StructuredTaskScope.open(null, cf -> cf)); 1803 assertThrows(NullPointerException.class, 1804 () -> StructuredTaskScope.open(Joiner.awaitAll(), null)); 1805 1806 assertThrows(NullPointerException.class, () -> Joiner.allUntil(null)); 1807 1808 // fork 1809 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1810 assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null)); 1811 assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null)); 1812 } 1813 1814 // Configuration and withXXX methods 1815 assertThrows(NullPointerException.class, 1816 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> null)); 1817 assertThrows(NullPointerException.class, 1818 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null))); 1819 assertThrows(NullPointerException.class, 1820 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null))); 1821 assertThrows(NullPointerException.class, 1822 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null))); 1823 1824 // Joiner.onFork/onComplete 1825 assertThrows(NullPointerException.class, 1826 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null)); 1827 assertThrows(NullPointerException.class, 1828 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null)); 1829 assertThrows(NullPointerException.class, 1830 () -> Joiner.awaitAll().onFork(null)); 1831 assertThrows(NullPointerException.class, 1832 () -> Joiner.awaitAll().onComplete(null)); 1833 assertThrows(NullPointerException.class, 1834 () -> Joiner.allSuccessfulOrThrow().onFork(null)); 1835 assertThrows(NullPointerException.class, 1836 () -> Joiner.allSuccessfulOrThrow().onComplete(null)); 1837 assertThrows(NullPointerException.class, 1838 () -> Joiner.anySuccessfulResultOrThrow().onFork(null)); 1839 assertThrows(NullPointerException.class, 1840 () -> Joiner.anySuccessfulResultOrThrow().onComplete(null)); 1841 } 1842 1843 /** 1844 * ThreadFactory that counts usage. 1845 */ 1846 private static class CountingThreadFactory implements ThreadFactory { 1847 final ThreadFactory delegate; 1848 final AtomicInteger threadCount = new AtomicInteger(); 1849 CountingThreadFactory(ThreadFactory delegate) { 1850 this.delegate = delegate; 1851 } 1852 @Override 1853 public Thread newThread(Runnable task) { 1854 threadCount.incrementAndGet(); 1855 return delegate.newThread(task); 1856 } 1857 int threadCount() { 1858 return threadCount.get(); 1859 } 1860 } 1861 1862 /** 1863 * A joiner that counts that counts the number of subtasks that are forked and the 1864 * number of subtasks that complete. 1865 */ 1866 private static class CountingJoiner<T> implements Joiner<T, Void> { 1867 final AtomicInteger onForkCount = new AtomicInteger(); 1868 final AtomicInteger onCompleteCount = new AtomicInteger(); 1869 @Override 1870 public boolean onFork(Subtask<? extends T> subtask) { 1871 onForkCount.incrementAndGet(); 1872 return false; 1873 } 1874 @Override 1875 public boolean onComplete(Subtask<? extends T> subtask) { 1876 onCompleteCount.incrementAndGet(); 1877 return false; 1878 } 1879 @Override 1880 public Void result() { 1881 return null; 1882 } 1883 int onForkCount() { 1884 return onForkCount.get(); 1885 } 1886 int onCompleteCount() { 1887 return onCompleteCount.get(); 1888 } 1889 } 1890 1891 /** 1892 * A joiner that cancels execution when a subtask completes. It also keeps a count 1893 * of the number of subtasks that are forked and the number of subtasks that complete. 1894 */ 1895 private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> { 1896 final AtomicInteger onForkCount = new AtomicInteger(); 1897 final AtomicInteger onCompleteCount = new AtomicInteger(); 1898 @Override 1899 public boolean onFork(Subtask<? extends T> subtask) { 1900 onForkCount.incrementAndGet(); 1901 return false; 1902 } 1903 @Override 1904 public boolean onComplete(Subtask<? extends T> subtask) { 1905 onCompleteCount.incrementAndGet(); 1906 return true; 1907 } 1908 @Override 1909 public Void result() { 1910 return null; 1911 } 1912 int onForkCount() { 1913 return onForkCount.get(); 1914 } 1915 int onCompleteCount() { 1916 return onCompleteCount.get(); 1917 } 1918 } 1919 1920 /** 1921 * A runtime exception for tests. 1922 */ 1923 private static class FooException extends RuntimeException { 1924 FooException() { } 1925 FooException(Throwable cause) { super(cause); } 1926 } 1927 1928 /** 1929 * Returns the current time in milliseconds. 1930 */ 1931 private long millisTime() { 1932 long now = System.nanoTime(); 1933 return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS); 1934 } 1935 1936 /** 1937 * Check the duration of a task 1938 * @param start start time, in milliseconds 1939 * @param min minimum expected duration, in milliseconds 1940 * @param max maximum expected duration, in milliseconds 1941 * @return the duration (now - start), in milliseconds 1942 */ 1943 private long expectDuration(long start, long min, long max) { 1944 long duration = millisTime() - start; 1945 assertTrue(duration >= min, 1946 "Duration " + duration + "ms, expected >= " + min + "ms"); 1947 assertTrue(duration <= max, 1948 "Duration " + duration + "ms, expected <= " + max + "ms"); 1949 return duration; 1950 } 1951 1952 /** 1953 * Wait for the given scope to be cancelled. 1954 */ 1955 private static void awaitCancelled(StructuredTaskScope<?, ?> scope) throws InterruptedException { 1956 while (!scope.isCancelled()) { 1957 Thread.sleep(Duration.ofMillis(20)); 1958 } 1959 } 1960 1961 /** 1962 * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}". 1963 * {@code c} is the fully qualified class name and {@code m} is the method name. 1964 */ 1965 private void interruptThreadAt(Thread target, String location) throws InterruptedException { 1966 int index = location.lastIndexOf('.'); 1967 String className = location.substring(0, index); 1968 String methodName = location.substring(index + 1); 1969 1970 boolean found = false; 1971 while (!found) { 1972 Thread.State state = target.getState(); 1973 assertTrue(state != TERMINATED); 1974 if ((state == WAITING || state == TIMED_WAITING) 1975 && contains(target.getStackTrace(), className, methodName)) { 1976 found = true; 1977 } else { 1978 Thread.sleep(20); 1979 } 1980 } 1981 target.interrupt(); 1982 } 1983 1984 /** 1985 * Schedules the current thread to be interrupted when it waits (timed or untimed) 1986 * at the given location. 1987 */ 1988 private void scheduleInterruptAt(String location) { 1989 Thread target = Thread.currentThread(); 1990 scheduler.submit(() -> { 1991 interruptThreadAt(target, location); 1992 return null; 1993 }); 1994 } 1995 1996 /** 1997 * Returns true if the given stack trace contains an element for the given class 1998 * and method name. 1999 */ 2000 private boolean contains(StackTraceElement[] stack, String className, String methodName) { 2001 return Arrays.stream(stack) 2002 .anyMatch(e -> className.equals(e.getClassName()) 2003 && methodName.equals(e.getMethodName())); 2004 } 2005 }