1 /* 2 * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=platform 26 * @bug 8284199 8296779 8306647 27 * @summary Basic tests for StructuredTaskScope 28 * @enablePreview 29 * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest 30 */ 31 32 /* 33 * @test id=virtual 34 * @enablePreview 35 * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest 36 */ 37 38 import java.time.Duration; 39 import java.util.Arrays; 40 import java.util.ArrayList; 41 import java.util.List; 42 import java.util.Set; 43 import java.util.NoSuchElementException; 44 import java.util.concurrent.Callable; 45 import java.util.concurrent.ConcurrentHashMap; 46 import java.util.concurrent.CountDownLatch; 47 import java.util.concurrent.Executors; 48 import java.util.concurrent.Future; 49 import java.util.concurrent.LinkedTransferQueue; 50 import java.util.concurrent.ThreadFactory; 51 import java.util.concurrent.TimeUnit; 52 import java.util.concurrent.RejectedExecutionException; 53 import java.util.concurrent.ScheduledExecutorService; 54 import java.util.concurrent.StructuredTaskScope; 55 import java.util.concurrent.StructuredTaskScope.TimeoutException; 56 import java.util.concurrent.StructuredTaskScope.Config; 57 import java.util.concurrent.StructuredTaskScope.FailedException; 58 import java.util.concurrent.StructuredTaskScope.Joiner; 59 import java.util.concurrent.StructuredTaskScope.Subtask; 60 import java.util.concurrent.StructureViolationException; 61 import java.util.concurrent.atomic.AtomicBoolean; 62 import java.util.concurrent.atomic.AtomicInteger; 63 import java.util.concurrent.atomic.AtomicReference; 64 import java.util.function.Function; 65 import java.util.function.Predicate; 66 import java.util.stream.Stream; 67 import static java.lang.Thread.State.*; 68 69 import org.junit.jupiter.api.Test; 70 import org.junit.jupiter.api.BeforeAll; 71 import org.junit.jupiter.api.AfterAll; 72 import org.junit.jupiter.params.ParameterizedTest; 73 import org.junit.jupiter.params.provider.MethodSource; 74 import static org.junit.jupiter.api.Assertions.*; 75 76 class StructuredTaskScopeTest { 77 private static ScheduledExecutorService scheduler; 78 private static List<ThreadFactory> threadFactories; 79 80 @BeforeAll 81 static void setup() throws Exception { 82 scheduler = Executors.newSingleThreadScheduledExecutor(); 83 84 // thread factories 85 String value = System.getProperty("threadFactory"); 86 List<ThreadFactory> list = new ArrayList<>(); 87 if (value == null || value.equals("platform")) 88 list.add(Thread.ofPlatform().factory()); 89 if (value == null || value.equals("virtual")) 90 list.add(Thread.ofVirtual().factory()); 91 assertTrue(list.size() > 0, "No thread factories for tests"); 92 threadFactories = list; 93 } 94 95 @AfterAll 96 static void shutdown() { 97 scheduler.shutdown(); 98 } 99 100 private static Stream<ThreadFactory> factories() { 101 return threadFactories.stream(); 102 } 103 104 /** 105 * Test that fork creates virtual threads when no ThreadFactory is configured. 106 */ 107 @Test 108 void testForkCreatesVirtualThread() throws Exception { 109 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 110 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 111 for (int i = 0; i < 50; i++) { 112 // runnable 113 scope.fork(() -> { 114 threads.add(Thread.currentThread()); 115 }); 116 117 // callable 118 scope.fork(() -> { 119 threads.add(Thread.currentThread()); 120 return null; 121 }); 122 } 123 scope.join(); 124 } 125 assertEquals(100, threads.size()); 126 threads.forEach(t -> assertTrue(t.isVirtual())); 127 } 128 129 /** 130 * Test that fork create threads with the configured ThreadFactory. 131 */ 132 @ParameterizedTest 133 @MethodSource("factories") 134 void testForkUsesThreadFactory(ThreadFactory factory) throws Exception { 135 // TheadFactory that keeps reference to all threads it creates 136 class RecordingThreadFactory implements ThreadFactory { 137 final ThreadFactory delegate; 138 final Set<Thread> threads = ConcurrentHashMap.newKeySet(); 139 RecordingThreadFactory(ThreadFactory delegate) { 140 this.delegate = delegate; 141 } 142 @Override 143 public Thread newThread(Runnable task) { 144 Thread thread = delegate.newThread(task); 145 threads.add(thread); 146 return thread; 147 } 148 Set<Thread> threads() { 149 return threads; 150 } 151 } 152 var recordingThreadFactory = new RecordingThreadFactory(factory); 153 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 154 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 155 cf -> cf.withThreadFactory(recordingThreadFactory))) { 156 157 for (int i = 0; i < 50; i++) { 158 // runnable 159 scope.fork(() -> { 160 threads.add(Thread.currentThread()); 161 }); 162 163 // callable 164 scope.fork(() -> { 165 threads.add(Thread.currentThread()); 166 return null; 167 }); 168 } 169 scope.join(); 170 } 171 assertEquals(100, threads.size()); 172 assertEquals(recordingThreadFactory.threads(), threads); 173 } 174 175 /** 176 * Test fork method is owner confined. 177 */ 178 @ParameterizedTest 179 @MethodSource("factories") 180 void testForkConfined(ThreadFactory factory) throws Exception { 181 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 182 cf -> cf.withThreadFactory(factory))) { 183 184 // random thread cannot fork 185 try (var pool = Executors.newSingleThreadExecutor()) { 186 Future<Void> future = pool.submit(() -> { 187 assertThrows(WrongThreadException.class, () -> { 188 scope.fork(() -> null); 189 }); 190 return null; 191 }); 192 future.get(); 193 } 194 195 // subtask cannot fork 196 Subtask<Boolean> subtask = scope.fork(() -> { 197 assertThrows(WrongThreadException.class, () -> { 198 scope.fork(() -> null); 199 }); 200 return true; 201 }); 202 scope.join(); 203 assertTrue(subtask.get()); 204 } 205 } 206 207 /** 208 * Test fork after join, no subtasks forked before join. 209 */ 210 @ParameterizedTest 211 @MethodSource("factories") 212 void testForkAfterJoin1(ThreadFactory factory) throws Exception { 213 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 214 cf -> cf.withThreadFactory(factory))) { 215 scope.join(); 216 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 217 } 218 } 219 220 /** 221 * Test fork after join, subtasks forked before join. 222 */ 223 @ParameterizedTest 224 @MethodSource("factories") 225 void testForkAfterJoin2(ThreadFactory factory) throws Exception { 226 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 227 cf -> cf.withThreadFactory(factory))) { 228 scope.fork(() -> "foo"); 229 scope.join(); 230 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 231 } 232 } 233 234 /** 235 * Test fork after join throws. 236 */ 237 @ParameterizedTest 238 @MethodSource("factories") 239 void testForkAfterJoinThrows(ThreadFactory factory) throws Exception { 240 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 241 cf -> cf.withThreadFactory(factory))) { 242 var latch = new CountDownLatch(1); 243 var subtask1 = scope.fork(() -> { 244 latch.await(); 245 return "foo"; 246 }); 247 248 // join throws 249 Thread.currentThread().interrupt(); 250 assertThrows(InterruptedException.class, scope::join); 251 252 // fork should throw 253 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 254 } 255 } 256 257 /** 258 * Test fork after task scope is cancelled. This test uses a custom Joiner to 259 * cancel execution. 260 */ 261 @ParameterizedTest 262 @MethodSource("factories") 263 void testForkAfterCancel2(ThreadFactory factory) throws Exception { 264 var countingThreadFactory = new CountingThreadFactory(factory); 265 var testJoiner = new CancelAfterOneJoiner<String>(); 266 267 try (var scope = StructuredTaskScope.open(testJoiner, 268 cf -> cf.withThreadFactory(countingThreadFactory))) { 269 270 // fork subtask, the scope should be cancelled when the subtask completes 271 var subtask1 = scope.fork(() -> "foo"); 272 while (!scope.isCancelled()) { 273 Thread.sleep(20); 274 } 275 276 assertEquals(1, countingThreadFactory.threadCount()); 277 assertEquals(1, testJoiner.onForkCount()); 278 assertEquals(1, testJoiner.onCompleteCount()); 279 280 // fork second subtask, it should not run 281 var subtask2 = scope.fork(() -> "bar"); 282 283 // onFork should be invoked, newThread and onComplete should not be invoked 284 assertEquals(1, countingThreadFactory.threadCount()); 285 assertEquals(2, testJoiner.onForkCount()); 286 assertEquals(1, testJoiner.onCompleteCount()); 287 288 scope.join(); 289 290 assertEquals(1, countingThreadFactory.threadCount()); 291 assertEquals(2, testJoiner.onForkCount()); 292 assertEquals(1, testJoiner.onCompleteCount()); 293 assertEquals("foo", subtask1.get()); 294 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 295 } 296 } 297 298 /** 299 * Test fork after task scope is closed. 300 */ 301 @Test 302 void testForkAfterClose() { 303 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 304 scope.close(); 305 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null)); 306 } 307 } 308 309 /** 310 * Test fork with a ThreadFactory that rejects creating a thread. 311 */ 312 @Test 313 void testForkRejectedExecutionException() { 314 ThreadFactory factory = task -> null; 315 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 316 cf -> cf.withThreadFactory(factory))) { 317 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null)); 318 } 319 } 320 321 /** 322 * Test join with no subtasks. 323 */ 324 @Test 325 void testJoinWithNoSubtasks() throws Exception { 326 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 327 scope.join(); 328 } 329 } 330 331 /** 332 * Test join with a remaining subtask. 333 */ 334 @ParameterizedTest 335 @MethodSource("factories") 336 void testJoinWithRemainingSubtasks(ThreadFactory factory) throws Exception { 337 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 338 cf -> cf.withThreadFactory(factory))) { 339 Subtask<String> subtask = scope.fork(() -> { 340 Thread.sleep(Duration.ofMillis(100)); 341 return "foo"; 342 }); 343 scope.join(); 344 assertEquals("foo", subtask.get()); 345 } 346 } 347 348 /** 349 * Test join after join completed with a result. 350 */ 351 @Test 352 void testJoinAfterJoin1() throws Exception { 353 var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz")); 354 Joiner<Object, String> joiner = results::take; 355 try (var scope = StructuredTaskScope.open(joiner)) { 356 scope.fork(() -> "foo"); 357 assertEquals("foo", scope.join()); 358 359 // join already called 360 for (int i = 0 ; i < 3; i++) { 361 assertThrows(IllegalStateException.class, scope::join); 362 } 363 } 364 } 365 366 /** 367 * Test join after join completed with an exception. 368 */ 369 @Test 370 void testJoinAfterJoin2() throws Exception { 371 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { 372 scope.fork(() -> { throw new FooException(); }); 373 Throwable ex = assertThrows(FailedException.class, scope::join); 374 assertTrue(ex.getCause() instanceof FooException); 375 376 // join already called 377 for (int i = 0 ; i < 3; i++) { 378 assertThrows(IllegalStateException.class, scope::join); 379 } 380 } 381 } 382 383 /** 384 * Test join after join completed with a timeout. 385 */ 386 @Test 387 void testJoinAfterJoin3() throws Exception { 388 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), 389 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 390 // wait for scope to be cancelled by timeout 391 while (!scope.isCancelled()) { 392 Thread.sleep(20); 393 } 394 assertThrows(TimeoutException.class, scope::join); 395 396 // join already called 397 for (int i = 0 ; i < 3; i++) { 398 assertThrows(IllegalStateException.class, scope::join); 399 } 400 } 401 } 402 403 /** 404 * Test join method is owner confined. 405 */ 406 @ParameterizedTest 407 @MethodSource("factories") 408 void testJoinConfined(ThreadFactory factory) throws Exception { 409 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 410 cf -> cf.withThreadFactory(factory))) { 411 412 // random thread cannot join 413 try (var pool = Executors.newSingleThreadExecutor()) { 414 Future<Void> future = pool.submit(() -> { 415 assertThrows(WrongThreadException.class, scope::join); 416 return null; 417 }); 418 future.get(); 419 } 420 421 // subtask cannot join 422 Subtask<Boolean> subtask = scope.fork(() -> { 423 assertThrows(WrongThreadException.class, () -> { scope.join(); }); 424 return true; 425 }); 426 scope.join(); 427 assertTrue(subtask.get()); 428 } 429 } 430 431 /** 432 * Test join with interrupt status set. 433 */ 434 @ParameterizedTest 435 @MethodSource("factories") 436 void testInterruptJoin1(ThreadFactory factory) throws Exception { 437 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 438 cf -> cf.withThreadFactory(factory))) { 439 440 Subtask<String> subtask = scope.fork(() -> { 441 Thread.sleep(60_000); 442 return "foo"; 443 }); 444 445 // join should throw 446 Thread.currentThread().interrupt(); 447 try { 448 scope.join(); 449 fail("join did not throw"); 450 } catch (InterruptedException expected) { 451 assertFalse(Thread.interrupted()); // interrupt status should be cleared 452 } 453 } 454 } 455 456 /** 457 * Test interrupt of thread blocked in join. 458 */ 459 @ParameterizedTest 460 @MethodSource("factories") 461 void testInterruptJoin2(ThreadFactory factory) throws Exception { 462 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 463 cf -> cf.withThreadFactory(factory))) { 464 465 var latch = new CountDownLatch(1); 466 Subtask<String> subtask = scope.fork(() -> { 467 Thread.sleep(60_000); 468 return "foo"; 469 }); 470 471 // interrupt main thread when it blocks in join 472 scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join"); 473 try { 474 scope.join(); 475 fail("join did not throw"); 476 } catch (InterruptedException expected) { 477 assertFalse(Thread.interrupted()); // interrupt status should be clear 478 } 479 } 480 } 481 482 /** 483 * Test join when scope is cancelled. 484 */ 485 @ParameterizedTest 486 @MethodSource("factories") 487 void testJoinWhenCancelled(ThreadFactory factory) throws Exception { 488 var countingThreadFactory = new CountingThreadFactory(factory); 489 var testJoiner = new CancelAfterOneJoiner<String>(); 490 491 try (var scope = StructuredTaskScope.open(testJoiner, 492 cf -> cf.withThreadFactory(countingThreadFactory))) { 493 494 // fork subtask, the scope should be cancelled when the subtask completes 495 var subtask1 = scope.fork(() -> "foo"); 496 while (!scope.isCancelled()) { 497 Thread.sleep(20); 498 } 499 500 // fork second subtask, it should not run 501 var subtask2 = scope.fork(() -> { 502 Thread.sleep(Duration.ofDays(1)); 503 return "bar"; 504 }); 505 506 scope.join(); 507 508 assertEquals("foo", subtask1.get()); 509 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 510 } 511 } 512 513 /** 514 * Test join after scope is closed. 515 */ 516 @Test 517 void testJoinAfterClose() throws Exception { 518 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 519 scope.close(); 520 assertThrows(IllegalStateException.class, () -> scope.join()); 521 } 522 } 523 524 /** 525 * Test join with timeout, subtasks finish before timeout expires. 526 */ 527 @ParameterizedTest 528 @MethodSource("factories") 529 void testJoinWithTimeout1(ThreadFactory factory) throws Exception { 530 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 531 cf -> cf.withThreadFactory(factory) 532 .withTimeout(Duration.ofDays(1)))) { 533 534 Subtask<String> subtask = scope.fork(() -> { 535 Thread.sleep(Duration.ofSeconds(1)); 536 return "foo"; 537 }); 538 539 scope.join(); 540 541 assertFalse(scope.isCancelled()); 542 assertEquals("foo", subtask.get()); 543 } 544 } 545 546 /** 547 * Test join with timeout, timeout expires before subtasks finish. 548 */ 549 @ParameterizedTest 550 @MethodSource("factories") 551 void testJoinWithTimeout2(ThreadFactory factory) throws Exception { 552 long startMillis = millisTime(); 553 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 554 cf -> cf.withThreadFactory(factory) 555 .withTimeout(Duration.ofSeconds(2)))) { 556 557 Subtask<Void> subtask = scope.fork(() -> { 558 Thread.sleep(Duration.ofDays(1)); 559 return null; 560 }); 561 562 assertThrows(TimeoutException.class, scope::join); 563 expectDuration(startMillis, /*min*/1900, /*max*/20_000); 564 565 assertTrue(scope.isCancelled()); 566 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 567 } 568 } 569 570 /** 571 * Test join with timeout that has already expired. 572 */ 573 @ParameterizedTest 574 @MethodSource("factories") 575 void testJoinWithTimeout3(ThreadFactory factory) throws Exception { 576 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 577 cf -> cf.withThreadFactory(factory) 578 .withTimeout(Duration.ofSeconds(-1)))) { 579 580 Subtask<Void> subtask = scope.fork(() -> { 581 Thread.sleep(Duration.ofDays(1)); 582 return null; 583 }); 584 585 assertThrows(TimeoutException.class, scope::join); 586 587 assertTrue(scope.isCancelled()); 588 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 589 } 590 } 591 592 /** 593 * Test that cancelling execution interrupts unfinished threads. This test uses 594 * a custom Joiner to cancel execution. 595 */ 596 @ParameterizedTest 597 @MethodSource("factories") 598 void testCancelInterruptsThreads2(ThreadFactory factory) throws Exception { 599 var testJoiner = new CancelAfterOneJoiner<String>(); 600 601 try (var scope = StructuredTaskScope.open(testJoiner, 602 cf -> cf.withThreadFactory(factory))) { 603 604 // fork subtask1 that runs for a long time 605 var started = new CountDownLatch(1); 606 var interrupted = new CountDownLatch(1); 607 var subtask1 = scope.fork(() -> { 608 started.countDown(); 609 try { 610 Thread.sleep(Duration.ofDays(1)); 611 } catch (InterruptedException e) { 612 interrupted.countDown(); 613 } 614 }); 615 started.await(); 616 617 // fork subtask2, the scope should be cancelled when the subtask completes 618 var subtask2 = scope.fork(() -> "bar"); 619 while (!scope.isCancelled()) { 620 Thread.sleep(20); 621 } 622 623 // subtask1 should be interrupted 624 interrupted.await(); 625 626 scope.join(); 627 assertEquals(Subtask.State.UNAVAILABLE, subtask1.state()); 628 assertEquals("bar", subtask2.get()); 629 } 630 } 631 632 /** 633 * Test that timeout interrupts unfinished threads. 634 */ 635 @ParameterizedTest 636 @MethodSource("factories") 637 void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception { 638 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 639 cf -> cf.withThreadFactory(factory) 640 .withTimeout(Duration.ofSeconds(2)))) { 641 642 var started = new AtomicBoolean(); 643 var interrupted = new CountDownLatch(1); 644 Subtask<Void> subtask = scope.fork(() -> { 645 started.set(true); 646 try { 647 Thread.sleep(Duration.ofDays(1)); 648 } catch (InterruptedException e) { 649 interrupted.countDown(); 650 } 651 return null; 652 }); 653 654 while (!scope.isCancelled()) { 655 Thread.sleep(50); 656 } 657 658 // if subtask started then it should be interrupted 659 if (started.get()) { 660 interrupted.await(); 661 } 662 663 assertThrows(TimeoutException.class, scope::join); 664 665 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 666 } 667 } 668 669 /** 670 * Test close without join, no subtasks forked. 671 */ 672 @Test 673 void testCloseWithoutJoin1() { 674 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 675 // do nothing 676 } 677 } 678 679 /** 680 * Test close without join, subtasks forked. 681 */ 682 @ParameterizedTest 683 @MethodSource("factories") 684 void testCloseWithoutJoin2(ThreadFactory factory) { 685 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 686 cf -> cf.withThreadFactory(factory))) { 687 Subtask<String> subtask = scope.fork(() -> { 688 Thread.sleep(Duration.ofDays(1)); 689 return null; 690 }); 691 692 // first call to close should throw 693 assertThrows(IllegalStateException.class, scope::close); 694 695 // subsequent calls to close should not throw 696 for (int i = 0; i < 3; i++) { 697 scope.close(); 698 } 699 700 // subtask result/exception not available 701 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 702 assertThrows(IllegalStateException.class, subtask::get); 703 assertThrows(IllegalStateException.class, subtask::exception); 704 } 705 } 706 707 /** 708 * Test close after join throws. Close should not throw as join attempted. 709 */ 710 @ParameterizedTest 711 @MethodSource("factories") 712 void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception { 713 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 714 cf -> cf.withThreadFactory(factory))) { 715 var subtask = scope.fork(() -> { 716 Thread.sleep(Duration.ofDays(1)); 717 return null; 718 }); 719 720 // join throws 721 Thread.currentThread().interrupt(); 722 assertThrows(InterruptedException.class, scope::join); 723 assertThrows(IllegalStateException.class, subtask::get); 724 725 } // close should not throw 726 } 727 728 /** 729 * Test close method is owner confined. 730 */ 731 @ParameterizedTest 732 @MethodSource("factories") 733 void testCloseConfined(ThreadFactory factory) throws Exception { 734 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 735 cf -> cf.withThreadFactory(factory))) { 736 737 // random thread cannot close scope 738 try (var pool = Executors.newCachedThreadPool(factory)) { 739 Future<Boolean> future = pool.submit(() -> { 740 assertThrows(WrongThreadException.class, scope::close); 741 return null; 742 }); 743 future.get(); 744 } 745 746 // subtask cannot close 747 Subtask<Boolean> subtask = scope.fork(() -> { 748 assertThrows(WrongThreadException.class, scope::close); 749 return true; 750 }); 751 scope.join(); 752 assertTrue(subtask.get()); 753 } 754 } 755 756 /** 757 * Test close with interrupt status set. 758 */ 759 @ParameterizedTest 760 @MethodSource("factories") 761 void testInterruptClose1(ThreadFactory factory) throws Exception { 762 var testJoiner = new CancelAfterOneJoiner<String>(); 763 try (var scope = StructuredTaskScope.open(testJoiner, 764 cf -> cf.withThreadFactory(factory))) { 765 766 // fork first subtask, a straggler as it continues after being interrupted 767 var started = new CountDownLatch(1); 768 var done = new AtomicBoolean(); 769 scope.fork(() -> { 770 started.countDown(); 771 try { 772 Thread.sleep(Duration.ofDays(1)); 773 } catch (InterruptedException e) { 774 // interrupted by cancel, expected 775 } 776 Thread.sleep(Duration.ofMillis(100)); // force close to wait 777 done.set(true); 778 return null; 779 }); 780 started.await(); 781 782 // fork second subtask, the scope should be cancelled when this subtask completes 783 scope.fork(() -> "bar"); 784 while (!scope.isCancelled()) { 785 Thread.sleep(20); 786 } 787 788 scope.join(); 789 790 // invoke close with interrupt status set 791 Thread.currentThread().interrupt(); 792 try { 793 scope.close(); 794 } finally { 795 assertTrue(Thread.interrupted()); // clear interrupt status 796 assertTrue(done.get()); 797 } 798 } 799 } 800 801 /** 802 * Test interrupting thread waiting in close. 803 */ 804 @ParameterizedTest 805 @MethodSource("factories") 806 void testInterruptClose2(ThreadFactory factory) throws Exception { 807 var testJoiner = new CancelAfterOneJoiner<String>(); 808 try (var scope = StructuredTaskScope.open(testJoiner, 809 cf -> cf.withThreadFactory(factory))) { 810 811 Thread mainThread = Thread.currentThread(); 812 813 // fork first subtask, a straggler as it continues after being interrupted 814 var started = new CountDownLatch(1); 815 var done = new AtomicBoolean(); 816 scope.fork(() -> { 817 started.countDown(); 818 try { 819 Thread.sleep(Duration.ofDays(1)); 820 } catch (InterruptedException e) { 821 // interrupted by cancel, expected 822 } 823 824 // interrupt main thread when it blocks in close 825 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScopeImpl.close"); 826 827 Thread.sleep(Duration.ofMillis(100)); // force close to wait 828 done.set(true); 829 return null; 830 }); 831 started.await(); 832 833 // fork second subtask, the scope should be cancelled when this subtask completes 834 scope.fork(() -> "bar"); 835 while (!scope.isCancelled()) { 836 Thread.sleep(20); 837 } 838 839 scope.join(); 840 841 // main thread will be interrupted while blocked in close 842 try { 843 scope.close(); 844 } finally { 845 assertTrue(Thread.interrupted()); // clear interrupt status 846 assertTrue(done.get()); 847 } 848 } 849 } 850 851 /** 852 * Test that closing an enclosing scope closes the thread flock of a nested scope. 853 */ 854 @Test 855 void testCloseThrowsStructureViolation() throws Exception { 856 try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll())) { 857 try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll())) { 858 859 // close enclosing scope 860 try { 861 scope1.close(); 862 fail("close did not throw"); 863 } catch (StructureViolationException expected) { } 864 865 // underlying flock should be closed 866 var executed = new AtomicBoolean(); 867 Subtask<?> subtask = scope2.fork(() -> executed.set(true)); 868 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 869 scope2.join(); 870 assertFalse(executed.get()); 871 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 872 } 873 } 874 } 875 876 /** 877 * Test that isCancelled returns true after close. 878 */ 879 @Test 880 void testIsCancelledAfterClose() throws Exception { 881 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 882 assertFalse(scope.isCancelled()); 883 scope.close(); 884 assertTrue(scope.isCancelled()); 885 } 886 } 887 888 /** 889 * Test Joiner.onFork throwing exception. 890 */ 891 @Test 892 void testOnForkThrows() throws Exception { 893 var joiner = new Joiner<String, Void>() { 894 @Override 895 public boolean onFork(Subtask<? extends String> subtask) { 896 throw new FooException(); 897 } 898 @Override 899 public Void result() { 900 return null; 901 } 902 }; 903 try (var scope = StructuredTaskScope.open(joiner)) { 904 assertThrows(FooException.class, () -> scope.fork(() -> "foo")); 905 } 906 } 907 908 /** 909 * Test Joiner.onFork returning true to cancel execution. 910 */ 911 @Test 912 void testOnForkCancelsExecution() throws Exception { 913 var joiner = new Joiner<String, Void>() { 914 @Override 915 public boolean onFork(Subtask<? extends String> subtask) { 916 return true; 917 } 918 @Override 919 public Void result() { 920 return null; 921 } 922 }; 923 try (var scope = StructuredTaskScope.open(joiner)) { 924 assertFalse(scope.isCancelled()); 925 scope.fork(() -> "foo"); 926 assertTrue(scope.isCancelled()); 927 scope.join(); 928 } 929 } 930 931 /** 932 * Test Joiner.onComplete throwing exception causes UHE to be invoked. 933 */ 934 @Test 935 void testOnCompleteThrows() throws Exception { 936 var joiner = new Joiner<String, Void>() { 937 @Override 938 public boolean onComplete(Subtask<? extends String> subtask) { 939 throw new FooException(); 940 } 941 @Override 942 public Void result() { 943 return null; 944 } 945 }; 946 var excRef = new AtomicReference<Throwable>(); 947 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e); 948 ThreadFactory factory = Thread.ofVirtual() 949 .uncaughtExceptionHandler(uhe) 950 .factory(); 951 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) { 952 scope.fork(() -> "foo"); 953 scope.join(); 954 assertInstanceOf(FooException.class, excRef.get()); 955 } 956 } 957 958 /** 959 * Test Joiner.onComplete returning true to cancel execution. 960 */ 961 @Test 962 void testOnCompleteCancelsExecution() throws Exception { 963 var joiner = new Joiner<String, Void>() { 964 @Override 965 public boolean onComplete(Subtask<? extends String> subtask) { 966 return true; 967 } 968 @Override 969 public Void result() { 970 return null; 971 } 972 }; 973 try (var scope = StructuredTaskScope.open(joiner)) { 974 assertFalse(scope.isCancelled()); 975 scope.fork(() -> "foo"); 976 while (!scope.isCancelled()) { 977 Thread.sleep(10); 978 } 979 scope.join(); 980 } 981 } 982 983 /** 984 * Test toString. 985 */ 986 @Test 987 void testToString() throws Exception { 988 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 989 cf -> cf.withName("duke"))) { 990 991 // open 992 assertTrue(scope.toString().contains("duke")); 993 994 // closed 995 scope.close(); 996 assertTrue(scope.toString().contains("duke")); 997 } 998 } 999 1000 /** 1001 * Test Subtask with task that completes successfully. 1002 */ 1003 @ParameterizedTest 1004 @MethodSource("factories") 1005 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception { 1006 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1007 cf -> cf.withThreadFactory(factory))) { 1008 1009 Subtask<String> subtask = scope.fork(() -> "foo"); 1010 1011 // before join 1012 assertThrows(IllegalStateException.class, subtask::get); 1013 assertThrows(IllegalStateException.class, subtask::exception); 1014 1015 scope.join(); 1016 1017 // after join 1018 assertEquals(Subtask.State.SUCCESS, subtask.state()); 1019 assertEquals("foo", subtask.get()); 1020 assertThrows(IllegalStateException.class, subtask::exception); 1021 } 1022 } 1023 1024 /** 1025 * Test Subtask with task that fails. 1026 */ 1027 @ParameterizedTest 1028 @MethodSource("factories") 1029 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception { 1030 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1031 cf -> cf.withThreadFactory(factory))) { 1032 1033 Subtask<String> subtask = scope.fork(() -> { throw new FooException(); }); 1034 1035 // before join 1036 assertThrows(IllegalStateException.class, subtask::get); 1037 assertThrows(IllegalStateException.class, subtask::exception); 1038 1039 scope.join(); 1040 1041 // after join 1042 assertEquals(Subtask.State.FAILED, subtask.state()); 1043 assertThrows(IllegalStateException.class, subtask::get); 1044 assertTrue(subtask.exception() instanceof FooException); 1045 } 1046 } 1047 1048 /** 1049 * Test Subtask with a task that has not completed. 1050 */ 1051 @ParameterizedTest 1052 @MethodSource("factories") 1053 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception { 1054 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 1055 cf -> cf.withThreadFactory(factory))) { 1056 Subtask<Void> subtask = scope.fork(() -> { 1057 Thread.sleep(Duration.ofDays(1)); 1058 return null; 1059 }); 1060 1061 // before join 1062 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1063 assertThrows(IllegalStateException.class, subtask::get); 1064 assertThrows(IllegalStateException.class, subtask::exception); 1065 1066 // attempt join, join throws 1067 Thread.currentThread().interrupt(); 1068 assertThrows(InterruptedException.class, scope::join); 1069 1070 // after join 1071 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1072 assertThrows(IllegalStateException.class, subtask::get); 1073 assertThrows(IllegalStateException.class, subtask::exception); 1074 } 1075 } 1076 1077 /** 1078 * Test Subtask forked after execution cancelled. 1079 */ 1080 @ParameterizedTest 1081 @MethodSource("factories") 1082 void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception { 1083 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) { 1084 scope.fork(() -> "foo"); 1085 while (!scope.isCancelled()) { 1086 Thread.sleep(20); 1087 } 1088 1089 var subtask = scope.fork(() -> "foo"); 1090 1091 // before join 1092 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1093 assertThrows(IllegalStateException.class, subtask::get); 1094 assertThrows(IllegalStateException.class, subtask::exception); 1095 1096 scope.join(); 1097 1098 // after join 1099 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1100 assertThrows(IllegalStateException.class, subtask::get); 1101 assertThrows(IllegalStateException.class, subtask::exception); 1102 } 1103 } 1104 1105 /** 1106 * Test Subtask::toString. 1107 */ 1108 @Test 1109 void testSubtaskToString() throws Exception { 1110 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1111 var latch = new CountDownLatch(1); 1112 var subtask1 = scope.fork(() -> { 1113 latch.await(); 1114 return "foo"; 1115 }); 1116 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1117 1118 // subtask1 result is unavailable 1119 assertTrue(subtask1.toString().contains("Unavailable")); 1120 latch.countDown(); 1121 1122 scope.join(); 1123 1124 assertTrue(subtask1.toString().contains("Completed successfully")); 1125 assertTrue(subtask2.toString().contains("Failed")); 1126 } 1127 } 1128 1129 /** 1130 * Test Joiner.allSuccessfulOrThrow() with no subtasks. 1131 */ 1132 @Test 1133 void testAllSuccessfulOrThrow1() throws Throwable { 1134 try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) { 1135 var subtasks = scope.join().toList(); 1136 assertTrue(subtasks.isEmpty()); 1137 } 1138 } 1139 1140 /** 1141 * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully. 1142 */ 1143 @ParameterizedTest 1144 @MethodSource("factories") 1145 void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable { 1146 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1147 cf -> cf.withThreadFactory(factory))) { 1148 var subtask1 = scope.fork(() -> "foo"); 1149 var subtask2 = scope.fork(() -> "bar"); 1150 var subtasks = scope.join().toList(); 1151 assertEquals(List.of(subtask1, subtask2), subtasks); 1152 assertEquals("foo", subtask1.get()); 1153 assertEquals("bar", subtask2.get()); 1154 } 1155 } 1156 1157 /** 1158 * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and 1159 * a subtask that fails. 1160 */ 1161 @ParameterizedTest 1162 @MethodSource("factories") 1163 void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { 1164 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1165 cf -> cf.withThreadFactory(factory))) { 1166 scope.fork(() -> "foo"); 1167 scope.fork(() -> { throw new FooException(); }); 1168 try { 1169 scope.join(); 1170 } catch (FailedException e) { 1171 assertTrue(e.getCause() instanceof FooException); 1172 } 1173 } 1174 } 1175 1176 /** 1177 * Test Joiner.anySuccessfulResultOrThrow() with no subtasks. 1178 */ 1179 @Test 1180 void testAnySuccessfulResultOrThrow1() throws Exception { 1181 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { 1182 try { 1183 scope.join(); 1184 } catch (FailedException e) { 1185 assertTrue(e.getCause() instanceof NoSuchElementException); 1186 } 1187 } 1188 } 1189 1190 /** 1191 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully. 1192 */ 1193 @ParameterizedTest 1194 @MethodSource("factories") 1195 void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception { 1196 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1197 cf -> cf.withThreadFactory(factory))) { 1198 scope.fork(() -> "foo"); 1199 String result = scope.join(); 1200 assertEquals("foo", result); 1201 } 1202 } 1203 1204 /** 1205 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully 1206 * with a null result. 1207 */ 1208 @ParameterizedTest 1209 @MethodSource("factories") 1210 void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception { 1211 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1212 cf -> cf.withThreadFactory(factory))) { 1213 scope.fork(() -> null); 1214 String result = scope.join(); 1215 assertNull(result); 1216 } 1217 } 1218 1219 /** 1220 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully 1221 * and a subtask that fails. 1222 */ 1223 @ParameterizedTest 1224 @MethodSource("factories") 1225 void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception { 1226 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1227 cf -> cf.withThreadFactory(factory))) { 1228 scope.fork(() -> "foo"); 1229 scope.fork(() -> { throw new FooException(); }); 1230 String first = scope.join(); 1231 assertEquals("foo", first); 1232 } 1233 } 1234 1235 /** 1236 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails. 1237 */ 1238 @ParameterizedTest 1239 @MethodSource("factories") 1240 void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception { 1241 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), 1242 cf -> cf.withThreadFactory(factory))) { 1243 scope.fork(() -> { throw new FooException(); }); 1244 Throwable ex = assertThrows(FailedException.class, scope::join); 1245 assertTrue(ex.getCause() instanceof FooException); 1246 } 1247 } 1248 1249 /** 1250 * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks. 1251 */ 1252 @Test 1253 void testAwaitSuccessfulOrThrow1() throws Throwable { 1254 try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) { 1255 var result = scope.join(); 1256 assertNull(result); 1257 } 1258 } 1259 1260 /** 1261 * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully. 1262 */ 1263 @ParameterizedTest 1264 @MethodSource("factories") 1265 void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable { 1266 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1267 cf -> cf.withThreadFactory(factory))) { 1268 var subtask1 = scope.fork(() -> "foo"); 1269 var subtask2 = scope.fork(() -> "bar"); 1270 var result = scope.join(); 1271 assertNull(result); 1272 assertEquals("foo", subtask1.get()); 1273 assertEquals("bar", subtask2.get()); 1274 } 1275 } 1276 1277 /** 1278 * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and 1279 * a subtask that fails. 1280 */ 1281 @ParameterizedTest 1282 @MethodSource("factories") 1283 void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { 1284 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1285 cf -> cf.withThreadFactory(factory))) { 1286 scope.fork(() -> "foo"); 1287 scope.fork(() -> { throw new FooException(); }); 1288 try { 1289 scope.join(); 1290 } catch (FailedException e) { 1291 assertTrue(e.getCause() instanceof FooException); 1292 } 1293 } 1294 } 1295 1296 /** 1297 * Test Joiner.awaitAll() with no subtasks. 1298 */ 1299 @Test 1300 void testAwaitAll1() throws Throwable { 1301 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1302 var result = scope.join(); 1303 assertNull(result); 1304 } 1305 } 1306 1307 /** 1308 * Test Joiner.awaitAll() with subtasks that complete successfully. 1309 */ 1310 @ParameterizedTest 1311 @MethodSource("factories") 1312 void testAwaitAll2(ThreadFactory factory) throws Throwable { 1313 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1314 cf -> cf.withThreadFactory(factory))) { 1315 var subtask1 = scope.fork(() -> "foo"); 1316 var subtask2 = scope.fork(() -> "bar"); 1317 var result = scope.join(); 1318 assertNull(result); 1319 assertEquals("foo", subtask1.get()); 1320 assertEquals("bar", subtask2.get()); 1321 } 1322 } 1323 1324 /** 1325 * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask 1326 * that fails. 1327 */ 1328 @ParameterizedTest 1329 @MethodSource("factories") 1330 void testAwaitAll3(ThreadFactory factory) throws Throwable { 1331 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1332 cf -> cf.withThreadFactory(factory))) { 1333 var subtask1 = scope.fork(() -> "foo"); 1334 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1335 var result = scope.join(); 1336 assertNull(result); 1337 assertEquals("foo", subtask1.get()); 1338 assertTrue(subtask2.exception() instanceof FooException); 1339 } 1340 } 1341 1342 /** 1343 * Test Joiner.allUntil(Predicate) with no subtasks. 1344 */ 1345 @Test 1346 void testAllUntil1() throws Throwable { 1347 try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) { 1348 var subtasks = scope.join(); 1349 assertEquals(0, subtasks.count()); 1350 } 1351 } 1352 1353 /** 1354 * Test Joiner.allUntil(Predicate) with no cancellation. 1355 */ 1356 @ParameterizedTest 1357 @MethodSource("factories") 1358 void testAllUntil2(ThreadFactory factory) throws Exception { 1359 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false), 1360 cf -> cf.withThreadFactory(factory))) { 1361 1362 var subtask1 = scope.fork(() -> "foo"); 1363 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1364 1365 var subtasks = scope.join().toList(); 1366 assertEquals(2, subtasks.size()); 1367 1368 assertTrue(subtasks.get(0) == subtask1); 1369 assertTrue(subtasks.get(1) == subtask2); 1370 assertEquals("foo", subtask1.get()); 1371 assertTrue(subtask2.exception() instanceof FooException); 1372 } 1373 } 1374 1375 /** 1376 * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes. 1377 */ 1378 @ParameterizedTest 1379 @MethodSource("factories") 1380 void testAllUntil3(ThreadFactory factory) throws Exception { 1381 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true), 1382 cf -> cf.withThreadFactory(factory))) { 1383 1384 var subtask1 = scope.fork(() -> "foo"); 1385 var subtask2 = scope.fork(() -> { 1386 Thread.sleep(Duration.ofDays(1)); 1387 return "bar"; 1388 }); 1389 1390 var subtasks = scope.join().toList(); 1391 1392 assertEquals(2, subtasks.size()); 1393 assertTrue(subtasks.get(0) == subtask1); 1394 assertTrue(subtasks.get(1) == subtask2); 1395 assertEquals("foo", subtask1.get()); 1396 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1397 } 1398 } 1399 1400 /** 1401 * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete. 1402 */ 1403 @ParameterizedTest 1404 @MethodSource("factories") 1405 void testAllUntil4(ThreadFactory factory) throws Exception { 1406 1407 // cancel execution after two or more failures 1408 class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> { 1409 final AtomicInteger failedCount = new AtomicInteger(); 1410 @Override 1411 public boolean test(Subtask<? extends T> subtask) { 1412 return subtask.state() == Subtask.State.FAILED 1413 && failedCount.incrementAndGet() >= 2; 1414 } 1415 } 1416 var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>()); 1417 1418 try (var scope = StructuredTaskScope.open(joiner)) { 1419 int forkCount = 0; 1420 1421 // fork subtasks until execution cancelled 1422 while (!scope.isCancelled()) { 1423 scope.fork(() -> "foo"); 1424 scope.fork(() -> { throw new FooException(); }); 1425 forkCount += 2; 1426 Thread.sleep(Duration.ofMillis(10)); 1427 } 1428 1429 var subtasks = scope.join().toList(); 1430 assertEquals(forkCount, subtasks.size()); 1431 1432 long failedCount = subtasks.stream() 1433 .filter(s -> s.state() == Subtask.State.FAILED) 1434 .count(); 1435 assertTrue(failedCount >= 2); 1436 } 1437 } 1438 1439 /** 1440 * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws. 1441 */ 1442 @Test 1443 void testAllUntil5() throws Exception { 1444 var joiner = Joiner.allUntil(_ -> { throw new FooException(); }); 1445 var excRef = new AtomicReference<Throwable>(); 1446 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e); 1447 ThreadFactory factory = Thread.ofVirtual() 1448 .uncaughtExceptionHandler(uhe) 1449 .factory(); 1450 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) { 1451 scope.fork(() -> "foo"); 1452 scope.join(); 1453 assertInstanceOf(FooException.class, excRef.get()); 1454 } 1455 } 1456 1457 /** 1458 * Test Joiner default methods. 1459 */ 1460 @Test 1461 void testJoinerDefaultMethods() throws Exception { 1462 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) { 1463 1464 // need subtasks to test default methods 1465 var subtask1 = scope.fork(() -> "foo"); 1466 while (!scope.isCancelled()) { 1467 Thread.sleep(20); 1468 } 1469 var subtask2 = scope.fork(() -> "bar"); 1470 scope.join(); 1471 1472 assertEquals(Subtask.State.SUCCESS, subtask1.state()); 1473 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1474 1475 // Joiner that does not override default methods 1476 Joiner<Object, Void> joiner = () -> null; 1477 assertThrows(NullPointerException.class, () -> joiner.onFork(null)); 1478 assertThrows(NullPointerException.class, () -> joiner.onComplete(null)); 1479 assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1)); 1480 assertFalse(joiner.onFork(subtask2)); 1481 assertFalse(joiner.onComplete(subtask1)); 1482 assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2)); 1483 } 1484 } 1485 1486 /** 1487 * Test Joiners onFork/onComplete methods with a subtask in an unexpected state. 1488 */ 1489 @Test 1490 void testJoinersWithUnavailableResukt() throws Exception { 1491 try (var scope = StructuredTaskScope.open()) { 1492 var done = new CountDownLatch(1); 1493 var subtask = scope.fork(() -> { 1494 done.await(); 1495 return null; 1496 }); 1497 1498 // onComplete with uncompleted task should throw IAE 1499 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1500 assertThrows(IllegalArgumentException.class, 1501 () -> Joiner.allSuccessfulOrThrow().onComplete(subtask)); 1502 assertThrows(IllegalArgumentException.class, 1503 () -> Joiner.anySuccessfulResultOrThrow().onComplete(subtask)); 1504 assertThrows(IllegalArgumentException.class, 1505 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask)); 1506 assertThrows(IllegalArgumentException.class, 1507 () -> Joiner.awaitAll().onComplete(subtask)); 1508 assertThrows(IllegalArgumentException.class, 1509 () -> Joiner.allUntil(_ -> false).onComplete(subtask)); 1510 1511 done.countDown(); 1512 scope.join(); 1513 1514 // onFork with completed task should throw IAE 1515 assertEquals(Subtask.State.SUCCESS, subtask.state()); 1516 assertThrows(IllegalArgumentException.class, 1517 () -> Joiner.allSuccessfulOrThrow().onFork(subtask)); 1518 assertThrows(IllegalArgumentException.class, 1519 () -> Joiner.anySuccessfulResultOrThrow().onFork(subtask)); 1520 assertThrows(IllegalArgumentException.class, 1521 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask)); 1522 assertThrows(IllegalArgumentException.class, 1523 () -> Joiner.awaitAll().onFork(subtask)); 1524 assertThrows(IllegalArgumentException.class, 1525 () -> Joiner.allUntil(_ -> false).onFork(subtask)); 1526 } 1527 1528 } 1529 1530 /** 1531 * Test the Config function apply method throwing an exception. 1532 */ 1533 @Test 1534 void testConfigFunctionThrows() throws Exception { 1535 assertThrows(FooException.class, 1536 () -> StructuredTaskScope.open(Joiner.awaitAll(), 1537 cf -> { throw new FooException(); })); 1538 } 1539 1540 /** 1541 * Test Config equals/hashCode/toString 1542 */ 1543 @Test 1544 void testConfigMethods() throws Exception { 1545 Function<Config, Config> testConfig = cf -> { 1546 var name = "duke"; 1547 var threadFactory = Thread.ofPlatform().factory(); 1548 var timeout = Duration.ofSeconds(10); 1549 1550 assertEquals(cf, cf); 1551 assertEquals(cf.withName(name), cf.withName(name)); 1552 assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory)); 1553 assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout)); 1554 1555 assertNotEquals(cf, cf.withName(name)); 1556 assertNotEquals(cf, cf.withThreadFactory(threadFactory)); 1557 assertNotEquals(cf, cf.withTimeout(timeout)); 1558 1559 assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode()); 1560 assertEquals(cf.withThreadFactory(threadFactory).hashCode(), 1561 cf.withThreadFactory(threadFactory).hashCode()); 1562 assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode()); 1563 1564 assertTrue(cf.withName(name).toString().contains(name)); 1565 assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString())); 1566 assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString())); 1567 1568 return cf; 1569 }; 1570 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) { 1571 // do nothing 1572 } 1573 } 1574 1575 /** 1576 * Test for NullPointerException. 1577 */ 1578 @Test 1579 void testNulls() throws Exception { 1580 assertThrows(NullPointerException.class, 1581 () -> StructuredTaskScope.open(null)); 1582 assertThrows(NullPointerException.class, 1583 () -> StructuredTaskScope.open(null, cf -> cf)); 1584 assertThrows(NullPointerException.class, 1585 () -> StructuredTaskScope.open(Joiner.awaitAll(), null)); 1586 1587 assertThrows(NullPointerException.class, () -> Joiner.allUntil(null)); 1588 1589 // fork 1590 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1591 assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null)); 1592 assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null)); 1593 } 1594 1595 // Config and withXXX methods 1596 assertThrows(NullPointerException.class, 1597 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> null)); 1598 assertThrows(NullPointerException.class, 1599 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null))); 1600 assertThrows(NullPointerException.class, 1601 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null))); 1602 assertThrows(NullPointerException.class, 1603 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null))); 1604 1605 // Joiner.onFork/onComplete 1606 assertThrows(NullPointerException.class, 1607 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null)); 1608 assertThrows(NullPointerException.class, 1609 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null)); 1610 assertThrows(NullPointerException.class, 1611 () -> Joiner.awaitAll().onFork(null)); 1612 assertThrows(NullPointerException.class, 1613 () -> Joiner.awaitAll().onComplete(null)); 1614 assertThrows(NullPointerException.class, 1615 () -> Joiner.allSuccessfulOrThrow().onFork(null)); 1616 assertThrows(NullPointerException.class, 1617 () -> Joiner.allSuccessfulOrThrow().onComplete(null)); 1618 assertThrows(NullPointerException.class, 1619 () -> Joiner.anySuccessfulResultOrThrow().onFork(null)); 1620 assertThrows(NullPointerException.class, 1621 () -> Joiner.anySuccessfulResultOrThrow().onComplete(null)); 1622 } 1623 1624 /** 1625 * ThreadFactory that counts usage. 1626 */ 1627 private static class CountingThreadFactory implements ThreadFactory { 1628 final ThreadFactory delegate; 1629 final AtomicInteger threadCount = new AtomicInteger(); 1630 CountingThreadFactory(ThreadFactory delegate) { 1631 this.delegate = delegate; 1632 } 1633 @Override 1634 public Thread newThread(Runnable task) { 1635 threadCount.incrementAndGet(); 1636 return delegate.newThread(task); 1637 } 1638 int threadCount() { 1639 return threadCount.get(); 1640 } 1641 } 1642 1643 /** 1644 * A joiner that counts that counts the number of subtasks that are forked and the 1645 * number of subtasks that complete. 1646 */ 1647 private static class CountingJoiner<T> implements Joiner<T, Void> { 1648 final AtomicInteger onForkCount = new AtomicInteger(); 1649 final AtomicInteger onCompleteCount = new AtomicInteger(); 1650 @Override 1651 public boolean onFork(Subtask<? extends T> subtask) { 1652 onForkCount.incrementAndGet(); 1653 return false; 1654 } 1655 @Override 1656 public boolean onComplete(Subtask<? extends T> subtask) { 1657 onCompleteCount.incrementAndGet(); 1658 return false; 1659 } 1660 @Override 1661 public Void result() { 1662 return null; 1663 } 1664 int onForkCount() { 1665 return onForkCount.get(); 1666 } 1667 int onCompleteCount() { 1668 return onCompleteCount.get(); 1669 } 1670 } 1671 1672 /** 1673 * A joiner that cancels execution when a subtask completes. It also keeps a count 1674 * of the number of subtasks that are forked and the number of subtasks that complete. 1675 */ 1676 private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> { 1677 final AtomicInteger onForkCount = new AtomicInteger(); 1678 final AtomicInteger onCompleteCount = new AtomicInteger(); 1679 @Override 1680 public boolean onFork(Subtask<? extends T> subtask) { 1681 onForkCount.incrementAndGet(); 1682 return false; 1683 } 1684 @Override 1685 public boolean onComplete(Subtask<? extends T> subtask) { 1686 onCompleteCount.incrementAndGet(); 1687 return true; 1688 } 1689 @Override 1690 public Void result() { 1691 return null; 1692 } 1693 int onForkCount() { 1694 return onForkCount.get(); 1695 } 1696 int onCompleteCount() { 1697 return onCompleteCount.get(); 1698 } 1699 } 1700 1701 /** 1702 * A runtime exception for tests. 1703 */ 1704 private static class FooException extends RuntimeException { 1705 FooException() { } 1706 FooException(Throwable cause) { super(cause); } 1707 } 1708 1709 /** 1710 * Returns the current time in milliseconds. 1711 */ 1712 private long millisTime() { 1713 long now = System.nanoTime(); 1714 return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS); 1715 } 1716 1717 /** 1718 * Check the duration of a task 1719 * @param start start time, in milliseconds 1720 * @param min minimum expected duration, in milliseconds 1721 * @param max maximum expected duration, in milliseconds 1722 * @return the duration (now - start), in milliseconds 1723 */ 1724 private long expectDuration(long start, long min, long max) { 1725 long duration = millisTime() - start; 1726 assertTrue(duration >= min, 1727 "Duration " + duration + "ms, expected >= " + min + "ms"); 1728 assertTrue(duration <= max, 1729 "Duration " + duration + "ms, expected <= " + max + "ms"); 1730 return duration; 1731 } 1732 1733 /** 1734 * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}". 1735 * {@code c} is the fully qualified class name and {@code m} is the method name. 1736 */ 1737 private void interruptThreadAt(Thread target, String location) throws InterruptedException { 1738 int index = location.lastIndexOf('.'); 1739 String className = location.substring(0, index); 1740 String methodName = location.substring(index + 1); 1741 1742 boolean found = false; 1743 while (!found) { 1744 Thread.State state = target.getState(); 1745 assertTrue(state != TERMINATED); 1746 if ((state == WAITING || state == TIMED_WAITING) 1747 && contains(target.getStackTrace(), className, methodName)) { 1748 found = true; 1749 } else { 1750 Thread.sleep(20); 1751 } 1752 } 1753 target.interrupt(); 1754 } 1755 1756 /** 1757 * Schedules the current thread to be interrupted when it waits (timed or untimed) 1758 * at the given location. 1759 */ 1760 private void scheduleInterruptAt(String location) { 1761 Thread target = Thread.currentThread(); 1762 scheduler.submit(() -> { 1763 interruptThreadAt(target, location); 1764 return null; 1765 }); 1766 } 1767 1768 /** 1769 * Returns true if the given stack trace contains an element for the given class 1770 * and method name. 1771 */ 1772 private boolean contains(StackTraceElement[] stack, String className, String methodName) { 1773 return Arrays.stream(stack) 1774 .anyMatch(e -> className.equals(e.getClassName()) 1775 && methodName.equals(e.getMethodName())); 1776 } 1777 }