1 /* 2 * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=platform 26 * @bug 8284199 8296779 8306647 27 * @summary Basic tests for StructuredTaskScope 28 * @enablePreview 29 * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest 30 */ 31 32 /* 33 * @test id=virtual 34 * @enablePreview 35 * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest 36 */ 37 38 import java.time.Duration; 39 import java.util.Arrays; 40 import java.util.ArrayList; 41 import java.util.List; 42 import java.util.Set; 43 import java.util.NoSuchElementException; 44 import java.util.concurrent.Callable; 45 import java.util.concurrent.ConcurrentHashMap; 46 import java.util.concurrent.CountDownLatch; 47 import java.util.concurrent.Executors; 48 import java.util.concurrent.Future; 49 import java.util.concurrent.LinkedTransferQueue; 50 import java.util.concurrent.ThreadFactory; 51 import java.util.concurrent.TimeUnit; 52 import java.util.concurrent.RejectedExecutionException; 53 import java.util.concurrent.ScheduledExecutorService; 54 import java.util.concurrent.StructuredTaskScope; 55 import java.util.concurrent.StructuredTaskScope.TimeoutException; 56 import java.util.concurrent.StructuredTaskScope.Config; 57 import java.util.concurrent.StructuredTaskScope.FailedException; 58 import java.util.concurrent.StructuredTaskScope.Joiner; 59 import java.util.concurrent.StructuredTaskScope.Subtask; 60 import java.util.concurrent.StructureViolationException; 61 import java.util.concurrent.atomic.AtomicBoolean; 62 import java.util.concurrent.atomic.AtomicInteger; 63 import java.util.function.Function; 64 import java.util.function.Predicate; 65 import java.util.stream.Stream; 66 import static java.lang.Thread.State.*; 67 68 import org.junit.jupiter.api.Test; 69 import org.junit.jupiter.api.BeforeAll; 70 import org.junit.jupiter.api.AfterAll; 71 import org.junit.jupiter.params.ParameterizedTest; 72 import org.junit.jupiter.params.provider.MethodSource; 73 import static org.junit.jupiter.api.Assertions.*; 74 75 class StructuredTaskScopeTest { 76 private static ScheduledExecutorService scheduler; 77 private static List<ThreadFactory> threadFactories; 78 79 @BeforeAll 80 static void setup() throws Exception { 81 scheduler = Executors.newSingleThreadScheduledExecutor(); 82 83 // thread factories 84 String value = System.getProperty("threadFactory"); 85 List<ThreadFactory> list = new ArrayList<>(); 86 if (value == null || value.equals("platform")) 87 list.add(Thread.ofPlatform().factory()); 88 if (value == null || value.equals("virtual")) 89 list.add(Thread.ofVirtual().factory()); 90 assertTrue(list.size() > 0, "No thread factories for tests"); 91 threadFactories = list; 92 } 93 94 @AfterAll 95 static void shutdown() { 96 scheduler.shutdown(); 97 } 98 99 private static Stream<ThreadFactory> factories() { 100 return threadFactories.stream(); 101 } 102 103 /** 104 * Test that fork creates virtual threads when no ThreadFactory is configured. 105 */ 106 @Test 107 void testForkCreatesVirtualThread() throws Exception { 108 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 109 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 110 for (int i = 0; i < 50; i++) { 111 // runnable 112 scope.fork(() -> { 113 threads.add(Thread.currentThread()); 114 }); 115 116 // callable 117 scope.fork(() -> { 118 threads.add(Thread.currentThread()); 119 return null; 120 }); 121 } 122 scope.join(); 123 } 124 assertEquals(100, threads.size()); 125 threads.forEach(t -> assertTrue(t.isVirtual())); 126 } 127 128 /** 129 * Test that fork create threads with the configured ThreadFactory. 130 */ 131 @ParameterizedTest 132 @MethodSource("factories") 133 void testForkUsesThreadFactory(ThreadFactory factory) throws Exception { 134 // TheadFactory that keeps reference to all threads it creates 135 class RecordingThreadFactory implements ThreadFactory { 136 final ThreadFactory delegate; 137 final Set<Thread> threads = ConcurrentHashMap.newKeySet(); 138 RecordingThreadFactory(ThreadFactory delegate) { 139 this.delegate = delegate; 140 } 141 @Override 142 public Thread newThread(Runnable task) { 143 Thread thread = delegate.newThread(task); 144 threads.add(thread); 145 return thread; 146 } 147 Set<Thread> threads() { 148 return threads; 149 } 150 } 151 var recordingThreadFactory = new RecordingThreadFactory(factory); 152 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 153 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 154 cf -> cf.withThreadFactory(recordingThreadFactory))) { 155 156 for (int i = 0; i < 50; i++) { 157 // runnable 158 scope.fork(() -> { 159 threads.add(Thread.currentThread()); 160 }); 161 162 // callable 163 scope.fork(() -> { 164 threads.add(Thread.currentThread()); 165 return null; 166 }); 167 } 168 scope.join(); 169 } 170 assertEquals(100, threads.size()); 171 assertEquals(recordingThreadFactory.threads(), threads); 172 } 173 174 /** 175 * Test fork method is owner confined. 176 */ 177 @ParameterizedTest 178 @MethodSource("factories") 179 void testForkConfined(ThreadFactory factory) throws Exception { 180 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 181 cf -> cf.withThreadFactory(factory))) { 182 183 // random thread cannot fork 184 try (var pool = Executors.newSingleThreadExecutor()) { 185 Future<Void> future = pool.submit(() -> { 186 assertThrows(WrongThreadException.class, () -> { 187 scope.fork(() -> null); 188 }); 189 return null; 190 }); 191 future.get(); 192 } 193 194 // subtask cannot fork 195 Subtask<Boolean> subtask = scope.fork(() -> { 196 assertThrows(WrongThreadException.class, () -> { 197 scope.fork(() -> null); 198 }); 199 return true; 200 }); 201 scope.join(); 202 assertTrue(subtask.get()); 203 } 204 } 205 206 /** 207 * Test fork after join, no subtasks forked before join. 208 */ 209 @ParameterizedTest 210 @MethodSource("factories") 211 void testForkAfterJoin1(ThreadFactory factory) throws Exception { 212 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 213 cf -> cf.withThreadFactory(factory))) { 214 scope.join(); 215 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 216 } 217 } 218 219 /** 220 * Test fork after join, subtasks forked before join. 221 */ 222 @ParameterizedTest 223 @MethodSource("factories") 224 void testForkAfterJoin2(ThreadFactory factory) throws Exception { 225 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 226 cf -> cf.withThreadFactory(factory))) { 227 scope.fork(() -> "foo"); 228 scope.join(); 229 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 230 } 231 } 232 233 /** 234 * Test fork after join throws. 235 */ 236 @ParameterizedTest 237 @MethodSource("factories") 238 void testForkAfterJoinThrows(ThreadFactory factory) throws Exception { 239 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 240 cf -> cf.withThreadFactory(factory))) { 241 var latch = new CountDownLatch(1); 242 var subtask1 = scope.fork(() -> { 243 latch.await(); 244 return "foo"; 245 }); 246 247 // join throws 248 Thread.currentThread().interrupt(); 249 assertThrows(InterruptedException.class, scope::join); 250 251 // fork should throw 252 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 253 } 254 } 255 256 /** 257 * Test fork after task scope is cancelled. This test uses a custom Joiner to 258 * cancel execution. 259 */ 260 @ParameterizedTest 261 @MethodSource("factories") 262 void testForkAfterCancel2(ThreadFactory factory) throws Exception { 263 var countingThreadFactory = new CountingThreadFactory(factory); 264 var testJoiner = new CancelAfterOneJoiner<String>(); 265 266 try (var scope = StructuredTaskScope.open(testJoiner, 267 cf -> cf.withThreadFactory(countingThreadFactory))) { 268 269 // fork subtask, the scope should be cancelled when the subtask completes 270 var subtask1 = scope.fork(() -> "foo"); 271 while (!scope.isCancelled()) { 272 Thread.sleep(20); 273 } 274 275 assertEquals(1, countingThreadFactory.threadCount()); 276 assertEquals(1, testJoiner.onForkCount()); 277 assertEquals(1, testJoiner.onCompleteCount()); 278 279 // fork second subtask, it should not run 280 var subtask2 = scope.fork(() -> "bar"); 281 282 // onFork should be invoked, newThread and onComplete should not be invoked 283 assertEquals(1, countingThreadFactory.threadCount()); 284 assertEquals(2, testJoiner.onForkCount()); 285 assertEquals(1, testJoiner.onCompleteCount()); 286 287 scope.join(); 288 289 assertEquals(1, countingThreadFactory.threadCount()); 290 assertEquals(2, testJoiner.onForkCount()); 291 assertEquals(1, testJoiner.onCompleteCount()); 292 assertEquals("foo", subtask1.get()); 293 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 294 } 295 } 296 297 /** 298 * Test fork after task scope is closed. 299 */ 300 @Test 301 void testForkAfterClose() { 302 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 303 scope.close(); 304 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null)); 305 } 306 } 307 308 /** 309 * Test fork with a ThreadFactory that rejects creating a thread. 310 */ 311 @Test 312 void testForkRejectedExecutionException() { 313 ThreadFactory factory = task -> null; 314 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 315 cf -> cf.withThreadFactory(factory))) { 316 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null)); 317 } 318 } 319 320 /** 321 * Test join with no subtasks. 322 */ 323 @Test 324 void testJoinWithNoSubtasks() throws Exception { 325 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 326 scope.join(); 327 } 328 } 329 330 /** 331 * Test join with a remaining subtask. 332 */ 333 @ParameterizedTest 334 @MethodSource("factories") 335 void testJoinWithRemainingSubtasks(ThreadFactory factory) throws Exception { 336 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 337 cf -> cf.withThreadFactory(factory))) { 338 Subtask<String> subtask = scope.fork(() -> { 339 Thread.sleep(Duration.ofMillis(100)); 340 return "foo"; 341 }); 342 scope.join(); 343 assertEquals("foo", subtask.get()); 344 } 345 } 346 347 /** 348 * Test join after join completed with a result. 349 */ 350 @Test 351 void testJoinAfterJoin1() throws Exception { 352 var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz")); 353 Joiner<Object, String> joiner = results::take; 354 try (var scope = StructuredTaskScope.open(joiner)) { 355 scope.fork(() -> "foo"); 356 assertEquals("foo", scope.join()); 357 358 // join already called 359 for (int i = 0 ; i < 3; i++) { 360 assertThrows(IllegalStateException.class, scope::join); 361 } 362 } 363 } 364 365 /** 366 * Test join after join completed with an exception. 367 */ 368 @Test 369 void testJoinAfterJoin2() throws Exception { 370 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { 371 scope.fork(() -> { throw new FooException(); }); 372 Throwable ex = assertThrows(FailedException.class, scope::join); 373 assertTrue(ex.getCause() instanceof FooException); 374 375 // join already called 376 for (int i = 0 ; i < 3; i++) { 377 assertThrows(IllegalStateException.class, scope::join); 378 } 379 } 380 } 381 382 /** 383 * Test join after join completed with a timeout. 384 */ 385 @Test 386 void testJoinAfterJoin3() throws Exception { 387 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), 388 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 389 // wait for scope to be cancelled by timeout 390 while (!scope.isCancelled()) { 391 Thread.sleep(20); 392 } 393 assertThrows(TimeoutException.class, scope::join); 394 395 // join already called 396 for (int i = 0 ; i < 3; i++) { 397 assertThrows(IllegalStateException.class, scope::join); 398 } 399 } 400 } 401 402 /** 403 * Test join method is owner confined. 404 */ 405 @ParameterizedTest 406 @MethodSource("factories") 407 void testJoinConfined(ThreadFactory factory) throws Exception { 408 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 409 cf -> cf.withThreadFactory(factory))) { 410 411 // random thread cannot join 412 try (var pool = Executors.newSingleThreadExecutor()) { 413 Future<Void> future = pool.submit(() -> { 414 assertThrows(WrongThreadException.class, scope::join); 415 return null; 416 }); 417 future.get(); 418 } 419 420 // subtask cannot join 421 Subtask<Boolean> subtask = scope.fork(() -> { 422 assertThrows(WrongThreadException.class, () -> { scope.join(); }); 423 return true; 424 }); 425 scope.join(); 426 assertTrue(subtask.get()); 427 } 428 } 429 430 /** 431 * Test join with interrupt status set. 432 */ 433 @ParameterizedTest 434 @MethodSource("factories") 435 void testInterruptJoin1(ThreadFactory factory) throws Exception { 436 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 437 cf -> cf.withThreadFactory(factory))) { 438 439 Subtask<String> subtask = scope.fork(() -> { 440 Thread.sleep(60_000); 441 return "foo"; 442 }); 443 444 // join should throw 445 Thread.currentThread().interrupt(); 446 try { 447 scope.join(); 448 fail("join did not throw"); 449 } catch (InterruptedException expected) { 450 assertFalse(Thread.interrupted()); // interrupt status should be cleared 451 } 452 } 453 } 454 455 /** 456 * Test interrupt of thread blocked in join. 457 */ 458 @ParameterizedTest 459 @MethodSource("factories") 460 void testInterruptJoin2(ThreadFactory factory) throws Exception { 461 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 462 cf -> cf.withThreadFactory(factory))) { 463 464 var latch = new CountDownLatch(1); 465 Subtask<String> subtask = scope.fork(() -> { 466 Thread.sleep(60_000); 467 return "foo"; 468 }); 469 470 // join should throw 471 scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.join"); 472 try { 473 scope.join(); 474 fail("join did not throw"); 475 } catch (InterruptedException expected) { 476 assertFalse(Thread.interrupted()); // interrupt status should be clear 477 } 478 } 479 } 480 481 /** 482 * Test join when scope is cancelled. 483 */ 484 @ParameterizedTest 485 @MethodSource("factories") 486 void testJoinWhenCancelled(ThreadFactory factory) throws Exception { 487 var countingThreadFactory = new CountingThreadFactory(factory); 488 var testJoiner = new CancelAfterOneJoiner<String>(); 489 490 try (var scope = StructuredTaskScope.open(testJoiner, 491 cf -> cf.withThreadFactory(countingThreadFactory))) { 492 493 // fork subtask, the scope should be cancelled when the subtask completes 494 var subtask1 = scope.fork(() -> "foo"); 495 while (!scope.isCancelled()) { 496 Thread.sleep(20); 497 } 498 499 // fork second subtask, it should not run 500 var subtask2 = scope.fork(() -> { 501 Thread.sleep(Duration.ofDays(1)); 502 return "bar"; 503 }); 504 505 scope.join(); 506 507 assertEquals("foo", subtask1.get()); 508 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 509 } 510 } 511 512 /** 513 * Test join after scope is closed. 514 */ 515 @Test 516 void testJoinAfterClose() throws Exception { 517 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 518 scope.close(); 519 assertThrows(IllegalStateException.class, () -> scope.join()); 520 } 521 } 522 523 /** 524 * Test join with timeout, subtasks finish before timeout expires. 525 */ 526 @ParameterizedTest 527 @MethodSource("factories") 528 void testJoinWithTimeout1(ThreadFactory factory) throws Exception { 529 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 530 cf -> cf.withThreadFactory(factory) 531 .withTimeout(Duration.ofDays(1)))) { 532 533 Subtask<String> subtask = scope.fork(() -> { 534 Thread.sleep(Duration.ofSeconds(1)); 535 return "foo"; 536 }); 537 538 scope.join(); 539 540 assertFalse(scope.isCancelled()); 541 assertEquals("foo", subtask.get()); 542 } 543 } 544 545 /** 546 * Test join with timeout, timeout expires before subtasks finish. 547 */ 548 @ParameterizedTest 549 @MethodSource("factories") 550 void testJoinWithTimeout2(ThreadFactory factory) throws Exception { 551 long startMillis = millisTime(); 552 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 553 cf -> cf.withThreadFactory(factory) 554 .withTimeout(Duration.ofSeconds(2)))) { 555 556 Subtask<Void> subtask = scope.fork(() -> { 557 Thread.sleep(Duration.ofDays(1)); 558 return null; 559 }); 560 561 assertThrows(TimeoutException.class, scope::join); 562 expectDuration(startMillis, /*min*/1900, /*max*/20_000); 563 564 assertTrue(scope.isCancelled()); 565 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 566 } 567 } 568 569 /** 570 * Test join with timeout that has already expired. 571 */ 572 @ParameterizedTest 573 @MethodSource("factories") 574 void testJoinWithTimeout3(ThreadFactory factory) throws Exception { 575 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 576 cf -> cf.withThreadFactory(factory) 577 .withTimeout(Duration.ofSeconds(-1)))) { 578 579 Subtask<Void> subtask = scope.fork(() -> { 580 Thread.sleep(Duration.ofDays(1)); 581 return null; 582 }); 583 584 assertThrows(TimeoutException.class, scope::join); 585 586 assertTrue(scope.isCancelled()); 587 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 588 } 589 } 590 591 /** 592 * Test that cancelling execution interrupts unfinished threads. This test uses 593 * a custom Joiner to cancel execution. 594 */ 595 @ParameterizedTest 596 @MethodSource("factories") 597 void testCancelInterruptsThreads2(ThreadFactory factory) throws Exception { 598 var testJoiner = new CancelAfterOneJoiner<String>(); 599 600 try (var scope = StructuredTaskScope.open(testJoiner, 601 cf -> cf.withThreadFactory(factory))) { 602 603 // fork subtask1 that runs for a long time 604 var started = new CountDownLatch(1); 605 var interrupted = new CountDownLatch(1); 606 var subtask1 = scope.fork(() -> { 607 started.countDown(); 608 try { 609 Thread.sleep(Duration.ofDays(1)); 610 } catch (InterruptedException e) { 611 interrupted.countDown(); 612 } 613 }); 614 started.await(); 615 616 // fork subtask2, the scope should be cancelled when the subtask completes 617 var subtask2 = scope.fork(() -> "bar"); 618 while (!scope.isCancelled()) { 619 Thread.sleep(20); 620 } 621 622 // subtask1 should be interrupted 623 interrupted.await(); 624 625 scope.join(); 626 assertEquals(Subtask.State.UNAVAILABLE, subtask1.state()); 627 assertEquals("bar", subtask2.get()); 628 } 629 } 630 631 /** 632 * Test that timeout interrupts unfinished threads. 633 */ 634 @ParameterizedTest 635 @MethodSource("factories") 636 void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception { 637 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 638 cf -> cf.withThreadFactory(factory) 639 .withTimeout(Duration.ofSeconds(2)))) { 640 641 var started = new AtomicBoolean(); 642 var interrupted = new CountDownLatch(1); 643 Subtask<Void> subtask = scope.fork(() -> { 644 started.set(true); 645 try { 646 Thread.sleep(Duration.ofDays(1)); 647 } catch (InterruptedException e) { 648 interrupted.countDown(); 649 } 650 return null; 651 }); 652 653 while (!scope.isCancelled()) { 654 Thread.sleep(50); 655 } 656 657 // if subtask started then it should be interrupted 658 if (started.get()) { 659 interrupted.await(); 660 } 661 662 assertThrows(TimeoutException.class, scope::join); 663 664 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 665 } 666 } 667 668 /** 669 * Test close without join, no subtasks forked. 670 */ 671 @Test 672 void testCloseWithoutJoin1() { 673 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 674 // do nothing 675 } 676 } 677 678 /** 679 * Test close without join, subtasks forked. 680 */ 681 @ParameterizedTest 682 @MethodSource("factories") 683 void testCloseWithoutJoin2(ThreadFactory factory) { 684 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 685 cf -> cf.withThreadFactory(factory))) { 686 Subtask<String> subtask = scope.fork(() -> { 687 Thread.sleep(Duration.ofDays(1)); 688 return null; 689 }); 690 691 // first call to close should throw 692 assertThrows(IllegalStateException.class, scope::close); 693 694 // subsequent calls to close should not throw 695 for (int i = 0; i < 3; i++) { 696 scope.close(); 697 } 698 699 // subtask result/exception not available 700 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 701 assertThrows(IllegalStateException.class, subtask::get); 702 assertThrows(IllegalStateException.class, subtask::exception); 703 } 704 } 705 706 /** 707 * Test close after join throws. Close should not throw as join attempted. 708 */ 709 @ParameterizedTest 710 @MethodSource("factories") 711 void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception { 712 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 713 cf -> cf.withThreadFactory(factory))) { 714 var subtask = scope.fork(() -> { 715 Thread.sleep(Duration.ofDays(1)); 716 return null; 717 }); 718 719 // join throws 720 Thread.currentThread().interrupt(); 721 assertThrows(InterruptedException.class, scope::join); 722 assertThrows(IllegalStateException.class, subtask::get); 723 724 } // close should not throw 725 } 726 727 /** 728 * Test close method is owner confined. 729 */ 730 @ParameterizedTest 731 @MethodSource("factories") 732 void testCloseConfined(ThreadFactory factory) throws Exception { 733 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 734 cf -> cf.withThreadFactory(factory))) { 735 736 // random thread cannot close scope 737 try (var pool = Executors.newCachedThreadPool(factory)) { 738 Future<Boolean> future = pool.submit(() -> { 739 assertThrows(WrongThreadException.class, scope::close); 740 return null; 741 }); 742 future.get(); 743 } 744 745 // subtask cannot close 746 Subtask<Boolean> subtask = scope.fork(() -> { 747 assertThrows(WrongThreadException.class, scope::close); 748 return true; 749 }); 750 scope.join(); 751 assertTrue(subtask.get()); 752 } 753 } 754 755 /** 756 * Test close with interrupt status set. 757 */ 758 @ParameterizedTest 759 @MethodSource("factories") 760 void testInterruptClose1(ThreadFactory factory) throws Exception { 761 var testJoiner = new CancelAfterOneJoiner<String>(); 762 try (var scope = StructuredTaskScope.open(testJoiner, 763 cf -> cf.withThreadFactory(factory))) { 764 765 // fork first subtask, a straggler as it continues after being interrupted 766 var started = new CountDownLatch(1); 767 var done = new AtomicBoolean(); 768 scope.fork(() -> { 769 started.countDown(); 770 try { 771 Thread.sleep(Duration.ofDays(1)); 772 } catch (InterruptedException e) { 773 // interrupted by cancel, expected 774 } 775 Thread.sleep(Duration.ofMillis(100)); // force close to wait 776 done.set(true); 777 return null; 778 }); 779 started.await(); 780 781 // fork second subtask, the scope should be cancelled when this subtask completes 782 scope.fork(() -> "bar"); 783 while (!scope.isCancelled()) { 784 Thread.sleep(20); 785 } 786 787 scope.join(); 788 789 // invoke close with interrupt status set 790 Thread.currentThread().interrupt(); 791 try { 792 scope.close(); 793 } finally { 794 assertTrue(Thread.interrupted()); // clear interrupt status 795 assertTrue(done.get()); 796 } 797 } 798 } 799 800 /** 801 * Test interrupting thread waiting in close. 802 */ 803 @ParameterizedTest 804 @MethodSource("factories") 805 void testInterruptClose2(ThreadFactory factory) throws Exception { 806 var testJoiner = new CancelAfterOneJoiner<String>(); 807 try (var scope = StructuredTaskScope.open(testJoiner, 808 cf -> cf.withThreadFactory(factory))) { 809 810 Thread mainThread = Thread.currentThread(); 811 812 // fork first subtask, a straggler as it continues after being interrupted 813 var started = new CountDownLatch(1); 814 var done = new AtomicBoolean(); 815 scope.fork(() -> { 816 started.countDown(); 817 try { 818 Thread.sleep(Duration.ofDays(1)); 819 } catch (InterruptedException e) { 820 // interrupted by cancel, expected 821 } 822 823 // interrupt main thread when it blocks in close 824 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScope.close"); 825 826 Thread.sleep(Duration.ofMillis(100)); // force close to wait 827 done.set(true); 828 return null; 829 }); 830 started.await(); 831 832 // fork second subtask, the scope should be cancelled when this subtask completes 833 scope.fork(() -> "bar"); 834 while (!scope.isCancelled()) { 835 Thread.sleep(20); 836 } 837 838 scope.join(); 839 840 // main thread will be interrupted while blocked in close 841 try { 842 scope.close(); 843 } finally { 844 assertTrue(Thread.interrupted()); // clear interrupt status 845 assertTrue(done.get()); 846 } 847 } 848 } 849 850 /** 851 * Test that closing an enclosing scope closes the thread flock of a nested scope. 852 */ 853 @Test 854 void testCloseThrowsStructureViolation() throws Exception { 855 try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll())) { 856 try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll())) { 857 858 // close enclosing scope 859 try { 860 scope1.close(); 861 fail("close did not throw"); 862 } catch (StructureViolationException expected) { } 863 864 // underlying flock should be closed 865 var executed = new AtomicBoolean(); 866 Subtask<?> subtask = scope2.fork(() -> executed.set(true)); 867 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 868 scope2.join(); 869 assertFalse(executed.get()); 870 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 871 } 872 } 873 } 874 875 /** 876 * Test that isCancelled returns true after close. 877 */ 878 @Test 879 void testIsCancelledAfterClose() throws Exception { 880 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 881 assertFalse(scope.isCancelled()); 882 scope.close(); 883 assertTrue(scope.isCancelled()); 884 } 885 } 886 887 /** 888 * Test Joiner.onFork throwing exception. 889 */ 890 @Test 891 void testOnForkThrows() throws Exception { 892 var joiner = new Joiner<String, Void>() { 893 @Override 894 public boolean onFork(Subtask<? extends String> subtask) { 895 throw new FooException(); 896 } 897 @Override 898 public Void result() { 899 return null; 900 } 901 }; 902 try (var scope = StructuredTaskScope.open(joiner)) { 903 assertThrows(FooException.class, () -> scope.fork(() -> "foo")); 904 } 905 } 906 907 /** 908 * Test Joiner.onFork returning true to cancel execution. 909 */ 910 @Test 911 void testOnForkCancelsExecution() throws Exception { 912 var joiner = new Joiner<String, Void>() { 913 @Override 914 public boolean onFork(Subtask<? extends String> subtask) { 915 return true; 916 } 917 @Override 918 public Void result() { 919 return null; 920 } 921 }; 922 try (var scope = StructuredTaskScope.open(joiner)) { 923 assertFalse(scope.isCancelled()); 924 scope.fork(() -> "foo"); 925 assertTrue(scope.isCancelled()); 926 scope.join(); 927 } 928 } 929 930 /** 931 * Test Joiner.onComplete returning true to cancel execution. 932 */ 933 @Test 934 void testOnCompleteCancelsExecution() throws Exception { 935 var joiner = new Joiner<String, Void>() { 936 @Override 937 public boolean onComplete(Subtask<? extends String> subtask) { 938 return true; 939 } 940 @Override 941 public Void result() { 942 return null; 943 } 944 }; 945 try (var scope = StructuredTaskScope.open(joiner)) { 946 assertFalse(scope.isCancelled()); 947 scope.fork(() -> "foo"); 948 while (!scope.isCancelled()) { 949 Thread.sleep(10); 950 } 951 scope.join(); 952 } 953 } 954 955 /** 956 * Test toString. 957 */ 958 @Test 959 void testToString() throws Exception { 960 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 961 cf -> cf.withName("duke"))) { 962 963 // open 964 assertTrue(scope.toString().contains("duke")); 965 966 // closed 967 scope.close(); 968 assertTrue(scope.toString().contains("duke")); 969 } 970 } 971 972 /** 973 * Test Subtask with task that completes successfully. 974 */ 975 @ParameterizedTest 976 @MethodSource("factories") 977 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception { 978 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 979 cf -> cf.withThreadFactory(factory))) { 980 981 Subtask<String> subtask = scope.fork(() -> "foo"); 982 983 // before join 984 assertThrows(IllegalStateException.class, subtask::get); 985 assertThrows(IllegalStateException.class, subtask::exception); 986 987 scope.join(); 988 989 // after join 990 assertEquals(Subtask.State.SUCCESS, subtask.state()); 991 assertEquals("foo", subtask.get()); 992 assertThrows(IllegalStateException.class, subtask::exception); 993 } 994 } 995 996 /** 997 * Test Subtask with task that fails. 998 */ 999 @ParameterizedTest 1000 @MethodSource("factories") 1001 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception { 1002 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1003 cf -> cf.withThreadFactory(factory))) { 1004 1005 Subtask<String> subtask = scope.fork(() -> { throw new FooException(); }); 1006 1007 // before join 1008 assertThrows(IllegalStateException.class, subtask::get); 1009 assertThrows(IllegalStateException.class, subtask::exception); 1010 1011 scope.join(); 1012 1013 // after join 1014 assertEquals(Subtask.State.FAILED, subtask.state()); 1015 assertThrows(IllegalStateException.class, subtask::get); 1016 assertTrue(subtask.exception() instanceof FooException); 1017 } 1018 } 1019 1020 /** 1021 * Test Subtask with a task that has not completed. 1022 */ 1023 @ParameterizedTest 1024 @MethodSource("factories") 1025 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception { 1026 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 1027 cf -> cf.withThreadFactory(factory))) { 1028 Subtask<Void> subtask = scope.fork(() -> { 1029 Thread.sleep(Duration.ofDays(1)); 1030 return null; 1031 }); 1032 1033 // before join 1034 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1035 assertThrows(IllegalStateException.class, subtask::get); 1036 assertThrows(IllegalStateException.class, subtask::exception); 1037 1038 // attempt join, join throws 1039 Thread.currentThread().interrupt(); 1040 assertThrows(InterruptedException.class, scope::join); 1041 1042 // after join 1043 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1044 assertThrows(IllegalStateException.class, subtask::get); 1045 assertThrows(IllegalStateException.class, subtask::exception); 1046 } 1047 } 1048 1049 /** 1050 * Test Subtask forked after execution cancelled. 1051 */ 1052 @ParameterizedTest 1053 @MethodSource("factories") 1054 void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception { 1055 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) { 1056 scope.fork(() -> "foo"); 1057 while (!scope.isCancelled()) { 1058 Thread.sleep(20); 1059 } 1060 1061 var subtask = scope.fork(() -> "foo"); 1062 1063 // before join 1064 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1065 assertThrows(IllegalStateException.class, subtask::get); 1066 assertThrows(IllegalStateException.class, subtask::exception); 1067 1068 scope.join(); 1069 1070 // after join 1071 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1072 assertThrows(IllegalStateException.class, subtask::get); 1073 assertThrows(IllegalStateException.class, subtask::exception); 1074 } 1075 } 1076 1077 /** 1078 * Test Subtask::toString. 1079 */ 1080 @Test 1081 void testSubtaskToString() throws Exception { 1082 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1083 var latch = new CountDownLatch(1); 1084 var subtask1 = scope.fork(() -> { 1085 latch.await(); 1086 return "foo"; 1087 }); 1088 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1089 1090 // subtask1 result is unavailable 1091 assertTrue(subtask1.toString().contains("Unavailable")); 1092 latch.countDown(); 1093 1094 scope.join(); 1095 1096 assertTrue(subtask1.toString().contains("Completed successfully")); 1097 assertTrue(subtask2.toString().contains("Failed")); 1098 } 1099 } 1100 1101 /** 1102 * Test Joiner.allSuccessfulOrThrow() with no subtasks. 1103 */ 1104 @Test 1105 void testAllSuccessfulOrThrow1() throws Throwable { 1106 try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) { 1107 var subtasks = scope.join().toList(); 1108 assertTrue(subtasks.isEmpty()); 1109 } 1110 } 1111 1112 /** 1113 * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully. 1114 */ 1115 @ParameterizedTest 1116 @MethodSource("factories") 1117 void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable { 1118 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1119 cf -> cf.withThreadFactory(factory))) { 1120 var subtask1 = scope.fork(() -> "foo"); 1121 var subtask2 = scope.fork(() -> "bar"); 1122 var subtasks = scope.join().toList(); 1123 assertEquals(List.of(subtask1, subtask2), subtasks); 1124 assertEquals("foo", subtask1.get()); 1125 assertEquals("bar", subtask2.get()); 1126 } 1127 } 1128 1129 /** 1130 * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and 1131 * a subtask that fails. 1132 */ 1133 @ParameterizedTest 1134 @MethodSource("factories") 1135 void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { 1136 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1137 cf -> cf.withThreadFactory(factory))) { 1138 scope.fork(() -> "foo"); 1139 scope.fork(() -> { throw new FooException(); }); 1140 try { 1141 scope.join(); 1142 } catch (FailedException e) { 1143 assertTrue(e.getCause() instanceof FooException); 1144 } 1145 } 1146 } 1147 1148 /** 1149 * Test Joiner.anySuccessfulResultOrThrow() with no subtasks. 1150 */ 1151 @Test 1152 void testAnySuccessfulResultOrThrow1() throws Exception { 1153 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { 1154 try { 1155 scope.join(); 1156 } catch (FailedException e) { 1157 assertTrue(e.getCause() instanceof NoSuchElementException); 1158 } 1159 } 1160 } 1161 1162 /** 1163 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully. 1164 */ 1165 @ParameterizedTest 1166 @MethodSource("factories") 1167 void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception { 1168 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1169 cf -> cf.withThreadFactory(factory))) { 1170 scope.fork(() -> "foo"); 1171 String result = scope.join(); 1172 assertEquals("foo", result); 1173 } 1174 } 1175 1176 /** 1177 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully 1178 * with a null result. 1179 */ 1180 @ParameterizedTest 1181 @MethodSource("factories") 1182 void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception { 1183 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1184 cf -> cf.withThreadFactory(factory))) { 1185 scope.fork(() -> null); 1186 String result = scope.join(); 1187 assertNull(result); 1188 } 1189 } 1190 1191 /** 1192 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully 1193 * and a subtask that fails. 1194 */ 1195 @ParameterizedTest 1196 @MethodSource("factories") 1197 void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception { 1198 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1199 cf -> cf.withThreadFactory(factory))) { 1200 scope.fork(() -> "foo"); 1201 scope.fork(() -> { throw new FooException(); }); 1202 String first = scope.join(); 1203 assertEquals("foo", first); 1204 } 1205 } 1206 1207 /** 1208 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails. 1209 */ 1210 @ParameterizedTest 1211 @MethodSource("factories") 1212 void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception { 1213 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), 1214 cf -> cf.withThreadFactory(factory))) { 1215 scope.fork(() -> { throw new FooException(); }); 1216 Throwable ex = assertThrows(FailedException.class, scope::join); 1217 assertTrue(ex.getCause() instanceof FooException); 1218 } 1219 } 1220 1221 /** 1222 * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks. 1223 */ 1224 @Test 1225 void testAwaitSuccessfulOrThrow1() throws Throwable { 1226 try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) { 1227 var result = scope.join(); 1228 assertNull(result); 1229 } 1230 } 1231 1232 /** 1233 * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully. 1234 */ 1235 @ParameterizedTest 1236 @MethodSource("factories") 1237 void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable { 1238 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1239 cf -> cf.withThreadFactory(factory))) { 1240 var subtask1 = scope.fork(() -> "foo"); 1241 var subtask2 = scope.fork(() -> "bar"); 1242 var result = scope.join(); 1243 assertNull(result); 1244 assertEquals("foo", subtask1.get()); 1245 assertEquals("bar", subtask2.get()); 1246 } 1247 } 1248 1249 /** 1250 * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and 1251 * a subtask that fails. 1252 */ 1253 @ParameterizedTest 1254 @MethodSource("factories") 1255 void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { 1256 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1257 cf -> cf.withThreadFactory(factory))) { 1258 scope.fork(() -> "foo"); 1259 scope.fork(() -> { throw new FooException(); }); 1260 try { 1261 scope.join(); 1262 } catch (FailedException e) { 1263 assertTrue(e.getCause() instanceof FooException); 1264 } 1265 } 1266 } 1267 1268 /** 1269 * Test Joiner.awaitAll() with no subtasks. 1270 */ 1271 @Test 1272 void testAwaitAll1() throws Throwable { 1273 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1274 var result = scope.join(); 1275 assertNull(result); 1276 } 1277 } 1278 1279 /** 1280 * Test Joiner.awaitAll() with subtasks that complete successfully. 1281 */ 1282 @ParameterizedTest 1283 @MethodSource("factories") 1284 void testAwaitAll2(ThreadFactory factory) throws Throwable { 1285 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1286 cf -> cf.withThreadFactory(factory))) { 1287 var subtask1 = scope.fork(() -> "foo"); 1288 var subtask2 = scope.fork(() -> "bar"); 1289 var result = scope.join(); 1290 assertNull(result); 1291 assertEquals("foo", subtask1.get()); 1292 assertEquals("bar", subtask2.get()); 1293 } 1294 } 1295 1296 /** 1297 * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask 1298 * that fails. 1299 */ 1300 @ParameterizedTest 1301 @MethodSource("factories") 1302 void testAwaitAll3(ThreadFactory factory) throws Throwable { 1303 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1304 cf -> cf.withThreadFactory(factory))) { 1305 var subtask1 = scope.fork(() -> "foo"); 1306 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1307 var result = scope.join(); 1308 assertNull(result); 1309 assertEquals("foo", subtask1.get()); 1310 assertTrue(subtask2.exception() instanceof FooException); 1311 } 1312 } 1313 1314 /** 1315 * Test Joiner.allUntil(Predicate) with no subtasks. 1316 */ 1317 @Test 1318 void testAllUntil1() throws Throwable { 1319 try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) { 1320 var subtasks = scope.join(); 1321 assertEquals(0, subtasks.count()); 1322 } 1323 } 1324 1325 /** 1326 * Test Joiner.allUntil(Predicate) with no cancellation. 1327 */ 1328 @ParameterizedTest 1329 @MethodSource("factories") 1330 void testAllUntil2(ThreadFactory factory) throws Exception { 1331 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false), 1332 cf -> cf.withThreadFactory(factory))) { 1333 1334 var subtask1 = scope.fork(() -> "foo"); 1335 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1336 1337 var subtasks = scope.join().toList(); 1338 assertEquals(2, subtasks.size()); 1339 1340 assertTrue(subtasks.get(0) == subtask1); 1341 assertTrue(subtasks.get(1) == subtask2); 1342 assertEquals("foo", subtask1.get()); 1343 assertTrue(subtask2.exception() instanceof FooException); 1344 } 1345 } 1346 1347 /** 1348 * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes. 1349 */ 1350 @ParameterizedTest 1351 @MethodSource("factories") 1352 void testAllUntil3(ThreadFactory factory) throws Exception { 1353 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true), 1354 cf -> cf.withThreadFactory(factory))) { 1355 1356 var subtask1 = scope.fork(() -> "foo"); 1357 var subtask2 = scope.fork(() -> { 1358 Thread.sleep(Duration.ofDays(1)); 1359 return "bar"; 1360 }); 1361 1362 var subtasks = scope.join().toList(); 1363 1364 assertEquals(2, subtasks.size()); 1365 assertTrue(subtasks.get(0) == subtask1); 1366 assertTrue(subtasks.get(1) == subtask2); 1367 assertEquals("foo", subtask1.get()); 1368 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1369 } 1370 } 1371 1372 /** 1373 * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete. 1374 */ 1375 @ParameterizedTest 1376 @MethodSource("factories") 1377 void testAllUntil4(ThreadFactory factory) throws Exception { 1378 1379 // cancel execution after two or more failures 1380 class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> { 1381 final AtomicInteger failedCount = new AtomicInteger(); 1382 @Override 1383 public boolean test(Subtask<? extends T> subtask) { 1384 return subtask.state() == Subtask.State.FAILED 1385 && failedCount.incrementAndGet() >= 2; 1386 } 1387 } 1388 var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>()); 1389 1390 try (var scope = StructuredTaskScope.open(joiner)) { 1391 int forkCount = 0; 1392 1393 // fork subtasks until execution cancelled 1394 while (!scope.isCancelled()) { 1395 scope.fork(() -> "foo"); 1396 scope.fork(() -> { throw new FooException(); }); 1397 forkCount += 2; 1398 Thread.sleep(Duration.ofMillis(10)); 1399 } 1400 1401 var subtasks = scope.join().toList(); 1402 assertEquals(forkCount, subtasks.size()); 1403 1404 long failedCount = subtasks.stream() 1405 .filter(s -> s.state() == Subtask.State.FAILED) 1406 .count(); 1407 assertTrue(failedCount >= 2); 1408 } 1409 } 1410 1411 /** 1412 * Test Config equals/hashCode/toString 1413 */ 1414 @Test 1415 void testConfigMethods() throws Exception { 1416 Function<Config, Config> testConfig = cf -> { 1417 var name = "duke"; 1418 var threadFactory = Thread.ofPlatform().factory(); 1419 var timeout = Duration.ofSeconds(10); 1420 1421 assertEquals(cf, cf); 1422 assertEquals(cf.withName(name), cf.withName(name)); 1423 assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory)); 1424 assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout)); 1425 1426 assertNotEquals(cf, cf.withName(name)); 1427 assertNotEquals(cf, cf.withThreadFactory(threadFactory)); 1428 assertNotEquals(cf, cf.withTimeout(timeout)); 1429 1430 assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode()); 1431 assertEquals(cf.withThreadFactory(threadFactory).hashCode(), 1432 cf.withThreadFactory(threadFactory).hashCode()); 1433 assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode()); 1434 1435 assertTrue(cf.withName(name).toString().contains(name)); 1436 assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString())); 1437 assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString())); 1438 1439 return cf; 1440 }; 1441 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) { 1442 // do nothing 1443 } 1444 } 1445 1446 /** 1447 * Test Joiner default methods. 1448 */ 1449 @Test 1450 void testJoinerDefaultMethods() throws Exception { 1451 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) { 1452 1453 // need subtasks to test default methods 1454 var subtask1 = scope.fork(() -> "foo"); 1455 while (!scope.isCancelled()) { 1456 Thread.sleep(20); 1457 } 1458 var subtask2 = scope.fork(() -> "bar"); 1459 scope.join(); 1460 1461 assertEquals(Subtask.State.SUCCESS, subtask1.state()); 1462 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1463 1464 // Joiner that does not override default methods 1465 Joiner<Object, Void> joiner = () -> null; 1466 assertThrows(NullPointerException.class, () -> joiner.onFork(null)); 1467 assertThrows(NullPointerException.class, () -> joiner.onComplete(null)); 1468 assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1)); 1469 assertFalse(joiner.onFork(subtask2)); 1470 assertFalse(joiner.onComplete(subtask1)); 1471 assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2)); 1472 } 1473 } 1474 1475 /** 1476 * Test for NullPointerException. 1477 */ 1478 @Test 1479 void testNulls() throws Exception { 1480 assertThrows(NullPointerException.class, 1481 () -> StructuredTaskScope.open(null)); 1482 assertThrows(NullPointerException.class, 1483 () -> StructuredTaskScope.open(null, cf -> cf)); 1484 assertThrows(NullPointerException.class, 1485 () -> StructuredTaskScope.open(Joiner.awaitAll(), null)); 1486 assertThrows(NullPointerException.class, 1487 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> null)); 1488 1489 assertThrows(NullPointerException.class, () -> Joiner.allUntil(null)); 1490 1491 // fork 1492 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1493 assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null)); 1494 assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null)); 1495 } 1496 1497 // withXXX 1498 assertThrows(NullPointerException.class, 1499 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null))); 1500 assertThrows(NullPointerException.class, 1501 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null))); 1502 assertThrows(NullPointerException.class, 1503 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null))); 1504 1505 // Joiner.onFork/onComplete 1506 assertThrows(NullPointerException.class, 1507 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null)); 1508 assertThrows(NullPointerException.class, 1509 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null)); 1510 assertThrows(NullPointerException.class, 1511 () -> Joiner.awaitAll().onFork(null)); 1512 assertThrows(NullPointerException.class, 1513 () -> Joiner.awaitAll().onComplete(null)); 1514 assertThrows(NullPointerException.class, 1515 () -> Joiner.allSuccessfulOrThrow().onFork(null)); 1516 assertThrows(NullPointerException.class, 1517 () -> Joiner.allSuccessfulOrThrow().onComplete(null)); 1518 assertThrows(NullPointerException.class, 1519 () -> Joiner.anySuccessfulResultOrThrow().onFork(null)); 1520 assertThrows(NullPointerException.class, 1521 () -> Joiner.anySuccessfulResultOrThrow().onComplete(null)); 1522 } 1523 1524 /** 1525 * ThreadFactory that counts usage. 1526 */ 1527 private static class CountingThreadFactory implements ThreadFactory { 1528 final ThreadFactory delegate; 1529 final AtomicInteger threadCount = new AtomicInteger(); 1530 CountingThreadFactory(ThreadFactory delegate) { 1531 this.delegate = delegate; 1532 } 1533 @Override 1534 public Thread newThread(Runnable task) { 1535 threadCount.incrementAndGet(); 1536 return delegate.newThread(task); 1537 } 1538 int threadCount() { 1539 return threadCount.get(); 1540 } 1541 } 1542 1543 /** 1544 * A joiner that counts that counts the number of subtasks that are forked and the 1545 * number of subtasks that complete. 1546 */ 1547 private static class CountingJoiner<T> implements Joiner<T, Void> { 1548 final AtomicInteger onForkCount = new AtomicInteger(); 1549 final AtomicInteger onCompleteCount = new AtomicInteger(); 1550 @Override 1551 public boolean onFork(Subtask<? extends T> subtask) { 1552 onForkCount.incrementAndGet(); 1553 return false; 1554 } 1555 @Override 1556 public boolean onComplete(Subtask<? extends T> subtask) { 1557 onCompleteCount.incrementAndGet(); 1558 return false; 1559 } 1560 @Override 1561 public Void result() { 1562 return null; 1563 } 1564 int onForkCount() { 1565 return onForkCount.get(); 1566 } 1567 int onCompleteCount() { 1568 return onCompleteCount.get(); 1569 } 1570 } 1571 1572 /** 1573 * A joiner that cancels execution when a subtask completes. It also keeps a count 1574 * of the number of subtasks that are forked and the number of subtasks that complete. 1575 */ 1576 private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> { 1577 final AtomicInteger onForkCount = new AtomicInteger(); 1578 final AtomicInteger onCompleteCount = new AtomicInteger(); 1579 @Override 1580 public boolean onFork(Subtask<? extends T> subtask) { 1581 onForkCount.incrementAndGet(); 1582 return false; 1583 } 1584 @Override 1585 public boolean onComplete(Subtask<? extends T> subtask) { 1586 onCompleteCount.incrementAndGet(); 1587 return true; 1588 } 1589 @Override 1590 public Void result() { 1591 return null; 1592 } 1593 int onForkCount() { 1594 return onForkCount.get(); 1595 } 1596 int onCompleteCount() { 1597 return onCompleteCount.get(); 1598 } 1599 } 1600 1601 /** 1602 * A runtime exception for tests. 1603 */ 1604 private static class FooException extends RuntimeException { 1605 FooException() { } 1606 FooException(Throwable cause) { super(cause); } 1607 } 1608 1609 /** 1610 * Returns the current time in milliseconds. 1611 */ 1612 private long millisTime() { 1613 long now = System.nanoTime(); 1614 return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS); 1615 } 1616 1617 /** 1618 * Check the duration of a task 1619 * @param start start time, in milliseconds 1620 * @param min minimum expected duration, in milliseconds 1621 * @param max maximum expected duration, in milliseconds 1622 * @return the duration (now - start), in milliseconds 1623 */ 1624 private long expectDuration(long start, long min, long max) { 1625 long duration = millisTime() - start; 1626 assertTrue(duration >= min, 1627 "Duration " + duration + "ms, expected >= " + min + "ms"); 1628 assertTrue(duration <= max, 1629 "Duration " + duration + "ms, expected <= " + max + "ms"); 1630 return duration; 1631 } 1632 1633 /** 1634 * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}". 1635 * {@code c} is the fully qualified class name and {@code m} is the method name. 1636 */ 1637 private void interruptThreadAt(Thread target, String location) throws InterruptedException { 1638 int index = location.lastIndexOf('.'); 1639 String className = location.substring(0, index); 1640 String methodName = location.substring(index + 1); 1641 1642 boolean found = false; 1643 while (!found) { 1644 Thread.State state = target.getState(); 1645 assertTrue(state != TERMINATED); 1646 if ((state == WAITING || state == TIMED_WAITING) 1647 && contains(target.getStackTrace(), className, methodName)) { 1648 found = true; 1649 } else { 1650 Thread.sleep(20); 1651 } 1652 } 1653 target.interrupt(); 1654 } 1655 1656 /** 1657 * Schedules the current thread to be interrupted when it waits (timed or untimed) 1658 * at the given location. 1659 */ 1660 private void scheduleInterruptAt(String location) { 1661 Thread target = Thread.currentThread(); 1662 scheduler.submit(() -> { 1663 interruptThreadAt(target, location); 1664 return null; 1665 }); 1666 } 1667 1668 /** 1669 * Returns true if the given stack trace contains an element for the given class 1670 * and method name. 1671 */ 1672 private boolean contains(StackTraceElement[] stack, String className, String methodName) { 1673 return Arrays.stream(stack) 1674 .anyMatch(e -> className.equals(e.getClassName()) 1675 && methodName.equals(e.getMethodName())); 1676 } 1677 }