1 /* 2 * Copyright (c) 2021, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=platform 26 * @bug 8284199 8296779 8306647 27 * @summary Basic tests for StructuredTaskScope 28 * @enablePreview 29 * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest 30 */ 31 32 /* 33 * @test id=virtual 34 * @enablePreview 35 * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest 36 */ 37 38 import java.time.Duration; 39 import java.util.Arrays; 40 import java.util.ArrayList; 41 import java.util.List; 42 import java.util.Set; 43 import java.util.NoSuchElementException; 44 import java.util.concurrent.Callable; 45 import java.util.concurrent.ConcurrentHashMap; 46 import java.util.concurrent.CountDownLatch; 47 import java.util.concurrent.Executors; 48 import java.util.concurrent.Future; 49 import java.util.concurrent.LinkedTransferQueue; 50 import java.util.concurrent.ThreadFactory; 51 import java.util.concurrent.TimeUnit; 52 import java.util.concurrent.RejectedExecutionException; 53 import java.util.concurrent.ScheduledExecutorService; 54 import java.util.concurrent.StructuredTaskScope; 55 import java.util.concurrent.StructuredTaskScope.TimeoutException; 56 import java.util.concurrent.StructuredTaskScope.Configuration; 57 import java.util.concurrent.StructuredTaskScope.FailedException; 58 import java.util.concurrent.StructuredTaskScope.Joiner; 59 import java.util.concurrent.StructuredTaskScope.Subtask; 60 import java.util.concurrent.StructureViolationException; 61 import java.util.concurrent.atomic.AtomicBoolean; 62 import java.util.concurrent.atomic.AtomicInteger; 63 import java.util.concurrent.atomic.AtomicReference; 64 import java.util.function.Function; 65 import java.util.function.Predicate; 66 import java.util.stream.Stream; 67 import static java.lang.Thread.State.*; 68 69 import org.junit.jupiter.api.Test; 70 import org.junit.jupiter.api.BeforeAll; 71 import org.junit.jupiter.api.AfterAll; 72 import org.junit.jupiter.params.ParameterizedTest; 73 import org.junit.jupiter.params.provider.MethodSource; 74 import static org.junit.jupiter.api.Assertions.*; 75 76 class StructuredTaskScopeTest { 77 private static ScheduledExecutorService scheduler; 78 private static List<ThreadFactory> threadFactories; 79 80 @BeforeAll 81 static void setup() throws Exception { 82 scheduler = Executors.newSingleThreadScheduledExecutor(); 83 84 // thread factories 85 String value = System.getProperty("threadFactory"); 86 List<ThreadFactory> list = new ArrayList<>(); 87 if (value == null || value.equals("platform")) 88 list.add(Thread.ofPlatform().factory()); 89 if (value == null || value.equals("virtual")) 90 list.add(Thread.ofVirtual().factory()); 91 assertTrue(list.size() > 0, "No thread factories for tests"); 92 threadFactories = list; 93 } 94 95 @AfterAll 96 static void shutdown() { 97 scheduler.shutdown(); 98 } 99 100 private static Stream<ThreadFactory> factories() { 101 return threadFactories.stream(); 102 } 103 104 /** 105 * Test that fork creates virtual threads when no ThreadFactory is configured. 106 */ 107 @Test 108 void testForkCreatesVirtualThread() throws Exception { 109 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 110 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 111 for (int i = 0; i < 50; i++) { 112 // runnable 113 scope.fork(() -> { 114 threads.add(Thread.currentThread()); 115 }); 116 117 // callable 118 scope.fork(() -> { 119 threads.add(Thread.currentThread()); 120 return null; 121 }); 122 } 123 scope.join(); 124 } 125 assertEquals(100, threads.size()); 126 threads.forEach(t -> assertTrue(t.isVirtual())); 127 } 128 129 /** 130 * Test that fork create threads with the configured ThreadFactory. 131 */ 132 @ParameterizedTest 133 @MethodSource("factories") 134 void testForkUsesThreadFactory(ThreadFactory factory) throws Exception { 135 // TheadFactory that keeps reference to all threads it creates 136 class RecordingThreadFactory implements ThreadFactory { 137 final ThreadFactory delegate; 138 final Set<Thread> threads = ConcurrentHashMap.newKeySet(); 139 RecordingThreadFactory(ThreadFactory delegate) { 140 this.delegate = delegate; 141 } 142 @Override 143 public Thread newThread(Runnable task) { 144 Thread thread = delegate.newThread(task); 145 threads.add(thread); 146 return thread; 147 } 148 Set<Thread> threads() { 149 return threads; 150 } 151 } 152 var recordingThreadFactory = new RecordingThreadFactory(factory); 153 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 154 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 155 cf -> cf.withThreadFactory(recordingThreadFactory))) { 156 157 for (int i = 0; i < 50; i++) { 158 // runnable 159 scope.fork(() -> { 160 threads.add(Thread.currentThread()); 161 }); 162 163 // callable 164 scope.fork(() -> { 165 threads.add(Thread.currentThread()); 166 return null; 167 }); 168 } 169 scope.join(); 170 } 171 assertEquals(100, threads.size()); 172 assertEquals(recordingThreadFactory.threads(), threads); 173 } 174 175 /** 176 * Test fork method is owner confined. 177 */ 178 @ParameterizedTest 179 @MethodSource("factories") 180 void testForkConfined(ThreadFactory factory) throws Exception { 181 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 182 cf -> cf.withThreadFactory(factory))) { 183 184 // random thread cannot fork 185 try (var pool = Executors.newSingleThreadExecutor()) { 186 Future<Void> future = pool.submit(() -> { 187 assertThrows(WrongThreadException.class, () -> { 188 scope.fork(() -> null); 189 }); 190 return null; 191 }); 192 future.get(); 193 } 194 195 // subtask cannot fork 196 Subtask<Boolean> subtask = scope.fork(() -> { 197 assertThrows(WrongThreadException.class, () -> { 198 scope.fork(() -> null); 199 }); 200 return true; 201 }); 202 scope.join(); 203 assertTrue(subtask.get()); 204 } 205 } 206 207 /** 208 * Test fork after join, no subtasks forked before join. 209 */ 210 @ParameterizedTest 211 @MethodSource("factories") 212 void testForkAfterJoin1(ThreadFactory factory) throws Exception { 213 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 214 cf -> cf.withThreadFactory(factory))) { 215 scope.join(); 216 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 217 } 218 } 219 220 /** 221 * Test fork after join, subtasks forked before join. 222 */ 223 @ParameterizedTest 224 @MethodSource("factories") 225 void testForkAfterJoin2(ThreadFactory factory) throws Exception { 226 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 227 cf -> cf.withThreadFactory(factory))) { 228 scope.fork(() -> "foo"); 229 scope.join(); 230 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 231 } 232 } 233 234 /** 235 * Test fork after join throws. 236 */ 237 @ParameterizedTest 238 @MethodSource("factories") 239 void testForkAfterJoinThrows(ThreadFactory factory) throws Exception { 240 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 241 cf -> cf.withThreadFactory(factory))) { 242 var latch = new CountDownLatch(1); 243 var subtask1 = scope.fork(() -> { 244 latch.await(); 245 return "foo"; 246 }); 247 248 // join throws 249 Thread.currentThread().interrupt(); 250 assertThrows(InterruptedException.class, scope::join); 251 252 // fork should throw 253 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 254 } 255 } 256 257 /** 258 * Test fork after task scope is cancelled. This test uses a custom Joiner to 259 * cancel execution. 260 */ 261 @ParameterizedTest 262 @MethodSource("factories") 263 void testForkAfterCancel2(ThreadFactory factory) throws Exception { 264 var countingThreadFactory = new CountingThreadFactory(factory); 265 var testJoiner = new CancelAfterOneJoiner<String>(); 266 267 try (var scope = StructuredTaskScope.open(testJoiner, 268 cf -> cf.withThreadFactory(countingThreadFactory))) { 269 270 // fork subtask, the scope should be cancelled when the subtask completes 271 var subtask1 = scope.fork(() -> "foo"); 272 awaitCancelled(scope); 273 274 assertEquals(1, countingThreadFactory.threadCount()); 275 assertEquals(1, testJoiner.onForkCount()); 276 assertEquals(1, testJoiner.onCompleteCount()); 277 278 // fork second subtask, it should not run 279 var subtask2 = scope.fork(() -> "bar"); 280 281 // onFork should be invoked, newThread and onComplete should not be invoked 282 assertEquals(1, countingThreadFactory.threadCount()); 283 assertEquals(2, testJoiner.onForkCount()); 284 assertEquals(1, testJoiner.onCompleteCount()); 285 286 scope.join(); 287 288 assertEquals(1, countingThreadFactory.threadCount()); 289 assertEquals(2, testJoiner.onForkCount()); 290 assertEquals(1, testJoiner.onCompleteCount()); 291 assertEquals("foo", subtask1.get()); 292 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 293 } 294 } 295 296 /** 297 * Test fork after task scope is closed. 298 */ 299 @Test 300 void testForkAfterClose() { 301 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 302 scope.close(); 303 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null)); 304 } 305 } 306 307 /** 308 * Test fork with a ThreadFactory that rejects creating a thread. 309 */ 310 @Test 311 void testForkRejectedExecutionException() { 312 ThreadFactory factory = task -> null; 313 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 314 cf -> cf.withThreadFactory(factory))) { 315 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null)); 316 } 317 } 318 319 /** 320 * Test join with no subtasks. 321 */ 322 @Test 323 void testJoinWithNoSubtasks() throws Exception { 324 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 325 scope.join(); 326 } 327 } 328 329 /** 330 * Test join with a remaining subtask. 331 */ 332 @ParameterizedTest 333 @MethodSource("factories") 334 void testJoinWithRemainingSubtasks(ThreadFactory factory) throws Exception { 335 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 336 cf -> cf.withThreadFactory(factory))) { 337 Subtask<String> subtask = scope.fork(() -> { 338 Thread.sleep(Duration.ofMillis(100)); 339 return "foo"; 340 }); 341 scope.join(); 342 assertEquals("foo", subtask.get()); 343 } 344 } 345 346 /** 347 * Test join after join completed with a result. 348 */ 349 @Test 350 void testJoinAfterJoin1() throws Exception { 351 var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz")); 352 Joiner<Object, String> joiner = results::take; 353 try (var scope = StructuredTaskScope.open(joiner)) { 354 scope.fork(() -> "foo"); 355 assertEquals("foo", scope.join()); 356 357 // join already called 358 for (int i = 0 ; i < 3; i++) { 359 assertThrows(IllegalStateException.class, scope::join); 360 } 361 } 362 } 363 364 /** 365 * Test join after join completed with an exception. 366 */ 367 @Test 368 void testJoinAfterJoin2() throws Exception { 369 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { 370 scope.fork(() -> { throw new FooException(); }); 371 Throwable ex = assertThrows(FailedException.class, scope::join); 372 assertTrue(ex.getCause() instanceof FooException); 373 374 // join already called 375 for (int i = 0 ; i < 3; i++) { 376 assertThrows(IllegalStateException.class, scope::join); 377 } 378 } 379 } 380 381 /** 382 * Test join after join completed with a timeout. 383 */ 384 @Test 385 void testJoinAfterJoin3() throws Exception { 386 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), 387 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 388 // wait for scope to be cancelled by timeout 389 awaitCancelled(scope); 390 assertThrows(TimeoutException.class, scope::join); 391 392 // join already called 393 for (int i = 0 ; i < 3; i++) { 394 assertThrows(IllegalStateException.class, scope::join); 395 } 396 } 397 } 398 399 /** 400 * Test join method is owner confined. 401 */ 402 @ParameterizedTest 403 @MethodSource("factories") 404 void testJoinConfined(ThreadFactory factory) throws Exception { 405 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 406 cf -> cf.withThreadFactory(factory))) { 407 408 // random thread cannot join 409 try (var pool = Executors.newSingleThreadExecutor()) { 410 Future<Void> future = pool.submit(() -> { 411 assertThrows(WrongThreadException.class, scope::join); 412 return null; 413 }); 414 future.get(); 415 } 416 417 // subtask cannot join 418 Subtask<Boolean> subtask = scope.fork(() -> { 419 assertThrows(WrongThreadException.class, () -> { scope.join(); }); 420 return true; 421 }); 422 scope.join(); 423 assertTrue(subtask.get()); 424 } 425 } 426 427 /** 428 * Test join with interrupt status set. 429 */ 430 @ParameterizedTest 431 @MethodSource("factories") 432 void testInterruptJoin1(ThreadFactory factory) throws Exception { 433 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 434 cf -> cf.withThreadFactory(factory))) { 435 436 Subtask<String> subtask = scope.fork(() -> { 437 Thread.sleep(60_000); 438 return "foo"; 439 }); 440 441 // join should throw 442 Thread.currentThread().interrupt(); 443 try { 444 scope.join(); 445 fail("join did not throw"); 446 } catch (InterruptedException expected) { 447 assertFalse(Thread.interrupted()); // interrupt status should be cleared 448 } 449 } 450 } 451 452 /** 453 * Test interrupt of thread blocked in join. 454 */ 455 @ParameterizedTest 456 @MethodSource("factories") 457 void testInterruptJoin2(ThreadFactory factory) throws Exception { 458 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 459 cf -> cf.withThreadFactory(factory))) { 460 461 var latch = new CountDownLatch(1); 462 Subtask<String> subtask = scope.fork(() -> { 463 Thread.sleep(60_000); 464 return "foo"; 465 }); 466 467 // interrupt main thread when it blocks in join 468 scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join"); 469 try { 470 scope.join(); 471 fail("join did not throw"); 472 } catch (InterruptedException expected) { 473 assertFalse(Thread.interrupted()); // interrupt status should be clear 474 } 475 } 476 } 477 478 /** 479 * Test join when scope is cancelled. 480 */ 481 @ParameterizedTest 482 @MethodSource("factories") 483 void testJoinWhenCancelled(ThreadFactory factory) throws Exception { 484 var countingThreadFactory = new CountingThreadFactory(factory); 485 var testJoiner = new CancelAfterOneJoiner<String>(); 486 487 try (var scope = StructuredTaskScope.open(testJoiner, 488 cf -> cf.withThreadFactory(countingThreadFactory))) { 489 490 // fork subtask, the scope should be cancelled when the subtask completes 491 var subtask1 = scope.fork(() -> "foo"); 492 awaitCancelled(scope); 493 494 // fork second subtask, it should not run 495 var subtask2 = scope.fork(() -> { 496 Thread.sleep(Duration.ofDays(1)); 497 return "bar"; 498 }); 499 500 scope.join(); 501 502 assertEquals("foo", subtask1.get()); 503 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 504 } 505 } 506 507 /** 508 * Test join after scope is closed. 509 */ 510 @Test 511 void testJoinAfterClose() throws Exception { 512 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 513 scope.close(); 514 assertThrows(IllegalStateException.class, () -> scope.join()); 515 } 516 } 517 518 /** 519 * Test join with timeout, subtasks finish before timeout expires. 520 */ 521 @ParameterizedTest 522 @MethodSource("factories") 523 void testJoinWithTimeout1(ThreadFactory factory) throws Exception { 524 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 525 cf -> cf.withThreadFactory(factory) 526 .withTimeout(Duration.ofDays(1)))) { 527 528 Subtask<String> subtask = scope.fork(() -> { 529 Thread.sleep(Duration.ofSeconds(1)); 530 return "foo"; 531 }); 532 533 scope.join(); 534 535 assertFalse(scope.isCancelled()); 536 assertEquals("foo", subtask.get()); 537 } 538 } 539 540 /** 541 * Test join with timeout, timeout expires before subtasks finish. 542 */ 543 @ParameterizedTest 544 @MethodSource("factories") 545 void testJoinWithTimeout2(ThreadFactory factory) throws Exception { 546 long startMillis = millisTime(); 547 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 548 cf -> cf.withThreadFactory(factory) 549 .withTimeout(Duration.ofSeconds(2)))) { 550 551 Subtask<Void> subtask = scope.fork(() -> { 552 Thread.sleep(Duration.ofDays(1)); 553 return null; 554 }); 555 556 assertThrows(TimeoutException.class, scope::join); 557 expectDuration(startMillis, /*min*/1900, /*max*/20_000); 558 559 assertTrue(scope.isCancelled()); 560 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 561 } 562 } 563 564 /** 565 * Test join with timeout that has already expired. 566 */ 567 @ParameterizedTest 568 @MethodSource("factories") 569 void testJoinWithTimeout3(ThreadFactory factory) throws Exception { 570 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 571 cf -> cf.withThreadFactory(factory) 572 .withTimeout(Duration.ofSeconds(-1)))) { 573 574 Subtask<Void> subtask = scope.fork(() -> { 575 Thread.sleep(Duration.ofDays(1)); 576 return null; 577 }); 578 579 assertThrows(TimeoutException.class, scope::join); 580 581 assertTrue(scope.isCancelled()); 582 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 583 } 584 } 585 586 /** 587 * Test that cancelling execution interrupts unfinished threads. This test uses 588 * a custom Joiner to cancel execution. 589 */ 590 @ParameterizedTest 591 @MethodSource("factories") 592 void testCancelInterruptsThreads2(ThreadFactory factory) throws Exception { 593 var testJoiner = new CancelAfterOneJoiner<String>(); 594 595 try (var scope = StructuredTaskScope.open(testJoiner, 596 cf -> cf.withThreadFactory(factory))) { 597 598 // fork subtask1 that runs for a long time 599 var started = new CountDownLatch(1); 600 var interrupted = new CountDownLatch(1); 601 var subtask1 = scope.fork(() -> { 602 started.countDown(); 603 try { 604 Thread.sleep(Duration.ofDays(1)); 605 } catch (InterruptedException e) { 606 interrupted.countDown(); 607 } 608 }); 609 started.await(); 610 611 // fork subtask2, the scope should be cancelled when the subtask completes 612 var subtask2 = scope.fork(() -> "bar"); 613 awaitCancelled(scope); 614 615 // subtask1 should be interrupted 616 interrupted.await(); 617 618 scope.join(); 619 assertEquals(Subtask.State.UNAVAILABLE, subtask1.state()); 620 assertEquals("bar", subtask2.get()); 621 } 622 } 623 624 /** 625 * Test that timeout interrupts unfinished threads. 626 */ 627 @ParameterizedTest 628 @MethodSource("factories") 629 void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception { 630 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 631 cf -> cf.withThreadFactory(factory) 632 .withTimeout(Duration.ofSeconds(2)))) { 633 634 var started = new AtomicBoolean(); 635 var interrupted = new CountDownLatch(1); 636 Subtask<Void> subtask = scope.fork(() -> { 637 started.set(true); 638 try { 639 Thread.sleep(Duration.ofDays(1)); 640 } catch (InterruptedException e) { 641 interrupted.countDown(); 642 } 643 return null; 644 }); 645 646 // wait for scope to be cancelled by timeout 647 awaitCancelled(scope); 648 649 // if subtask started then it should be interrupted 650 if (started.get()) { 651 interrupted.await(); 652 } 653 654 assertThrows(TimeoutException.class, scope::join); 655 656 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 657 } 658 } 659 660 /** 661 * Test close without join, no subtasks forked. 662 */ 663 @Test 664 void testCloseWithoutJoin1() { 665 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 666 // do nothing 667 } 668 } 669 670 /** 671 * Test close without join, subtasks forked. 672 */ 673 @ParameterizedTest 674 @MethodSource("factories") 675 void testCloseWithoutJoin2(ThreadFactory factory) { 676 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 677 cf -> cf.withThreadFactory(factory))) { 678 Subtask<String> subtask = scope.fork(() -> { 679 Thread.sleep(Duration.ofDays(1)); 680 return null; 681 }); 682 683 // first call to close should throw 684 assertThrows(IllegalStateException.class, scope::close); 685 686 // subsequent calls to close should not throw 687 for (int i = 0; i < 3; i++) { 688 scope.close(); 689 } 690 691 // subtask result/exception not available 692 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 693 assertThrows(IllegalStateException.class, subtask::get); 694 assertThrows(IllegalStateException.class, subtask::exception); 695 } 696 } 697 698 /** 699 * Test close after join throws. Close should not throw as join attempted. 700 */ 701 @ParameterizedTest 702 @MethodSource("factories") 703 void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception { 704 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 705 cf -> cf.withThreadFactory(factory))) { 706 var subtask = scope.fork(() -> { 707 Thread.sleep(Duration.ofDays(1)); 708 return null; 709 }); 710 711 // join throws 712 Thread.currentThread().interrupt(); 713 assertThrows(InterruptedException.class, scope::join); 714 assertThrows(IllegalStateException.class, subtask::get); 715 716 } // close should not throw 717 } 718 719 /** 720 * Test close method is owner confined. 721 */ 722 @ParameterizedTest 723 @MethodSource("factories") 724 void testCloseConfined(ThreadFactory factory) throws Exception { 725 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 726 cf -> cf.withThreadFactory(factory))) { 727 728 // random thread cannot close scope 729 try (var pool = Executors.newCachedThreadPool(factory)) { 730 Future<Boolean> future = pool.submit(() -> { 731 assertThrows(WrongThreadException.class, scope::close); 732 return null; 733 }); 734 future.get(); 735 } 736 737 // subtask cannot close 738 Subtask<Boolean> subtask = scope.fork(() -> { 739 assertThrows(WrongThreadException.class, scope::close); 740 return true; 741 }); 742 scope.join(); 743 assertTrue(subtask.get()); 744 } 745 } 746 747 /** 748 * Test close with interrupt status set. 749 */ 750 @ParameterizedTest 751 @MethodSource("factories") 752 void testInterruptClose1(ThreadFactory factory) throws Exception { 753 var testJoiner = new CancelAfterOneJoiner<String>(); 754 try (var scope = StructuredTaskScope.open(testJoiner, 755 cf -> cf.withThreadFactory(factory))) { 756 757 // fork first subtask, a straggler as it continues after being interrupted 758 var started = new CountDownLatch(1); 759 var done = new AtomicBoolean(); 760 scope.fork(() -> { 761 started.countDown(); 762 try { 763 Thread.sleep(Duration.ofDays(1)); 764 } catch (InterruptedException e) { 765 // interrupted by cancel, expected 766 } 767 Thread.sleep(Duration.ofMillis(100)); // force close to wait 768 done.set(true); 769 return null; 770 }); 771 started.await(); 772 773 // fork second subtask, the scope should be cancelled when this subtask completes 774 scope.fork(() -> "bar"); 775 awaitCancelled(scope); 776 777 scope.join(); 778 779 // invoke close with interrupt status set 780 Thread.currentThread().interrupt(); 781 try { 782 scope.close(); 783 } finally { 784 assertTrue(Thread.interrupted()); // clear interrupt status 785 assertTrue(done.get()); 786 } 787 } 788 } 789 790 /** 791 * Test interrupting thread waiting in close. 792 */ 793 @ParameterizedTest 794 @MethodSource("factories") 795 void testInterruptClose2(ThreadFactory factory) throws Exception { 796 var testJoiner = new CancelAfterOneJoiner<String>(); 797 try (var scope = StructuredTaskScope.open(testJoiner, 798 cf -> cf.withThreadFactory(factory))) { 799 800 Thread mainThread = Thread.currentThread(); 801 802 // fork first subtask, a straggler as it continues after being interrupted 803 var started = new CountDownLatch(1); 804 var done = new AtomicBoolean(); 805 scope.fork(() -> { 806 started.countDown(); 807 try { 808 Thread.sleep(Duration.ofDays(1)); 809 } catch (InterruptedException e) { 810 // interrupted by cancel, expected 811 } 812 813 // interrupt main thread when it blocks in close 814 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScopeImpl.close"); 815 816 Thread.sleep(Duration.ofMillis(100)); // force close to wait 817 done.set(true); 818 return null; 819 }); 820 started.await(); 821 822 // fork second subtask, the scope should be cancelled when this subtask completes 823 scope.fork(() -> "bar"); 824 awaitCancelled(scope); 825 826 scope.join(); 827 828 // main thread will be interrupted while blocked in close 829 try { 830 scope.close(); 831 } finally { 832 assertTrue(Thread.interrupted()); // clear interrupt status 833 assertTrue(done.get()); 834 } 835 } 836 } 837 838 /** 839 * Test that closing an enclosing scope closes the thread flock of a nested scope. 840 */ 841 @Test 842 void testCloseThrowsStructureViolation() throws Exception { 843 try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll())) { 844 try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll())) { 845 846 // close enclosing scope 847 try { 848 scope1.close(); 849 fail("close did not throw"); 850 } catch (StructureViolationException expected) { } 851 852 // underlying flock should be closed 853 var executed = new AtomicBoolean(); 854 Subtask<?> subtask = scope2.fork(() -> executed.set(true)); 855 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 856 scope2.join(); 857 assertFalse(executed.get()); 858 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 859 } 860 } 861 } 862 863 /** 864 * Test that isCancelled returns true after close. 865 */ 866 @Test 867 void testIsCancelledAfterClose() throws Exception { 868 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 869 assertFalse(scope.isCancelled()); 870 scope.close(); 871 assertTrue(scope.isCancelled()); 872 } 873 } 874 875 /** 876 * Test Joiner.onFork throwing exception. 877 */ 878 @Test 879 void testOnForkThrows() throws Exception { 880 var joiner = new Joiner<String, Void>() { 881 @Override 882 public boolean onFork(Subtask<? extends String> subtask) { 883 throw new FooException(); 884 } 885 @Override 886 public Void result() { 887 return null; 888 } 889 }; 890 try (var scope = StructuredTaskScope.open(joiner)) { 891 assertThrows(FooException.class, () -> scope.fork(() -> "foo")); 892 } 893 } 894 895 /** 896 * Test Joiner.onFork returning true to cancel execution. 897 */ 898 @Test 899 void testOnForkCancelsExecution() throws Exception { 900 var joiner = new Joiner<String, Void>() { 901 @Override 902 public boolean onFork(Subtask<? extends String> subtask) { 903 return true; 904 } 905 @Override 906 public Void result() { 907 return null; 908 } 909 }; 910 try (var scope = StructuredTaskScope.open(joiner)) { 911 assertFalse(scope.isCancelled()); 912 scope.fork(() -> "foo"); 913 assertTrue(scope.isCancelled()); 914 scope.join(); 915 } 916 } 917 918 /** 919 * Test Joiner.onComplete throwing exception causes UHE to be invoked. 920 */ 921 @Test 922 void testOnCompleteThrows() throws Exception { 923 var joiner = new Joiner<String, Void>() { 924 @Override 925 public boolean onComplete(Subtask<? extends String> subtask) { 926 throw new FooException(); 927 } 928 @Override 929 public Void result() { 930 return null; 931 } 932 }; 933 var excRef = new AtomicReference<Throwable>(); 934 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e); 935 ThreadFactory factory = Thread.ofVirtual() 936 .uncaughtExceptionHandler(uhe) 937 .factory(); 938 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) { 939 scope.fork(() -> "foo"); 940 scope.join(); 941 assertInstanceOf(FooException.class, excRef.get()); 942 } 943 } 944 945 /** 946 * Test Joiner.onComplete returning true to cancel execution. 947 */ 948 @Test 949 void testOnCompleteCancelsExecution() throws Exception { 950 var joiner = new Joiner<String, Void>() { 951 @Override 952 public boolean onComplete(Subtask<? extends String> subtask) { 953 return true; 954 } 955 @Override 956 public Void result() { 957 return null; 958 } 959 }; 960 try (var scope = StructuredTaskScope.open(joiner)) { 961 assertFalse(scope.isCancelled()); 962 scope.fork(() -> "foo"); 963 awaitCancelled(scope); 964 scope.join(); 965 } 966 } 967 968 /** 969 * Test toString. 970 */ 971 @Test 972 void testToString() throws Exception { 973 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 974 cf -> cf.withName("duke"))) { 975 976 // open 977 assertTrue(scope.toString().contains("duke")); 978 979 // closed 980 scope.close(); 981 assertTrue(scope.toString().contains("duke")); 982 } 983 } 984 985 /** 986 * Test Subtask with task that completes successfully. 987 */ 988 @ParameterizedTest 989 @MethodSource("factories") 990 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception { 991 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 992 cf -> cf.withThreadFactory(factory))) { 993 994 Subtask<String> subtask = scope.fork(() -> "foo"); 995 996 // before join 997 assertThrows(IllegalStateException.class, subtask::get); 998 assertThrows(IllegalStateException.class, subtask::exception); 999 1000 scope.join(); 1001 1002 // after join 1003 assertEquals(Subtask.State.SUCCESS, subtask.state()); 1004 assertEquals("foo", subtask.get()); 1005 assertThrows(IllegalStateException.class, subtask::exception); 1006 } 1007 } 1008 1009 /** 1010 * Test Subtask with task that fails. 1011 */ 1012 @ParameterizedTest 1013 @MethodSource("factories") 1014 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception { 1015 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1016 cf -> cf.withThreadFactory(factory))) { 1017 1018 Subtask<String> subtask = scope.fork(() -> { throw new FooException(); }); 1019 1020 // before join 1021 assertThrows(IllegalStateException.class, subtask::get); 1022 assertThrows(IllegalStateException.class, subtask::exception); 1023 1024 scope.join(); 1025 1026 // after join 1027 assertEquals(Subtask.State.FAILED, subtask.state()); 1028 assertThrows(IllegalStateException.class, subtask::get); 1029 assertTrue(subtask.exception() instanceof FooException); 1030 } 1031 } 1032 1033 /** 1034 * Test Subtask with a task that has not completed. 1035 */ 1036 @ParameterizedTest 1037 @MethodSource("factories") 1038 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception { 1039 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 1040 cf -> cf.withThreadFactory(factory))) { 1041 Subtask<Void> subtask = scope.fork(() -> { 1042 Thread.sleep(Duration.ofDays(1)); 1043 return null; 1044 }); 1045 1046 // before join 1047 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1048 assertThrows(IllegalStateException.class, subtask::get); 1049 assertThrows(IllegalStateException.class, subtask::exception); 1050 1051 // attempt join, join throws 1052 Thread.currentThread().interrupt(); 1053 assertThrows(InterruptedException.class, scope::join); 1054 1055 // after join 1056 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1057 assertThrows(IllegalStateException.class, subtask::get); 1058 assertThrows(IllegalStateException.class, subtask::exception); 1059 } 1060 } 1061 1062 /** 1063 * Test Subtask forked after execution cancelled. 1064 */ 1065 @ParameterizedTest 1066 @MethodSource("factories") 1067 void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception { 1068 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) { 1069 scope.fork(() -> "foo"); 1070 awaitCancelled(scope); 1071 1072 var subtask = scope.fork(() -> "foo"); 1073 1074 // before join 1075 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1076 assertThrows(IllegalStateException.class, subtask::get); 1077 assertThrows(IllegalStateException.class, subtask::exception); 1078 1079 scope.join(); 1080 1081 // after join 1082 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1083 assertThrows(IllegalStateException.class, subtask::get); 1084 assertThrows(IllegalStateException.class, subtask::exception); 1085 } 1086 } 1087 1088 /** 1089 * Test Subtask::toString. 1090 */ 1091 @Test 1092 void testSubtaskToString() throws Exception { 1093 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1094 var latch = new CountDownLatch(1); 1095 var subtask1 = scope.fork(() -> { 1096 latch.await(); 1097 return "foo"; 1098 }); 1099 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1100 1101 // subtask1 result is unavailable 1102 assertTrue(subtask1.toString().contains("Unavailable")); 1103 latch.countDown(); 1104 1105 scope.join(); 1106 1107 assertTrue(subtask1.toString().contains("Completed successfully")); 1108 assertTrue(subtask2.toString().contains("Failed")); 1109 } 1110 } 1111 1112 /** 1113 * Test Joiner.allSuccessfulOrThrow() with no subtasks. 1114 */ 1115 @Test 1116 void testAllSuccessfulOrThrow1() throws Throwable { 1117 try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) { 1118 var subtasks = scope.join().toList(); 1119 assertTrue(subtasks.isEmpty()); 1120 } 1121 } 1122 1123 /** 1124 * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully. 1125 */ 1126 @ParameterizedTest 1127 @MethodSource("factories") 1128 void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable { 1129 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1130 cf -> cf.withThreadFactory(factory))) { 1131 var subtask1 = scope.fork(() -> "foo"); 1132 var subtask2 = scope.fork(() -> "bar"); 1133 var subtasks = scope.join().toList(); 1134 assertEquals(List.of(subtask1, subtask2), subtasks); 1135 assertEquals("foo", subtask1.get()); 1136 assertEquals("bar", subtask2.get()); 1137 } 1138 } 1139 1140 /** 1141 * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and 1142 * a subtask that fails. 1143 */ 1144 @ParameterizedTest 1145 @MethodSource("factories") 1146 void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { 1147 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1148 cf -> cf.withThreadFactory(factory))) { 1149 scope.fork(() -> "foo"); 1150 scope.fork(() -> { throw new FooException(); }); 1151 try { 1152 scope.join(); 1153 } catch (FailedException e) { 1154 assertTrue(e.getCause() instanceof FooException); 1155 } 1156 } 1157 } 1158 1159 /** 1160 * Test Joiner.anySuccessfulResultOrThrow() with no subtasks. 1161 */ 1162 @Test 1163 void testAnySuccessfulResultOrThrow1() throws Exception { 1164 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { 1165 try { 1166 scope.join(); 1167 } catch (FailedException e) { 1168 assertTrue(e.getCause() instanceof NoSuchElementException); 1169 } 1170 } 1171 } 1172 1173 /** 1174 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully. 1175 */ 1176 @ParameterizedTest 1177 @MethodSource("factories") 1178 void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception { 1179 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1180 cf -> cf.withThreadFactory(factory))) { 1181 scope.fork(() -> "foo"); 1182 String result = scope.join(); 1183 assertEquals("foo", result); 1184 } 1185 } 1186 1187 /** 1188 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully 1189 * with a null result. 1190 */ 1191 @ParameterizedTest 1192 @MethodSource("factories") 1193 void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception { 1194 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1195 cf -> cf.withThreadFactory(factory))) { 1196 scope.fork(() -> null); 1197 String result = scope.join(); 1198 assertNull(result); 1199 } 1200 } 1201 1202 /** 1203 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully 1204 * and a subtask that fails. 1205 */ 1206 @ParameterizedTest 1207 @MethodSource("factories") 1208 void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception { 1209 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1210 cf -> cf.withThreadFactory(factory))) { 1211 scope.fork(() -> "foo"); 1212 scope.fork(() -> { throw new FooException(); }); 1213 String first = scope.join(); 1214 assertEquals("foo", first); 1215 } 1216 } 1217 1218 /** 1219 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails. 1220 */ 1221 @ParameterizedTest 1222 @MethodSource("factories") 1223 void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception { 1224 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), 1225 cf -> cf.withThreadFactory(factory))) { 1226 scope.fork(() -> { throw new FooException(); }); 1227 Throwable ex = assertThrows(FailedException.class, scope::join); 1228 assertTrue(ex.getCause() instanceof FooException); 1229 } 1230 } 1231 1232 /** 1233 * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks. 1234 */ 1235 @Test 1236 void testAwaitSuccessfulOrThrow1() throws Throwable { 1237 try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) { 1238 var result = scope.join(); 1239 assertNull(result); 1240 } 1241 } 1242 1243 /** 1244 * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully. 1245 */ 1246 @ParameterizedTest 1247 @MethodSource("factories") 1248 void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable { 1249 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1250 cf -> cf.withThreadFactory(factory))) { 1251 var subtask1 = scope.fork(() -> "foo"); 1252 var subtask2 = scope.fork(() -> "bar"); 1253 var result = scope.join(); 1254 assertNull(result); 1255 assertEquals("foo", subtask1.get()); 1256 assertEquals("bar", subtask2.get()); 1257 } 1258 } 1259 1260 /** 1261 * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and 1262 * a subtask that fails. 1263 */ 1264 @ParameterizedTest 1265 @MethodSource("factories") 1266 void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { 1267 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1268 cf -> cf.withThreadFactory(factory))) { 1269 scope.fork(() -> "foo"); 1270 scope.fork(() -> { throw new FooException(); }); 1271 try { 1272 scope.join(); 1273 } catch (FailedException e) { 1274 assertTrue(e.getCause() instanceof FooException); 1275 } 1276 } 1277 } 1278 1279 /** 1280 * Test Joiner.awaitAll() with no subtasks. 1281 */ 1282 @Test 1283 void testAwaitAll1() throws Throwable { 1284 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1285 var result = scope.join(); 1286 assertNull(result); 1287 } 1288 } 1289 1290 /** 1291 * Test Joiner.awaitAll() with subtasks that complete successfully. 1292 */ 1293 @ParameterizedTest 1294 @MethodSource("factories") 1295 void testAwaitAll2(ThreadFactory factory) throws Throwable { 1296 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1297 cf -> cf.withThreadFactory(factory))) { 1298 var subtask1 = scope.fork(() -> "foo"); 1299 var subtask2 = scope.fork(() -> "bar"); 1300 var result = scope.join(); 1301 assertNull(result); 1302 assertEquals("foo", subtask1.get()); 1303 assertEquals("bar", subtask2.get()); 1304 } 1305 } 1306 1307 /** 1308 * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask 1309 * that fails. 1310 */ 1311 @ParameterizedTest 1312 @MethodSource("factories") 1313 void testAwaitAll3(ThreadFactory factory) throws Throwable { 1314 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1315 cf -> cf.withThreadFactory(factory))) { 1316 var subtask1 = scope.fork(() -> "foo"); 1317 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1318 var result = scope.join(); 1319 assertNull(result); 1320 assertEquals("foo", subtask1.get()); 1321 assertTrue(subtask2.exception() instanceof FooException); 1322 } 1323 } 1324 1325 /** 1326 * Test Joiner.allUntil(Predicate) with no subtasks. 1327 */ 1328 @Test 1329 void testAllUntil1() throws Throwable { 1330 try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) { 1331 var subtasks = scope.join(); 1332 assertEquals(0, subtasks.count()); 1333 } 1334 } 1335 1336 /** 1337 * Test Joiner.allUntil(Predicate) with no cancellation. 1338 */ 1339 @ParameterizedTest 1340 @MethodSource("factories") 1341 void testAllUntil2(ThreadFactory factory) throws Exception { 1342 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false), 1343 cf -> cf.withThreadFactory(factory))) { 1344 1345 var subtask1 = scope.fork(() -> "foo"); 1346 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1347 1348 var subtasks = scope.join().toList(); 1349 assertEquals(2, subtasks.size()); 1350 1351 assertSame(subtask1, subtasks.get(0)); 1352 assertSame(subtask2, subtasks.get(1)); 1353 assertEquals("foo", subtask1.get()); 1354 assertTrue(subtask2.exception() instanceof FooException); 1355 } 1356 } 1357 1358 /** 1359 * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes. 1360 */ 1361 @ParameterizedTest 1362 @MethodSource("factories") 1363 void testAllUntil3(ThreadFactory factory) throws Exception { 1364 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true), 1365 cf -> cf.withThreadFactory(factory))) { 1366 1367 var subtask1 = scope.fork(() -> "foo"); 1368 var subtask2 = scope.fork(() -> { 1369 Thread.sleep(Duration.ofDays(1)); 1370 return "bar"; 1371 }); 1372 1373 var subtasks = scope.join().toList(); 1374 1375 assertEquals(2, subtasks.size()); 1376 assertSame(subtask1, subtasks.get(0)); 1377 assertSame(subtask2, subtasks.get(1)); 1378 assertEquals("foo", subtask1.get()); 1379 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1380 } 1381 } 1382 1383 /** 1384 * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete. 1385 */ 1386 @ParameterizedTest 1387 @MethodSource("factories") 1388 void testAllUntil4(ThreadFactory factory) throws Exception { 1389 1390 // cancel execution after two or more failures 1391 class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> { 1392 final AtomicInteger failedCount = new AtomicInteger(); 1393 @Override 1394 public boolean test(Subtask<? extends T> subtask) { 1395 return subtask.state() == Subtask.State.FAILED 1396 && failedCount.incrementAndGet() >= 2; 1397 } 1398 } 1399 var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>()); 1400 1401 try (var scope = StructuredTaskScope.open(joiner)) { 1402 int forkCount = 0; 1403 1404 // fork subtasks until execution cancelled 1405 while (!scope.isCancelled()) { 1406 scope.fork(() -> "foo"); 1407 scope.fork(() -> { throw new FooException(); }); 1408 forkCount += 2; 1409 Thread.sleep(Duration.ofMillis(20)); 1410 } 1411 1412 var subtasks = scope.join().toList(); 1413 assertEquals(forkCount, subtasks.size()); 1414 1415 long failedCount = subtasks.stream() 1416 .filter(s -> s.state() == Subtask.State.FAILED) 1417 .count(); 1418 assertTrue(failedCount >= 2); 1419 } 1420 } 1421 1422 /** 1423 * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws. 1424 */ 1425 @Test 1426 void testAllUntil5() throws Exception { 1427 var joiner = Joiner.allUntil(_ -> { throw new FooException(); }); 1428 var excRef = new AtomicReference<Throwable>(); 1429 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e); 1430 ThreadFactory factory = Thread.ofVirtual() 1431 .uncaughtExceptionHandler(uhe) 1432 .factory(); 1433 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) { 1434 scope.fork(() -> "foo"); 1435 scope.join(); 1436 assertInstanceOf(FooException.class, excRef.get()); 1437 } 1438 } 1439 1440 /** 1441 * Test Joiner default methods. 1442 */ 1443 @Test 1444 void testJoinerDefaultMethods() throws Exception { 1445 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) { 1446 1447 // need subtasks to test default methods 1448 var subtask1 = scope.fork(() -> "foo"); 1449 awaitCancelled(scope); 1450 var subtask2 = scope.fork(() -> "bar"); 1451 scope.join(); 1452 1453 assertEquals(Subtask.State.SUCCESS, subtask1.state()); 1454 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1455 1456 // Joiner that does not override default methods 1457 Joiner<Object, Void> joiner = () -> null; 1458 assertThrows(NullPointerException.class, () -> joiner.onFork(null)); 1459 assertThrows(NullPointerException.class, () -> joiner.onComplete(null)); 1460 assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1)); 1461 assertFalse(joiner.onFork(subtask2)); 1462 assertFalse(joiner.onComplete(subtask1)); 1463 assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2)); 1464 } 1465 } 1466 1467 /** 1468 * Test Joiners onFork/onComplete methods with a subtask in an unexpected state. 1469 */ 1470 @Test 1471 void testJoinersWithUnavailableResult() throws Exception { 1472 try (var scope = StructuredTaskScope.open()) { 1473 var done = new CountDownLatch(1); 1474 var subtask = scope.fork(() -> { 1475 done.await(); 1476 return null; 1477 }); 1478 1479 // onComplete with uncompleted task should throw IAE 1480 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1481 assertThrows(IllegalArgumentException.class, 1482 () -> Joiner.allSuccessfulOrThrow().onComplete(subtask)); 1483 assertThrows(IllegalArgumentException.class, 1484 () -> Joiner.anySuccessfulResultOrThrow().onComplete(subtask)); 1485 assertThrows(IllegalArgumentException.class, 1486 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask)); 1487 assertThrows(IllegalArgumentException.class, 1488 () -> Joiner.awaitAll().onComplete(subtask)); 1489 assertThrows(IllegalArgumentException.class, 1490 () -> Joiner.allUntil(_ -> false).onComplete(subtask)); 1491 1492 done.countDown(); 1493 scope.join(); 1494 1495 // onFork with completed task should throw IAE 1496 assertEquals(Subtask.State.SUCCESS, subtask.state()); 1497 assertThrows(IllegalArgumentException.class, 1498 () -> Joiner.allSuccessfulOrThrow().onFork(subtask)); 1499 assertThrows(IllegalArgumentException.class, 1500 () -> Joiner.anySuccessfulResultOrThrow().onFork(subtask)); 1501 assertThrows(IllegalArgumentException.class, 1502 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask)); 1503 assertThrows(IllegalArgumentException.class, 1504 () -> Joiner.awaitAll().onFork(subtask)); 1505 assertThrows(IllegalArgumentException.class, 1506 () -> Joiner.allUntil(_ -> false).onFork(subtask)); 1507 } 1508 1509 } 1510 1511 /** 1512 * Test the Configuration function apply method throwing an exception. 1513 */ 1514 @Test 1515 void testConfigFunctionThrows() throws Exception { 1516 assertThrows(FooException.class, 1517 () -> StructuredTaskScope.open(Joiner.awaitAll(), 1518 cf -> { throw new FooException(); })); 1519 } 1520 1521 /** 1522 * Test Configuration equals/hashCode/toString 1523 */ 1524 @Test 1525 void testConfigMethods() throws Exception { 1526 Function<Configuration, Configuration> testConfig = cf -> { 1527 var name = "duke"; 1528 var threadFactory = Thread.ofPlatform().factory(); 1529 var timeout = Duration.ofSeconds(10); 1530 1531 assertEquals(cf, cf); 1532 assertEquals(cf.withName(name), cf.withName(name)); 1533 assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory)); 1534 assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout)); 1535 1536 assertNotEquals(cf, cf.withName(name)); 1537 assertNotEquals(cf, cf.withThreadFactory(threadFactory)); 1538 assertNotEquals(cf, cf.withTimeout(timeout)); 1539 1540 assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode()); 1541 assertEquals(cf.withThreadFactory(threadFactory).hashCode(), 1542 cf.withThreadFactory(threadFactory).hashCode()); 1543 assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode()); 1544 1545 assertTrue(cf.withName(name).toString().contains(name)); 1546 assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString())); 1547 assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString())); 1548 1549 return cf; 1550 }; 1551 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) { 1552 // do nothing 1553 } 1554 } 1555 1556 /** 1557 * Test for NullPointerException. 1558 */ 1559 @Test 1560 void testNulls() throws Exception { 1561 assertThrows(NullPointerException.class, 1562 () -> StructuredTaskScope.open(null)); 1563 assertThrows(NullPointerException.class, 1564 () -> StructuredTaskScope.open(null, cf -> cf)); 1565 assertThrows(NullPointerException.class, 1566 () -> StructuredTaskScope.open(Joiner.awaitAll(), null)); 1567 1568 assertThrows(NullPointerException.class, () -> Joiner.allUntil(null)); 1569 1570 // fork 1571 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1572 assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null)); 1573 assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null)); 1574 } 1575 1576 // Configuration and withXXX methods 1577 assertThrows(NullPointerException.class, 1578 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> null)); 1579 assertThrows(NullPointerException.class, 1580 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null))); 1581 assertThrows(NullPointerException.class, 1582 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null))); 1583 assertThrows(NullPointerException.class, 1584 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null))); 1585 1586 // Joiner.onFork/onComplete 1587 assertThrows(NullPointerException.class, 1588 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null)); 1589 assertThrows(NullPointerException.class, 1590 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null)); 1591 assertThrows(NullPointerException.class, 1592 () -> Joiner.awaitAll().onFork(null)); 1593 assertThrows(NullPointerException.class, 1594 () -> Joiner.awaitAll().onComplete(null)); 1595 assertThrows(NullPointerException.class, 1596 () -> Joiner.allSuccessfulOrThrow().onFork(null)); 1597 assertThrows(NullPointerException.class, 1598 () -> Joiner.allSuccessfulOrThrow().onComplete(null)); 1599 assertThrows(NullPointerException.class, 1600 () -> Joiner.anySuccessfulResultOrThrow().onFork(null)); 1601 assertThrows(NullPointerException.class, 1602 () -> Joiner.anySuccessfulResultOrThrow().onComplete(null)); 1603 } 1604 1605 /** 1606 * ThreadFactory that counts usage. 1607 */ 1608 private static class CountingThreadFactory implements ThreadFactory { 1609 final ThreadFactory delegate; 1610 final AtomicInteger threadCount = new AtomicInteger(); 1611 CountingThreadFactory(ThreadFactory delegate) { 1612 this.delegate = delegate; 1613 } 1614 @Override 1615 public Thread newThread(Runnable task) { 1616 threadCount.incrementAndGet(); 1617 return delegate.newThread(task); 1618 } 1619 int threadCount() { 1620 return threadCount.get(); 1621 } 1622 } 1623 1624 /** 1625 * A joiner that counts that counts the number of subtasks that are forked and the 1626 * number of subtasks that complete. 1627 */ 1628 private static class CountingJoiner<T> implements Joiner<T, Void> { 1629 final AtomicInteger onForkCount = new AtomicInteger(); 1630 final AtomicInteger onCompleteCount = new AtomicInteger(); 1631 @Override 1632 public boolean onFork(Subtask<? extends T> subtask) { 1633 onForkCount.incrementAndGet(); 1634 return false; 1635 } 1636 @Override 1637 public boolean onComplete(Subtask<? extends T> subtask) { 1638 onCompleteCount.incrementAndGet(); 1639 return false; 1640 } 1641 @Override 1642 public Void result() { 1643 return null; 1644 } 1645 int onForkCount() { 1646 return onForkCount.get(); 1647 } 1648 int onCompleteCount() { 1649 return onCompleteCount.get(); 1650 } 1651 } 1652 1653 /** 1654 * A joiner that cancels execution when a subtask completes. It also keeps a count 1655 * of the number of subtasks that are forked and the number of subtasks that complete. 1656 */ 1657 private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> { 1658 final AtomicInteger onForkCount = new AtomicInteger(); 1659 final AtomicInteger onCompleteCount = new AtomicInteger(); 1660 @Override 1661 public boolean onFork(Subtask<? extends T> subtask) { 1662 onForkCount.incrementAndGet(); 1663 return false; 1664 } 1665 @Override 1666 public boolean onComplete(Subtask<? extends T> subtask) { 1667 onCompleteCount.incrementAndGet(); 1668 return true; 1669 } 1670 @Override 1671 public Void result() { 1672 return null; 1673 } 1674 int onForkCount() { 1675 return onForkCount.get(); 1676 } 1677 int onCompleteCount() { 1678 return onCompleteCount.get(); 1679 } 1680 } 1681 1682 /** 1683 * A runtime exception for tests. 1684 */ 1685 private static class FooException extends RuntimeException { 1686 FooException() { } 1687 FooException(Throwable cause) { super(cause); } 1688 } 1689 1690 /** 1691 * Returns the current time in milliseconds. 1692 */ 1693 private long millisTime() { 1694 long now = System.nanoTime(); 1695 return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS); 1696 } 1697 1698 /** 1699 * Check the duration of a task 1700 * @param start start time, in milliseconds 1701 * @param min minimum expected duration, in milliseconds 1702 * @param max maximum expected duration, in milliseconds 1703 * @return the duration (now - start), in milliseconds 1704 */ 1705 private long expectDuration(long start, long min, long max) { 1706 long duration = millisTime() - start; 1707 assertTrue(duration >= min, 1708 "Duration " + duration + "ms, expected >= " + min + "ms"); 1709 assertTrue(duration <= max, 1710 "Duration " + duration + "ms, expected <= " + max + "ms"); 1711 return duration; 1712 } 1713 1714 /** 1715 * Wait for the given scope to be cancelled. 1716 */ 1717 private static void awaitCancelled(StructuredTaskScope<?, ?> scope) throws InterruptedException { 1718 while (!scope.isCancelled()) { 1719 Thread.sleep(Duration.ofMillis(20)); 1720 } 1721 } 1722 1723 /** 1724 * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}". 1725 * {@code c} is the fully qualified class name and {@code m} is the method name. 1726 */ 1727 private void interruptThreadAt(Thread target, String location) throws InterruptedException { 1728 int index = location.lastIndexOf('.'); 1729 String className = location.substring(0, index); 1730 String methodName = location.substring(index + 1); 1731 1732 boolean found = false; 1733 while (!found) { 1734 Thread.State state = target.getState(); 1735 assertTrue(state != TERMINATED); 1736 if ((state == WAITING || state == TIMED_WAITING) 1737 && contains(target.getStackTrace(), className, methodName)) { 1738 found = true; 1739 } else { 1740 Thread.sleep(20); 1741 } 1742 } 1743 target.interrupt(); 1744 } 1745 1746 /** 1747 * Schedules the current thread to be interrupted when it waits (timed or untimed) 1748 * at the given location. 1749 */ 1750 private void scheduleInterruptAt(String location) { 1751 Thread target = Thread.currentThread(); 1752 scheduler.submit(() -> { 1753 interruptThreadAt(target, location); 1754 return null; 1755 }); 1756 } 1757 1758 /** 1759 * Returns true if the given stack trace contains an element for the given class 1760 * and method name. 1761 */ 1762 private boolean contains(StackTraceElement[] stack, String className, String methodName) { 1763 return Arrays.stream(stack) 1764 .anyMatch(e -> className.equals(e.getClassName()) 1765 && methodName.equals(e.getMethodName())); 1766 } 1767 }