1 /* 2 * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=platform 26 * @bug 8284199 8296779 8306647 27 * @summary Basic tests for StructuredTaskScope 28 * @enablePreview 29 * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest 30 */ 31 32 /* 33 * @test id=virtual 34 * @enablePreview 35 * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest 36 */ 37 38 import java.time.Duration; 39 import java.util.Arrays; 40 import java.util.ArrayList; 41 import java.util.List; 42 import java.util.Set; 43 import java.util.NoSuchElementException; 44 import java.util.concurrent.Callable; 45 import java.util.concurrent.ConcurrentHashMap; 46 import java.util.concurrent.CountDownLatch; 47 import java.util.concurrent.Executors; 48 import java.util.concurrent.ExecutionException; 49 import java.util.concurrent.Future; 50 import java.util.concurrent.LinkedTransferQueue; 51 import java.util.concurrent.ThreadFactory; 52 import java.util.concurrent.TimeoutException; 53 import java.util.concurrent.TimeUnit; 54 import java.util.concurrent.RejectedExecutionException; 55 import java.util.concurrent.ScheduledExecutorService; 56 import java.util.concurrent.StructuredTaskScope; 57 import java.util.concurrent.StructuredTaskScope.Config; 58 import java.util.concurrent.StructuredTaskScope.Joiner; 59 import java.util.concurrent.StructuredTaskScope.Subtask; 60 import java.util.concurrent.StructureViolationException; 61 import java.util.concurrent.atomic.AtomicBoolean; 62 import java.util.concurrent.atomic.AtomicInteger; 63 import java.util.function.Function; 64 import java.util.function.Predicate; 65 import java.util.stream.Stream; 66 import static java.lang.Thread.State.*; 67 68 import org.junit.jupiter.api.Test; 69 import org.junit.jupiter.api.BeforeAll; 70 import org.junit.jupiter.api.AfterAll; 71 import org.junit.jupiter.params.ParameterizedTest; 72 import org.junit.jupiter.params.provider.MethodSource; 73 import static org.junit.jupiter.api.Assertions.*; 74 75 class StructuredTaskScopeTest { 76 private static ScheduledExecutorService scheduler; 77 private static List<ThreadFactory> threadFactories; 78 79 @BeforeAll 80 static void setup() throws Exception { 81 scheduler = Executors.newSingleThreadScheduledExecutor(); 82 83 // thread factories 84 String value = System.getProperty("threadFactory"); 85 List<ThreadFactory> list = new ArrayList<>(); 86 if (value == null || value.equals("platform")) 87 list.add(Thread.ofPlatform().factory()); 88 if (value == null || value.equals("virtual")) 89 list.add(Thread.ofVirtual().factory()); 90 assertTrue(list.size() > 0, "No thread factories for tests"); 91 threadFactories = list; 92 } 93 94 @AfterAll 95 static void shutdown() { 96 scheduler.shutdown(); 97 } 98 99 private static Stream<ThreadFactory> factories() { 100 return threadFactories.stream(); 101 } 102 103 /** 104 * Test that fork creates virtual threads when no ThreadFactory is configured. 105 */ 106 @Test 107 void testForkCreateVirtualThread() throws Exception { 108 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 109 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 110 for (int i = 0; i < 50; i++) { 111 // runnable 112 scope.fork(() -> { 113 threads.add(Thread.currentThread()); 114 }); 115 116 // callable 117 scope.fork(() -> { 118 threads.add(Thread.currentThread()); 119 return null; 120 }); 121 } 122 scope.join(); 123 } 124 assertEquals(100, threads.size()); 125 threads.forEach(t -> assertTrue(t.isVirtual())); 126 } 127 128 /** 129 * Test that fork create threads with the configured ThreadFactory. 130 */ 131 @ParameterizedTest 132 @MethodSource("factories") 133 void testForkUsesThreadFactory(ThreadFactory factory) throws Exception { 134 // TheadFactory that keeps reference to all threads it creates 135 class RecordingThreadFactory implements ThreadFactory { 136 final ThreadFactory delegate; 137 final Set<Thread> threads = ConcurrentHashMap.newKeySet(); 138 RecordingThreadFactory(ThreadFactory delegate) { 139 this.delegate = delegate; 140 } 141 @Override 142 public Thread newThread(Runnable task) { 143 Thread thread = delegate.newThread(task); 144 threads.add(thread); 145 return thread; 146 } 147 Set<Thread> threads() { 148 return threads; 149 } 150 } 151 var recordingThreadFactory = new RecordingThreadFactory(factory); 152 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 153 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 154 cf -> cf.withThreadFactory(recordingThreadFactory))) { 155 156 for (int i = 0; i < 50; i++) { 157 // runnable 158 scope.fork(() -> { 159 threads.add(Thread.currentThread()); 160 }); 161 162 // callable 163 scope.fork(() -> { 164 threads.add(Thread.currentThread()); 165 return null; 166 }); 167 } 168 scope.join(); 169 } 170 assertEquals(100, threads.size()); 171 assertEquals(recordingThreadFactory.threads(), threads); 172 } 173 174 /** 175 * Test fork method is owner confined. 176 */ 177 @ParameterizedTest 178 @MethodSource("factories") 179 void testForkConfined(ThreadFactory factory) throws Exception { 180 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 181 cf -> cf.withThreadFactory(factory))) { 182 183 // random thread cannot fork 184 try (var pool = Executors.newSingleThreadExecutor()) { 185 Future<Void> future = pool.submit(() -> { 186 assertThrows(WrongThreadException.class, () -> { 187 scope.fork(() -> null); 188 }); 189 return null; 190 }); 191 future.get(); 192 } 193 194 // subtask cannot fork 195 Subtask<Boolean> subtask = scope.fork(() -> { 196 assertThrows(WrongThreadException.class, () -> { 197 scope.fork(() -> null); 198 }); 199 return true; 200 }); 201 scope.join(); 202 assertTrue(subtask.get()); 203 } 204 } 205 206 /** 207 * Test fork after join, no subtasks forked before join. 208 */ 209 @ParameterizedTest 210 @MethodSource("factories") 211 void testForkAfterJoin1(ThreadFactory factory) throws Exception { 212 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 213 cf -> cf.withThreadFactory(factory))) { 214 scope.join(); 215 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 216 } 217 } 218 219 /** 220 * Test fork after join, subtasks forked before join. 221 */ 222 @ParameterizedTest 223 @MethodSource("factories") 224 void testForkAfterJoin2(ThreadFactory factory) throws Exception { 225 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 226 cf -> cf.withThreadFactory(factory))) { 227 scope.fork(() -> "foo"); 228 scope.join(); 229 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 230 } 231 } 232 233 /** 234 * Test fork after join throws. 235 */ 236 @ParameterizedTest 237 @MethodSource("factories") 238 void testForkAfterJoinThrows(ThreadFactory factory) throws Exception { 239 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 240 cf -> cf.withThreadFactory(factory))) { 241 var latch = new CountDownLatch(1); 242 var subtask1 = scope.fork(() -> { 243 latch.await(); 244 return "foo"; 245 }); 246 247 // join throws 248 Thread.currentThread().interrupt(); 249 assertThrows(InterruptedException.class, scope::join); 250 251 // allow subtask1 to finish 252 latch.countDown(); 253 254 // continue to fork 255 var subtask2 = scope.fork(() -> "bar"); 256 scope.join(); 257 assertEquals("foo", subtask1.get()); 258 assertEquals("bar", subtask2.get()); 259 } 260 } 261 262 /** 263 * Test fork after task scope is cancelled. This test uses the cancel method to 264 * cancel execution explicitly. 265 */ 266 @ParameterizedTest 267 @MethodSource("factories") 268 void testForkAfterCancel1(ThreadFactory factory) throws Exception { 269 var countingThreadFactory = new CountingThreadFactory(factory); 270 var testJoiner = new CountingJoiner<String>(); 271 272 try (var scope = StructuredTaskScope.open(testJoiner, 273 cf -> cf.withThreadFactory(countingThreadFactory))) { 274 275 scope.cancel(); 276 277 // fork subtask 278 var subtask = scope.fork(() -> "foo"); 279 280 assertEquals(0, countingThreadFactory.threadCount()); 281 assertEquals(1, testJoiner.onForkCount()); 282 assertEquals(0, testJoiner.onCompleteCount()); 283 284 scope.join(); 285 286 assertEquals(0, countingThreadFactory.threadCount()); 287 assertEquals(1, testJoiner.onForkCount()); 288 assertEquals(0, testJoiner.onCompleteCount()); 289 290 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 291 } 292 } 293 294 295 /** 296 * Test fork after task scope is cancelled. This test uses a custom Joiner to 297 * cancel execution. 298 */ 299 @ParameterizedTest 300 @MethodSource("factories") 301 void testForkAfterCancel2(ThreadFactory factory) throws Exception { 302 var countingThreadFactory = new CountingThreadFactory(factory); 303 var testJoiner = new CancelAfterOneJoiner<String>(); 304 305 try (var scope = StructuredTaskScope.open(testJoiner, 306 cf -> cf.withThreadFactory(countingThreadFactory))) { 307 308 // fork subtask, the scope should be cancelled when the subtask completes 309 var subtask1 = scope.fork(() -> "foo"); 310 while (!scope.isCancelled()) { 311 Thread.sleep(20); 312 } 313 314 assertEquals(1, countingThreadFactory.threadCount()); 315 assertEquals(1, testJoiner.onForkCount()); 316 assertEquals(1, testJoiner.onCompleteCount()); 317 318 // fork second subtask, it should not run 319 var subtask2 = scope.fork(() -> "bar"); 320 321 // onFork should be invoked, newThread and onComplete should not be invoked 322 assertEquals(1, countingThreadFactory.threadCount()); 323 assertEquals(2, testJoiner.onForkCount()); 324 assertEquals(1, testJoiner.onCompleteCount()); 325 326 scope.join(); 327 328 assertEquals(1, countingThreadFactory.threadCount()); 329 assertEquals(2, testJoiner.onForkCount()); 330 assertEquals(1, testJoiner.onCompleteCount()); 331 assertEquals("foo", subtask1.get()); 332 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 333 } 334 } 335 336 /** 337 * Test fork after task scope is closed. 338 */ 339 @Test 340 void testForkAfterClose() { 341 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 342 scope.close(); 343 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null)); 344 } 345 } 346 347 /** 348 * Test fork when the ThreadFactory rejects creating a thread. 349 */ 350 @Test 351 void testForkRejectedExecutionException() { 352 ThreadFactory factory = task -> null; 353 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 354 cf -> cf.withThreadFactory(factory))) { 355 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null)); 356 } 357 } 358 359 /** 360 * Test join with no subtasks. 361 */ 362 @Test 363 void testJoinWithNoSubtasks() throws Exception { 364 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 365 scope.join(); 366 } 367 } 368 369 /** 370 * Test join with a remaining subtask. 371 */ 372 @ParameterizedTest 373 @MethodSource("factories") 374 void testJoinWithRemainingSubtasks(ThreadFactory factory) throws Exception { 375 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 376 cf -> cf.withThreadFactory(factory))) { 377 Subtask<String> subtask = scope.fork(() -> { 378 Thread.sleep(Duration.ofMillis(100)); 379 return "foo"; 380 }); 381 scope.join(); 382 assertEquals("foo", subtask.get()); 383 } 384 } 385 386 /** 387 * Test repeated calls to join. 388 */ 389 @Test 390 void testJoinAfterJoin() throws Exception { 391 var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz")); 392 Joiner<Object, String> joiner = results::take; 393 try (var scope = StructuredTaskScope.open(joiner)) { 394 scope.fork(() -> "foo"); 395 396 // each call to join should invoke Joiner::result 397 assertEquals("foo", scope.join()); 398 assertEquals("bar", scope.join()); 399 assertEquals("baz", scope.join()); 400 } 401 } 402 403 /** 404 * Test join method is owner confined. 405 */ 406 @ParameterizedTest 407 @MethodSource("factories") 408 void testJoinConfined(ThreadFactory factory) throws Exception { 409 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 410 cf -> cf.withThreadFactory(factory))) { 411 412 // random thread cannot join 413 try (var pool = Executors.newSingleThreadExecutor()) { 414 Future<Void> future = pool.submit(() -> { 415 assertThrows(WrongThreadException.class, scope::join); 416 return null; 417 }); 418 future.get(); 419 } 420 421 // subtask cannot join 422 Subtask<Boolean> subtask = scope.fork(() -> { 423 assertThrows(WrongThreadException.class, () -> { scope.join(); }); 424 return true; 425 }); 426 scope.join(); 427 assertTrue(subtask.get()); 428 } 429 } 430 431 /** 432 * Test join with interrupt status set. 433 */ 434 @ParameterizedTest 435 @MethodSource("factories") 436 void testInterruptJoin1(ThreadFactory factory) throws Exception { 437 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 438 cf -> cf.withThreadFactory(factory))) { 439 440 var latch = new CountDownLatch(1); 441 442 Subtask<String> subtask = scope.fork(() -> { 443 latch.await(); 444 return "foo"; 445 }); 446 447 // join should throw 448 Thread.currentThread().interrupt(); 449 try { 450 scope.join(); 451 fail("join did not throw"); 452 } catch (InterruptedException expected) { 453 assertFalse(Thread.interrupted()); // interrupt status should be cleared 454 } finally { 455 // let task continue 456 latch.countDown(); 457 } 458 459 // join should complete 460 scope.join(); 461 assertEquals("foo", subtask.get()); 462 } 463 } 464 465 /** 466 * Test interrupt of thread blocked in join. 467 */ 468 @ParameterizedTest 469 @MethodSource("factories") 470 void testInterruptJoin2(ThreadFactory factory) throws Exception { 471 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 472 cf -> cf.withThreadFactory(factory))) { 473 474 var latch = new CountDownLatch(1); 475 Subtask<String> subtask = scope.fork(() -> { 476 latch.await(); 477 return "foo"; 478 }); 479 480 // join should throw 481 scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.join"); 482 try { 483 scope.join(); 484 fail("join did not throw"); 485 } catch (InterruptedException expected) { 486 assertFalse(Thread.interrupted()); // interrupt status should be clear 487 } finally { 488 // let task continue 489 latch.countDown(); 490 } 491 492 // join should complete 493 scope.join(); 494 assertEquals("foo", subtask.get()); 495 } 496 } 497 498 /** 499 * Test join when scope is cancelled. 500 */ 501 @ParameterizedTest 502 @MethodSource("factories") 503 void testJoinWhenCancelled(ThreadFactory factory) throws Exception { 504 var countingThreadFactory = new CountingThreadFactory(factory); 505 var testJoiner = new CancelAfterOneJoiner<String>(); 506 507 try (var scope = StructuredTaskScope.open(testJoiner, 508 cf -> cf.withThreadFactory(countingThreadFactory))) { 509 510 // fork subtask, the scope should be cancelled when the subtask completes 511 var subtask1 = scope.fork(() -> "foo"); 512 while (!scope.isCancelled()) { 513 Thread.sleep(20); 514 } 515 516 // fork second subtask, it should not run 517 var subtask2 = scope.fork(() -> { 518 Thread.sleep(Duration.ofDays(1)); 519 return "bar"; 520 }); 521 522 scope.join(); 523 524 assertEquals("foo", subtask1.get()); 525 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 526 } 527 } 528 529 /** 530 * Test join after scope is closed. 531 */ 532 @Test 533 void testJoinAfterClose() throws Exception { 534 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 535 scope.close(); 536 assertThrows(IllegalStateException.class, () -> scope.join()); 537 } 538 } 539 540 /** 541 * Test join with timeout, subtasks finish before timeout expires. 542 */ 543 @ParameterizedTest 544 @MethodSource("factories") 545 void testJoinWithTimeout1(ThreadFactory factory) throws Exception { 546 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 547 cf -> cf.withThreadFactory(factory) 548 .withTimeout(Duration.ofDays(1)))) { 549 550 Subtask<String> subtask = scope.fork(() -> { 551 Thread.sleep(Duration.ofSeconds(1)); 552 return "foo"; 553 }); 554 555 scope.join(); 556 557 assertFalse(scope.isCancelled()); 558 assertEquals("foo", subtask.get()); 559 } 560 } 561 562 /** 563 * Test join with timeout, timeout expires before subtasks finish. 564 */ 565 @ParameterizedTest 566 @MethodSource("factories") 567 void testJoinWithTimeout2(ThreadFactory factory) throws Exception { 568 long startMillis = millisTime(); 569 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 570 cf -> cf.withThreadFactory(factory) 571 .withTimeout(Duration.ofSeconds(2)))) { 572 573 Subtask<Void> subtask = scope.fork(() -> { 574 Thread.sleep(Duration.ofDays(1)); 575 return null; 576 }); 577 578 try { 579 scope.join(); 580 fail(); 581 } catch (ExecutionException e) { 582 assertTrue(e.getCause() instanceof TimeoutException); 583 } 584 expectDuration(startMillis, /*min*/1900, /*max*/20_000); 585 586 assertTrue(scope.isCancelled()); 587 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 588 } 589 } 590 591 /** 592 * Test join with timeout that has already expired. 593 */ 594 @ParameterizedTest 595 @MethodSource("factories") 596 void testJoinWithTimeout3(ThreadFactory factory) throws Exception { 597 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 598 cf -> cf.withThreadFactory(factory) 599 .withTimeout(Duration.ofSeconds(-1)))) { 600 601 Subtask<Void> subtask = scope.fork(() -> { 602 Thread.sleep(Duration.ofDays(1)); 603 return null; 604 }); 605 606 try { 607 scope.join(); 608 fail(); 609 } catch (ExecutionException e) { 610 assertTrue(e.getCause() instanceof TimeoutException); 611 } 612 assertTrue(scope.isCancelled()); 613 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 614 } 615 } 616 617 /** 618 * Test that cancelling execution interrupts unfinished threads. This test uses the 619 * cancel method to cancel execution explicitly. 620 */ 621 @ParameterizedTest 622 @MethodSource("factories") 623 void testCancelInterruptsThreads1(ThreadFactory factory) throws Exception { 624 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 625 cf -> cf.withThreadFactory(factory))) { 626 627 // fork subtask that runs for a long time 628 var started = new CountDownLatch(1); 629 var interrupted = new CountDownLatch(1); 630 var subtask = scope.fork(() -> { 631 started.countDown(); 632 try { 633 Thread.sleep(Duration.ofDays(1)); 634 } catch (InterruptedException e) { 635 interrupted.countDown(); 636 } 637 }); 638 started.await(); 639 640 scope.cancel(); 641 642 // subtask should be interrupted 643 interrupted.await(); 644 645 scope.join(); 646 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 647 } 648 } 649 650 /** 651 * Test that cancelling execution interrupts unfinished threads. This test uses 652 * a custom Joiner to cancel execution. 653 */ 654 @ParameterizedTest 655 @MethodSource("factories") 656 void testCancelInterruptsThreads2(ThreadFactory factory) throws Exception { 657 var testJoiner = new CancelAfterOneJoiner<String>(); 658 659 try (var scope = StructuredTaskScope.open(testJoiner, 660 cf -> cf.withThreadFactory(factory))) { 661 662 // fork subtask1 that runs for a long time 663 var started = new CountDownLatch(1); 664 var interrupted = new CountDownLatch(1); 665 var subtask1 = scope.fork(() -> { 666 started.countDown(); 667 try { 668 Thread.sleep(Duration.ofDays(1)); 669 } catch (InterruptedException e) { 670 interrupted.countDown(); 671 } 672 }); 673 started.await(); 674 675 // fork subtask2, the scope should be cancelled when the subtask completes 676 var subtask2 = scope.fork(() -> "bar"); 677 while (!scope.isCancelled()) { 678 Thread.sleep(20); 679 } 680 681 // subtask1 should be interrupted 682 interrupted.await(); 683 684 scope.join(); 685 assertEquals(Subtask.State.UNAVAILABLE, subtask1.state()); 686 assertEquals("bar", subtask2.get()); 687 } 688 } 689 690 /** 691 * Test that timeout interrupts unfinished threads. 692 */ 693 @ParameterizedTest 694 @MethodSource("factories") 695 void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception { 696 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 697 cf -> cf.withThreadFactory(factory) 698 .withTimeout(Duration.ofSeconds(2)))) { 699 700 var started = new AtomicBoolean(); 701 var interrupted = new CountDownLatch(1); 702 Subtask<Void> subtask = scope.fork(() -> { 703 started.set(true); 704 try { 705 Thread.sleep(Duration.ofDays(1)); 706 } catch (InterruptedException e) { 707 interrupted.countDown(); 708 } 709 return null; 710 }); 711 712 while (!scope.isCancelled()) { 713 Thread.sleep(50); 714 } 715 716 // if subtask started then it should be interrupted 717 if (started.get()) { 718 interrupted.await(); 719 } 720 721 try { 722 scope.join(); 723 fail(); 724 } catch (ExecutionException e) { 725 assertTrue(e.getCause() instanceof TimeoutException); 726 } 727 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 728 } 729 } 730 731 /** 732 * Test close without join, no subtasks forked. 733 */ 734 @Test 735 void testCloseWithoutJoin1() { 736 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 737 // do nothing 738 } 739 } 740 741 /** 742 * Test close without join, subtasks forked. 743 */ 744 @ParameterizedTest 745 @MethodSource("factories") 746 void testCloseWithoutJoin2(ThreadFactory factory) { 747 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 748 cf -> cf.withThreadFactory(factory))) { 749 Subtask<String> subtask = scope.fork(() -> { 750 Thread.sleep(Duration.ofDays(1)); 751 return null; 752 }); 753 assertThrows(IllegalStateException.class, scope::close); 754 755 // subtask result/exception not available 756 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 757 assertThrows(IllegalStateException.class, subtask::get); 758 assertThrows(IllegalStateException.class, subtask::exception); 759 } 760 } 761 762 /** 763 * Test close after join throws. Close should not throw as join attempted. 764 */ 765 @ParameterizedTest 766 @MethodSource("factories") 767 void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception { 768 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 769 cf -> cf.withThreadFactory(factory))) { 770 var subtask = scope.fork(() -> { 771 Thread.sleep(Duration.ofDays(1)); 772 return null; 773 }); 774 775 // join throws 776 Thread.currentThread().interrupt(); 777 assertThrows(InterruptedException.class, scope::join); 778 assertThrows(IllegalStateException.class, subtask::get); 779 780 } // close should not throw 781 } 782 783 /** 784 * Test close method is owner confined. 785 */ 786 @ParameterizedTest 787 @MethodSource("factories") 788 void testCloseConfined(ThreadFactory factory) throws Exception { 789 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 790 cf -> cf.withThreadFactory(factory))) { 791 792 // random thread cannot close scope 793 try (var pool = Executors.newCachedThreadPool(factory)) { 794 Future<Boolean> future = pool.submit(() -> { 795 assertThrows(WrongThreadException.class, scope::close); 796 return null; 797 }); 798 future.get(); 799 } 800 801 // subtask cannot close 802 Subtask<Boolean> subtask = scope.fork(() -> { 803 assertThrows(WrongThreadException.class, scope::close); 804 return true; 805 }); 806 scope.join(); 807 assertTrue(subtask.get()); 808 } 809 } 810 811 /** 812 * Test close with interrupt status set. 813 */ 814 @ParameterizedTest 815 @MethodSource("factories") 816 void testInterruptClose1(ThreadFactory factory) throws Exception { 817 var testJoiner = new CancelAfterOneJoiner<String>(); 818 try (var scope = StructuredTaskScope.open(testJoiner, 819 cf -> cf.withThreadFactory(factory))) { 820 821 // fork first subtask, a straggler as it continues after being interrupted 822 var started = new CountDownLatch(1); 823 var done = new AtomicBoolean(); 824 scope.fork(() -> { 825 started.countDown(); 826 try { 827 Thread.sleep(Duration.ofDays(1)); 828 } catch (InterruptedException e) { 829 // interrupted by cancel, expected 830 } 831 Thread.sleep(Duration.ofMillis(100)); // force close to wait 832 done.set(true); 833 return null; 834 }); 835 started.await(); 836 837 // fork second subtask, the scope should be cancelled when this subtask completes 838 scope.fork(() -> "bar"); 839 while (!scope.isCancelled()) { 840 Thread.sleep(20); 841 } 842 843 scope.join(); 844 845 // invoke close with interrupt status set 846 Thread.currentThread().interrupt(); 847 try { 848 scope.close(); 849 } finally { 850 assertTrue(Thread.interrupted()); // clear interrupt status 851 assertTrue(done.get()); 852 } 853 } 854 } 855 856 /** 857 * Test interrupting thread waiting in close. 858 */ 859 @ParameterizedTest 860 @MethodSource("factories") 861 void testInterruptClose2(ThreadFactory factory) throws Exception { 862 var testJoiner = new CancelAfterOneJoiner<String>(); 863 try (var scope = StructuredTaskScope.open(testJoiner, 864 cf -> cf.withThreadFactory(factory))) { 865 866 Thread mainThread = Thread.currentThread(); 867 868 // fork first subtask, a straggler as it continues after being interrupted 869 var started = new CountDownLatch(1); 870 var done = new AtomicBoolean(); 871 scope.fork(() -> { 872 started.countDown(); 873 try { 874 Thread.sleep(Duration.ofDays(1)); 875 } catch (InterruptedException e) { 876 // interrupted by cancel, expected 877 } 878 879 // interrupt main thread when it blocks in close 880 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScope.close"); 881 882 Thread.sleep(Duration.ofMillis(100)); // force close to wait 883 done.set(true); 884 return null; 885 }); 886 started.await(); 887 888 // fork second subtask, the scope should be cancelled when this subtask completes 889 scope.fork(() -> "bar"); 890 while (!scope.isCancelled()) { 891 Thread.sleep(20); 892 } 893 894 scope.join(); 895 896 // main thread will be interrupted while blocked in close 897 try { 898 scope.close(); 899 } finally { 900 assertTrue(Thread.interrupted()); // clear interrupt status 901 assertTrue(done.get()); 902 } 903 } 904 } 905 906 /** 907 * Test that closing an enclosing scope closes the thread flock of a nested scope. 908 */ 909 @Test 910 void testCloseThrowsStructureViolation() throws Exception { 911 try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll())) { 912 try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll())) { 913 914 // close enclosing scope 915 try { 916 scope1.close(); 917 fail("close did not throw"); 918 } catch (StructureViolationException expected) { } 919 920 // underlying flock should be closed 921 var executed = new AtomicBoolean(); 922 Subtask<?> subtask = scope2.fork(() -> executed.set(true)); 923 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 924 scope2.join(); 925 assertFalse(executed.get()); 926 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 927 } 928 } 929 } 930 931 932 /** 933 * Test cancel after task scope is cancelled. 934 */ 935 @Test 936 void testCancelAfterCancel() { 937 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 938 scope.cancel(); 939 } 940 } 941 942 /** 943 * Test cancel after task scope is closed. 944 */ 945 @Test 946 void testCancelAfterClose() { 947 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 948 assertFalse(scope.isCancelled()); 949 scope.cancel(); 950 assertTrue(scope.isCancelled()); 951 scope.cancel(); 952 assertTrue(scope.isCancelled()); 953 } 954 } 955 956 /** 957 * Test cancel method is owner confined. 958 */ 959 @ParameterizedTest 960 @MethodSource("factories") 961 void testCancelConfined(ThreadFactory factory) throws Exception { 962 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 963 cf -> cf.withThreadFactory(factory))) { 964 965 // random thread cannot cancel scope 966 try (var pool = Executors.newCachedThreadPool(factory)) { 967 Future<Boolean> future = pool.submit(() -> { 968 assertThrows(WrongThreadException.class, scope::cancel); 969 return null; 970 }); 971 future.get(); 972 } 973 974 // subtask cannot cancel 975 Subtask<Boolean> subtask = scope.fork(() -> { 976 assertThrows(WrongThreadException.class, scope::cancel); 977 return true; 978 }); 979 scope.join(); 980 assertTrue(subtask.get()); 981 } 982 } 983 984 /** 985 * Test that isCancelled returns true after close. 986 */ 987 @Test 988 void testIsCancelledAfterClose() throws Exception { 989 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 990 assertFalse(scope.isCancelled()); 991 scope.close(); 992 assertTrue(scope.isCancelled()); 993 } 994 } 995 996 /** 997 * Test Joiner.onFork throwing exception. 998 */ 999 @Test 1000 void testOnForkThrows() throws Exception { 1001 var joiner = new Joiner<String, Void>() { 1002 @Override 1003 public boolean onFork(Subtask<? extends String> subtask) { 1004 throw new FooException(); 1005 } 1006 @Override 1007 public Void result() { 1008 return null; 1009 } 1010 }; 1011 try (var scope = StructuredTaskScope.open(joiner)) { 1012 assertThrows(FooException.class, () -> scope.fork(() -> "foo")); 1013 } 1014 } 1015 1016 /** 1017 * Test Joiner.onFork returning true to cancel execution. 1018 */ 1019 @Test 1020 void testOnForkCancelsExecution() throws Exception { 1021 var joiner = new Joiner<String, Void>() { 1022 @Override 1023 public boolean onFork(Subtask<? extends String> subtask) { 1024 return true; 1025 } 1026 @Override 1027 public Void result() { 1028 return null; 1029 } 1030 }; 1031 try (var scope = StructuredTaskScope.open(joiner)) { 1032 assertFalse(scope.isCancelled()); 1033 scope.fork(() -> "foo"); 1034 assertTrue(scope.isCancelled()); 1035 scope.join(); 1036 } 1037 } 1038 1039 /** 1040 * Test Joiner.onComplete returning true to cancel execution. 1041 */ 1042 @Test 1043 void testOnCompleteCancelsExecution() throws Exception { 1044 var joiner = new Joiner<String, Void>() { 1045 @Override 1046 public boolean onComplete(Subtask<? extends String> subtask) { 1047 return true; 1048 } 1049 @Override 1050 public Void result() { 1051 return null; 1052 } 1053 }; 1054 try (var scope = StructuredTaskScope.open(joiner)) { 1055 assertFalse(scope.isCancelled()); 1056 scope.fork(() -> "foo"); 1057 while (!scope.isCancelled()) { 1058 Thread.sleep(10); 1059 } 1060 scope.join(); 1061 } 1062 } 1063 1064 /** 1065 * Test toString. 1066 */ 1067 @Test 1068 void testToString() throws Exception { 1069 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 1070 cf -> cf.withName("duke"))) { 1071 1072 // open 1073 assertTrue(scope.toString().contains("duke")); 1074 1075 // closed 1076 scope.close(); 1077 assertTrue(scope.toString().contains("duke")); 1078 } 1079 } 1080 1081 /** 1082 * Test Subtask with task that completes successfully. 1083 */ 1084 @ParameterizedTest 1085 @MethodSource("factories") 1086 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception { 1087 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1088 cf -> cf.withThreadFactory(factory))) { 1089 1090 Subtask<String> subtask = scope.fork(() -> "foo"); 1091 1092 // before join 1093 assertThrows(IllegalStateException.class, subtask::get); 1094 assertThrows(IllegalStateException.class, subtask::exception); 1095 1096 scope.join(); 1097 1098 // after join 1099 assertEquals(Subtask.State.SUCCESS, subtask.state()); 1100 assertEquals("foo", subtask.get()); 1101 assertThrows(IllegalStateException.class, subtask::exception); 1102 } 1103 } 1104 1105 /** 1106 * Test Subtask with task that fails. 1107 */ 1108 @ParameterizedTest 1109 @MethodSource("factories") 1110 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception { 1111 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1112 cf -> cf.withThreadFactory(factory))) { 1113 1114 Subtask<String> subtask = scope.fork(() -> { throw new FooException(); }); 1115 1116 // before join 1117 assertThrows(IllegalStateException.class, subtask::get); 1118 assertThrows(IllegalStateException.class, subtask::exception); 1119 1120 scope.join(); 1121 1122 // after join 1123 assertEquals(Subtask.State.FAILED, subtask.state()); 1124 assertThrows(IllegalStateException.class, subtask::get); 1125 assertTrue(subtask.exception() instanceof FooException); 1126 } 1127 } 1128 1129 /** 1130 * Test Subtask with a task that has not completed. 1131 */ 1132 @ParameterizedTest 1133 @MethodSource("factories") 1134 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception { 1135 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 1136 cf -> cf.withThreadFactory(factory))) { 1137 Subtask<Void> subtask = scope.fork(() -> { 1138 Thread.sleep(Duration.ofDays(1)); 1139 return null; 1140 }); 1141 1142 // before join 1143 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1144 assertThrows(IllegalStateException.class, subtask::get); 1145 assertThrows(IllegalStateException.class, subtask::exception); 1146 1147 // attempt join, join throws 1148 Thread.currentThread().interrupt(); 1149 assertThrows(InterruptedException.class, scope::join); 1150 1151 // after join 1152 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1153 assertThrows(IllegalStateException.class, subtask::get); 1154 assertThrows(IllegalStateException.class, subtask::exception); 1155 } 1156 } 1157 1158 /** 1159 * Test Subtask forked after execution cancelled. 1160 */ 1161 @ParameterizedTest 1162 @MethodSource("factories") 1163 void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception { 1164 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) { 1165 scope.fork(() -> "foo"); 1166 while (!scope.isCancelled()) { 1167 Thread.sleep(20); 1168 } 1169 1170 var subtask = scope.fork(() -> "foo"); 1171 1172 // before join 1173 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1174 assertThrows(IllegalStateException.class, subtask::get); 1175 assertThrows(IllegalStateException.class, subtask::exception); 1176 1177 scope.join(); 1178 1179 // after join 1180 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1181 assertThrows(IllegalStateException.class, subtask::get); 1182 assertThrows(IllegalStateException.class, subtask::exception); 1183 } 1184 } 1185 1186 /** 1187 * Test Subtask::toString. 1188 */ 1189 @Test 1190 void testSubtaskToString() throws Exception { 1191 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1192 var latch = new CountDownLatch(1); 1193 var subtask1 = scope.fork(() -> { 1194 latch.await(); 1195 return "foo"; 1196 }); 1197 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1198 1199 // subtask1 result is unavailable 1200 assertTrue(subtask1.toString().contains("Unavailable")); 1201 latch.countDown(); 1202 1203 scope.join(); 1204 1205 assertTrue(subtask1.toString().contains("Completed successfully")); 1206 assertTrue(subtask2.toString().contains("Failed")); 1207 } 1208 } 1209 1210 /** 1211 * Test Joiner.allSuccessfulOrThrow() with no subtasks. 1212 */ 1213 @Test 1214 void testAllSuccessfulOrThrow1() throws Throwable { 1215 try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) { 1216 var subtasks = scope.join().toList(); 1217 assertTrue(subtasks.isEmpty()); 1218 } 1219 } 1220 1221 /** 1222 * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully. 1223 */ 1224 @ParameterizedTest 1225 @MethodSource("factories") 1226 void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable { 1227 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1228 cf -> cf.withThreadFactory(factory))) { 1229 var subtask1 = scope.fork(() -> "foo"); 1230 var subtask2 = scope.fork(() -> "bar"); 1231 var subtasks = scope.join().toList(); 1232 assertEquals(List.of(subtask1, subtask2), subtasks); 1233 assertEquals("foo", subtask1.get()); 1234 assertEquals("bar", subtask2.get()); 1235 } 1236 } 1237 1238 /** 1239 * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and 1240 * a subtask that fails. 1241 */ 1242 @ParameterizedTest 1243 @MethodSource("factories") 1244 void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { 1245 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1246 cf -> cf.withThreadFactory(factory))) { 1247 scope.fork(() -> "foo"); 1248 scope.fork(() -> { throw new FooException(); }); 1249 try { 1250 scope.join(); 1251 } catch (ExecutionException e) { 1252 assertTrue(e.getCause() instanceof FooException); 1253 } 1254 } 1255 } 1256 1257 /** 1258 * Test Joiner.anySuccessfulResultOrThrow() with no subtasks. 1259 */ 1260 @Test 1261 void testAnySuccessfulResultOrThrow1() throws Exception { 1262 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { 1263 try { 1264 scope.join(); 1265 } catch (ExecutionException e) { 1266 assertTrue(e.getCause() instanceof NoSuchElementException); 1267 } 1268 } 1269 } 1270 1271 /** 1272 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully. 1273 */ 1274 @ParameterizedTest 1275 @MethodSource("factories") 1276 void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception { 1277 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1278 cf -> cf.withThreadFactory(factory))) { 1279 scope.fork(() -> "foo"); 1280 String result = scope.join(); 1281 assertEquals("foo", result); 1282 } 1283 } 1284 1285 /** 1286 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully 1287 * with a null result. 1288 */ 1289 @ParameterizedTest 1290 @MethodSource("factories") 1291 void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception { 1292 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1293 cf -> cf.withThreadFactory(factory))) { 1294 scope.fork(() -> null); 1295 String result = scope.join(); 1296 assertNull(result); 1297 } 1298 } 1299 1300 /** 1301 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully 1302 * and a subtask that fails. 1303 */ 1304 @ParameterizedTest 1305 @MethodSource("factories") 1306 void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception { 1307 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1308 cf -> cf.withThreadFactory(factory))) { 1309 scope.fork(() -> "foo"); 1310 scope.fork(() -> { throw new FooException(); }); 1311 String first = scope.join(); 1312 assertEquals("foo", first); 1313 } 1314 } 1315 1316 /** 1317 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails. 1318 */ 1319 @ParameterizedTest 1320 @MethodSource("factories") 1321 void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception { 1322 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), 1323 cf -> cf.withThreadFactory(factory))) { 1324 scope.fork(() -> { throw new FooException(); }); 1325 Throwable ex = assertThrows(ExecutionException.class, scope::join); 1326 assertTrue(ex.getCause() instanceof FooException); 1327 } 1328 } 1329 1330 /** 1331 * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks. 1332 */ 1333 @Test 1334 void testIgnoreSuccessfulOrThrow1() throws Throwable { 1335 try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) { 1336 var result = scope.join(); 1337 assertNull(result); 1338 } 1339 } 1340 1341 /** 1342 * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully. 1343 */ 1344 @ParameterizedTest 1345 @MethodSource("factories") 1346 void testIgnoreSuccessfulOrThrow2(ThreadFactory factory) throws Throwable { 1347 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1348 cf -> cf.withThreadFactory(factory))) { 1349 var subtask1 = scope.fork(() -> "foo"); 1350 var subtask2 = scope.fork(() -> "bar"); 1351 var result = scope.join(); 1352 assertNull(result); 1353 assertEquals("foo", subtask1.get()); 1354 assertEquals("bar", subtask2.get()); 1355 } 1356 } 1357 1358 /** 1359 * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and 1360 * a subtask that fails. 1361 */ 1362 @ParameterizedTest 1363 @MethodSource("factories") 1364 void testIgnoreSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { 1365 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1366 cf -> cf.withThreadFactory(factory))) { 1367 scope.fork(() -> "foo"); 1368 scope.fork(() -> { throw new FooException(); }); 1369 try { 1370 scope.join(); 1371 } catch (ExecutionException e) { 1372 assertTrue(e.getCause() instanceof FooException); 1373 } 1374 } 1375 } 1376 1377 /** 1378 * Test Joiner.awaitAll() with no subtasks. 1379 */ 1380 @Test 1381 void testIgnoreAll1() throws Throwable { 1382 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1383 var result = scope.join(); 1384 assertNull(result); 1385 } 1386 } 1387 1388 /** 1389 * Test Joiner.awaitAll() with subtasks that complete successfully. 1390 */ 1391 @ParameterizedTest 1392 @MethodSource("factories") 1393 void testIgnoreAll2(ThreadFactory factory) throws Throwable { 1394 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1395 cf -> cf.withThreadFactory(factory))) { 1396 var subtask1 = scope.fork(() -> "foo"); 1397 var subtask2 = scope.fork(() -> "bar"); 1398 var result = scope.join(); 1399 assertNull(result); 1400 assertEquals("foo", subtask1.get()); 1401 assertEquals("bar", subtask2.get()); 1402 } 1403 } 1404 1405 /** 1406 * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask 1407 * that fails. 1408 */ 1409 @ParameterizedTest 1410 @MethodSource("factories") 1411 void testIgnoreAll3(ThreadFactory factory) throws Throwable { 1412 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1413 cf -> cf.withThreadFactory(factory))) { 1414 var subtask1 = scope.fork(() -> "foo"); 1415 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1416 var result = scope.join(); 1417 assertNull(result); 1418 assertEquals("foo", subtask1.get()); 1419 assertTrue(subtask2.exception() instanceof FooException); 1420 } 1421 } 1422 1423 /** 1424 * Test Joiner.all(Predicate) with no subtasks. 1425 */ 1426 @Test 1427 void testAllWithPredicate1() throws Throwable { 1428 try (var scope = StructuredTaskScope.open(Joiner.all(s -> false))) { 1429 var subtasks = scope.join(); 1430 assertEquals(0, subtasks.count()); 1431 } 1432 } 1433 1434 /** 1435 * Test Joiner.all(Predicate) with no cancellation. 1436 */ 1437 @ParameterizedTest 1438 @MethodSource("factories") 1439 void testAllWithPredicate2(ThreadFactory factory) throws Exception { 1440 try (var scope = StructuredTaskScope.open(Joiner.<String>all(s -> false), 1441 cf -> cf.withThreadFactory(factory))) { 1442 1443 var subtask1 = scope.fork(() -> "foo"); 1444 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1445 1446 var subtasks = scope.join().toList(); 1447 assertEquals(2, subtasks.size()); 1448 1449 assertTrue(subtasks.get(0) == subtask1); 1450 assertTrue(subtasks.get(1) == subtask2); 1451 assertEquals("foo", subtask1.get()); 1452 assertTrue(subtask2.exception() instanceof FooException); 1453 } 1454 } 1455 1456 /** 1457 * Test Joiner.all(Predicate) with cancellation after one subtask completes. 1458 */ 1459 @ParameterizedTest 1460 @MethodSource("factories") 1461 void testAllWithPredicate3(ThreadFactory factory) throws Exception { 1462 try (var scope = StructuredTaskScope.open(Joiner.<String>all(s -> true), 1463 cf -> cf.withThreadFactory(factory))) { 1464 1465 var subtask1 = scope.fork(() -> "foo"); 1466 var subtask2 = scope.fork(() -> { 1467 Thread.sleep(Duration.ofDays(1)); 1468 return "bar"; 1469 }); 1470 1471 var subtasks = scope.join().toList(); 1472 1473 assertEquals(2, subtasks.size()); 1474 assertTrue(subtasks.get(0) == subtask1); 1475 assertTrue(subtasks.get(1) == subtask2); 1476 assertEquals("foo", subtask1.get()); 1477 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1478 } 1479 } 1480 1481 /** 1482 * Test Joiner.all(Predicate) with cancellation after serveral subtasks complete. 1483 */ 1484 @ParameterizedTest 1485 @MethodSource("factories") 1486 void testAllWithPredicate4(ThreadFactory factory) throws Exception { 1487 1488 // cancel execution after two or more failures 1489 class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> { 1490 final AtomicInteger failedCount = new AtomicInteger(); 1491 @Override 1492 public boolean test(Subtask<? extends T> subtask) { 1493 return subtask.state() == Subtask.State.FAILED 1494 && failedCount.incrementAndGet() >= 2; 1495 } 1496 } 1497 var joiner = Joiner.all(new CancelAfterTwoFailures<String>()); 1498 1499 try (var scope = StructuredTaskScope.open(joiner)) { 1500 int forkCount = 0; 1501 1502 // fork subtasks until execution cancelled 1503 while (!scope.isCancelled()) { 1504 scope.fork(() -> "foo"); 1505 scope.fork(() -> { throw new FooException(); }); 1506 forkCount += 2; 1507 Thread.sleep(Duration.ofMillis(10)); 1508 } 1509 1510 var subtasks = scope.join().toList(); 1511 assertEquals(forkCount, subtasks.size()); 1512 1513 long failedCount = subtasks.stream() 1514 .filter(s -> s.state() == Subtask.State.FAILED) 1515 .count(); 1516 assertTrue(failedCount >= 2); 1517 } 1518 } 1519 1520 /** 1521 * Test Config equals/hashCode/toString 1522 */ 1523 @Test 1524 void testConfigMethods() throws Exception { 1525 Function<Config, Config> testConfig = cf -> { 1526 var name = "duke"; 1527 var threadFactory = Thread.ofPlatform().factory(); 1528 var timeout = Duration.ofSeconds(10); 1529 1530 assertEquals(cf, cf); 1531 assertEquals(cf.withName(name), cf.withName(name)); 1532 assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory)); 1533 assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout)); 1534 1535 assertNotEquals(cf, cf.withName(name)); 1536 assertNotEquals(cf, cf.withThreadFactory(threadFactory)); 1537 assertNotEquals(cf, cf.withTimeout(timeout)); 1538 1539 assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode()); 1540 assertEquals(cf.withThreadFactory(threadFactory).hashCode(), 1541 cf.withThreadFactory(threadFactory).hashCode()); 1542 assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode()); 1543 1544 assertTrue(cf.withName(name).toString().contains(name)); 1545 assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString())); 1546 assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString())); 1547 1548 return cf; 1549 }; 1550 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) { 1551 // do nothing 1552 } 1553 } 1554 1555 /** 1556 * Test Joiner default methods. 1557 */ 1558 @Test 1559 void testJoinerDefaultMethods() throws Exception { 1560 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) { 1561 1562 // need subtasks to test default methods 1563 var subtask1 = scope.fork(() -> "foo"); 1564 while (!scope.isCancelled()) { 1565 Thread.sleep(20); 1566 } 1567 var subtask2 = scope.fork(() -> "bar"); 1568 scope.join(); 1569 1570 assertEquals(Subtask.State.SUCCESS, subtask1.state()); 1571 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1572 1573 // Joiner that does not override default methods 1574 Joiner<Object, Void> joiner = () -> null; 1575 assertThrows(NullPointerException.class, () -> joiner.onFork(null)); 1576 assertThrows(NullPointerException.class, () -> joiner.onComplete(null)); 1577 assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1)); 1578 assertFalse(joiner.onFork(subtask2)); 1579 assertFalse(joiner.onComplete(subtask1)); 1580 assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2)); 1581 } 1582 } 1583 1584 /** 1585 * Test for NullPointerException. 1586 */ 1587 @Test 1588 void testNulls() throws Exception { 1589 assertThrows(NullPointerException.class, 1590 () -> StructuredTaskScope.open(null)); 1591 assertThrows(NullPointerException.class, 1592 () -> StructuredTaskScope.open(null, cf -> cf)); 1593 assertThrows(NullPointerException.class, 1594 () -> StructuredTaskScope.open(Joiner.awaitAll(), null)); 1595 assertThrows(NullPointerException.class, 1596 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> null)); 1597 1598 assertThrows(NullPointerException.class, () -> Joiner.all(null)); 1599 1600 // fork 1601 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1602 assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null)); 1603 assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null)); 1604 } 1605 1606 // withXXX 1607 assertThrows(NullPointerException.class, 1608 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null))); 1609 assertThrows(NullPointerException.class, 1610 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null))); 1611 assertThrows(NullPointerException.class, 1612 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null))); 1613 } 1614 1615 /** 1616 * ThreadFactory that counts usage. 1617 */ 1618 private static class CountingThreadFactory implements ThreadFactory { 1619 final ThreadFactory delegate; 1620 final AtomicInteger threadCount = new AtomicInteger(); 1621 CountingThreadFactory(ThreadFactory delegate) { 1622 this.delegate = delegate; 1623 } 1624 @Override 1625 public Thread newThread(Runnable task) { 1626 threadCount.incrementAndGet(); 1627 return delegate.newThread(task); 1628 } 1629 int threadCount() { 1630 return threadCount.get(); 1631 } 1632 } 1633 1634 /** 1635 * A joiner that counts that counts the number of subtasks that are forked and the 1636 * number of subtasks that complete. 1637 */ 1638 private static class CountingJoiner<T> implements Joiner<T, Void> { 1639 final AtomicInteger onForkCount = new AtomicInteger(); 1640 final AtomicInteger onCompleteCount = new AtomicInteger(); 1641 @Override 1642 public boolean onFork(Subtask<? extends T> subtask) { 1643 onForkCount.incrementAndGet(); 1644 return false; 1645 } 1646 @Override 1647 public boolean onComplete(Subtask<? extends T> subtask) { 1648 onCompleteCount.incrementAndGet(); 1649 return false; 1650 } 1651 @Override 1652 public Void result() { 1653 return null; 1654 } 1655 int onForkCount() { 1656 return onForkCount.get(); 1657 } 1658 int onCompleteCount() { 1659 return onCompleteCount.get(); 1660 } 1661 } 1662 1663 /** 1664 * A joiner that cancels execution when a subtask completes. It also keeps a count 1665 * of the number of subtasks that are forked and the number of subtasks that complete. 1666 */ 1667 private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> { 1668 final AtomicInteger onForkCount = new AtomicInteger(); 1669 final AtomicInteger onCompleteCount = new AtomicInteger(); 1670 @Override 1671 public boolean onFork(Subtask<? extends T> subtask) { 1672 onForkCount.incrementAndGet(); 1673 return false; 1674 } 1675 @Override 1676 public boolean onComplete(Subtask<? extends T> subtask) { 1677 onCompleteCount.incrementAndGet(); 1678 return true; 1679 } 1680 @Override 1681 public Void result() { 1682 return null; 1683 } 1684 int onForkCount() { 1685 return onForkCount.get(); 1686 } 1687 int onCompleteCount() { 1688 return onCompleteCount.get(); 1689 } 1690 } 1691 1692 /** 1693 * A runtime exception for tests. 1694 */ 1695 private static class FooException extends RuntimeException { 1696 FooException() { } 1697 FooException(Throwable cause) { super(cause); } 1698 } 1699 1700 /** 1701 * Returns the current time in milliseconds. 1702 */ 1703 private long millisTime() { 1704 long now = System.nanoTime(); 1705 return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS); 1706 } 1707 1708 /** 1709 * Check the duration of a task 1710 * @param start start time, in milliseconds 1711 * @param min minimum expected duration, in milliseconds 1712 * @param max maximum expected duration, in milliseconds 1713 * @return the duration (now - start), in milliseconds 1714 */ 1715 private long expectDuration(long start, long min, long max) { 1716 long duration = millisTime() - start; 1717 assertTrue(duration >= min, 1718 "Duration " + duration + "ms, expected >= " + min + "ms"); 1719 assertTrue(duration <= max, 1720 "Duration " + duration + "ms, expected <= " + max + "ms"); 1721 return duration; 1722 } 1723 1724 /** 1725 * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}". 1726 * {@code c} is the fully qualified class name and {@code m} is the method name. 1727 */ 1728 private void interruptThreadAt(Thread target, String location) throws InterruptedException { 1729 int index = location.lastIndexOf('.'); 1730 String className = location.substring(0, index); 1731 String methodName = location.substring(index + 1); 1732 1733 boolean found = false; 1734 while (!found) { 1735 Thread.State state = target.getState(); 1736 assertTrue(state != TERMINATED); 1737 if ((state == WAITING || state == TIMED_WAITING) 1738 && contains(target.getStackTrace(), className, methodName)) { 1739 found = true; 1740 } else { 1741 Thread.sleep(20); 1742 } 1743 } 1744 target.interrupt(); 1745 } 1746 1747 /** 1748 * Schedules the current thread to be interrupted when it waits (timed or untimed) 1749 * at the given location. 1750 */ 1751 private void scheduleInterruptAt(String location) { 1752 Thread target = Thread.currentThread(); 1753 scheduler.submit(() -> { 1754 interruptThreadAt(target, location); 1755 return null; 1756 }); 1757 } 1758 1759 /** 1760 * Returns true if the given stack trace contains an element for the given class 1761 * and method name. 1762 */ 1763 private boolean contains(StackTraceElement[] stack, String className, String methodName) { 1764 return Arrays.stream(stack) 1765 .anyMatch(e -> className.equals(e.getClassName()) 1766 && methodName.equals(e.getMethodName())); 1767 } 1768 }