1 /* 2 * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 
22 */ 23 24 /* 25 * @test id=platform 26 * @bug 8284199 8296779 8306647 27 * @summary Basic tests for StructuredTaskScope 28 * @enablePreview 29 * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest 30 */ 31 32 /* 33 * @test id=virtual 34 * @enablePreview 35 * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest 36 */ 37 38 import java.time.Duration; 39 import java.io.IOException; 40 import java.time.Instant; 41 import java.util.Arrays; 42 import java.util.ArrayList; 43 import java.util.List; 44 import java.util.Set; 45 import java.util.concurrent.Callable; 46 import java.util.concurrent.ConcurrentHashMap; 47 import java.util.concurrent.CountDownLatch; 48 import java.util.concurrent.Executors; 49 import java.util.concurrent.ExecutionException; 50 import java.util.concurrent.Future; 51 import java.util.concurrent.ThreadFactory; 52 import java.util.concurrent.TimeoutException; 53 import java.util.concurrent.TimeUnit; 54 import java.util.concurrent.RejectedExecutionException; 55 import java.util.concurrent.ScheduledExecutorService; 56 import java.util.concurrent.StructuredTaskScope; 57 import java.util.concurrent.StructuredTaskScope.Subtask; 58 import java.util.concurrent.StructuredTaskScope.ShutdownOnSuccess; 59 import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure; 60 import java.util.concurrent.StructureViolationException; 61 import java.util.concurrent.atomic.AtomicBoolean; 62 import java.util.concurrent.atomic.AtomicInteger; 63 import java.util.concurrent.atomic.AtomicReference; 64 import java.util.function.Supplier; 65 import java.util.stream.Stream; 66 import static java.lang.Thread.State.*; 67 68 import org.junit.jupiter.api.Test; 69 import org.junit.jupiter.api.BeforeAll; 70 import org.junit.jupiter.api.AfterAll; 71 import org.junit.jupiter.params.ParameterizedTest; 72 import org.junit.jupiter.params.provider.MethodSource; 73 import static org.junit.jupiter.api.Assertions.*; 74 75 class StructuredTaskScopeTest { 76 
private static ScheduledExecutorService scheduler; 77 private static List<ThreadFactory> threadFactories; 78 79 @BeforeAll 80 static void setup() throws Exception { 81 scheduler = Executors.newSingleThreadScheduledExecutor(); 82 83 // thread factories 84 String value = System.getProperty("threadFactory"); 85 List<ThreadFactory> list = new ArrayList<>(); 86 if (value == null || value.equals("platform")) 87 list.add(Thread.ofPlatform().factory()); 88 if (value == null || value.equals("virtual")) 89 list.add(Thread.ofVirtual().factory()); 90 assertTrue(list.size() > 0, "No thread factories for tests"); 91 threadFactories = list; 92 } 93 94 @AfterAll 95 static void shutdown() { 96 scheduler.shutdown(); 97 } 98 99 private static Stream<ThreadFactory> factories() { 100 return threadFactories.stream(); 101 } 102 103 /** 104 * Test that fork creates a new thread for each task. 105 */ 106 @ParameterizedTest 107 @MethodSource("factories") 108 void testForkCreatesThread(ThreadFactory factory) throws Exception { 109 Set<Long> tids = ConcurrentHashMap.newKeySet(); 110 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 111 for (int i = 0; i < 100; i++) { 112 scope.fork(() -> { 113 tids.add(Thread.currentThread().threadId()); 114 return null; 115 }); 116 } 117 scope.join(); 118 } 119 assertEquals(100, tids.size()); 120 } 121 122 /** 123 * Test that fork creates a new virtual thread for each task. 124 */ 125 @Test 126 void testForkCreateVirtualThread() throws Exception { 127 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 128 try (var scope = new StructuredTaskScope<Object>()) { 129 for (int i = 0; i < 100; i++) { 130 scope.fork(() -> { 131 threads.add(Thread.currentThread()); 132 return null; 133 }); 134 } 135 scope.join(); 136 } 137 assertEquals(100, threads.size()); 138 threads.forEach(t -> assertTrue(t.isVirtual())); 139 } 140 141 /** 142 * Test that fork creates a new thread with the given thread factory. 
143 */ 144 @ParameterizedTest 145 @MethodSource("factories") 146 void testForkUsesFactory(ThreadFactory factory) throws Exception { 147 var count = new AtomicInteger(); 148 ThreadFactory countingFactory = task -> { 149 count.incrementAndGet(); 150 return factory.newThread(task); 151 }; 152 try (var scope = new StructuredTaskScope<Object>(null, countingFactory)) { 153 for (int i = 0; i < 100; i++) { 154 scope.fork(() -> null); 155 } 156 scope.join(); 157 } 158 assertEquals(100, count.get()); 159 } 160 161 /** 162 * Test fork is confined to threads in the scope "tree". 163 */ 164 @ParameterizedTest 165 @MethodSource("factories") 166 void testForkConfined(ThreadFactory factory) throws Exception { 167 try (var scope1 = new StructuredTaskScope<Boolean>(); 168 var scope2 = new StructuredTaskScope<Boolean>()) { 169 170 // thread in scope1 cannot fork thread in scope2 171 Subtask<Boolean> subtask1 = scope1.fork(() -> { 172 assertThrows(WrongThreadException.class, () -> { 173 scope2.fork(() -> null); 174 }); 175 return true; 176 }); 177 178 // thread in scope2 can fork thread in scope1 179 Subtask<Boolean> subtask2 = scope2.fork(() -> { 180 scope1.fork(() -> null); 181 return true; 182 }); 183 184 scope2.join(); 185 scope1.join(); 186 187 assertTrue(subtask1.get()); 188 assertTrue(subtask2.get()); 189 190 // random thread cannot fork 191 try (var pool = Executors.newSingleThreadExecutor()) { 192 Future<Void> future = pool.submit(() -> { 193 assertThrows(WrongThreadException.class, () -> { 194 scope1.fork(() -> null); 195 }); 196 assertThrows(WrongThreadException.class, () -> { 197 scope2.fork(() -> null); 198 }); 199 return null; 200 }); 201 future.get(); 202 } 203 } 204 } 205 206 /** 207 * Test fork after join completes. 
208 */ 209 @ParameterizedTest 210 @MethodSource("factories") 211 void testForkAfterJoin(ThreadFactory factory) throws Exception { 212 try (var scope = new StructuredTaskScope<String>(null, factory)) { 213 // round 1 214 var subtask1 = scope.fork(() -> "foo"); 215 assertThrows(IllegalStateException.class, subtask1::get); 216 scope.join(); 217 assertEquals("foo", subtask1.get()); 218 219 // round 2 220 var subtask2 = scope.fork(() -> "bar"); 221 assertEquals("foo", subtask1.get()); 222 assertThrows(IllegalStateException.class, subtask2::get); 223 scope.join(); 224 assertEquals("foo", subtask1.get()); 225 assertEquals("bar", subtask2.get()); 226 227 // round 3 228 var subtask3 = scope.fork(() -> "baz"); 229 assertEquals("foo", subtask1.get()); 230 assertEquals("bar", subtask2.get()); 231 assertThrows(IllegalStateException.class, subtask3::get); 232 scope.join(); 233 assertEquals("foo", subtask1.get()); 234 assertEquals("bar", subtask2.get()); 235 assertEquals("baz", subtask3.get()); 236 } 237 } 238 239 /** 240 * Test fork after join throws. 241 */ 242 @ParameterizedTest 243 @MethodSource("factories") 244 void testForkAfterJoinThrows(ThreadFactory factory) throws Exception { 245 try (var scope = new StructuredTaskScope<String>(null, factory)) { 246 var latch = new CountDownLatch(1); 247 var subtask1 = scope.fork(() -> { 248 latch.await(); 249 return "foo"; 250 }); 251 252 // join throws 253 Thread.currentThread().interrupt(); 254 assertThrows(InterruptedException.class, scope::join); 255 256 // allow subtask1 to finish 257 latch.countDown(); 258 259 // continue to fork 260 var subtask2 = scope.fork(() -> "bar"); 261 assertThrows(IllegalStateException.class, subtask1::get); 262 assertThrows(IllegalStateException.class, subtask2::get); 263 scope.join(); 264 assertEquals("foo", subtask1.get()); 265 assertEquals("bar", subtask2.get()); 266 } 267 } 268 269 /** 270 * Test fork after scope is shutdown. 
271 */ 272 @ParameterizedTest 273 @MethodSource("factories") 274 void testForkAfterShutdown(ThreadFactory factory) throws Exception { 275 var executed = new AtomicBoolean(); 276 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 277 scope.shutdown(); 278 Subtask<String> subtask = scope.fork(() -> { 279 executed.set(true); 280 return null; 281 }); 282 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 283 assertThrows(IllegalStateException.class, subtask::get); 284 assertThrows(IllegalStateException.class, subtask::exception); 285 } 286 assertFalse(executed.get()); 287 } 288 289 /** 290 * Test fork after scope is closed. 291 */ 292 @ParameterizedTest 293 @MethodSource("factories") 294 void testForkAfterClose(ThreadFactory factory) throws Exception { 295 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 296 scope.close(); 297 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null)); 298 } 299 } 300 301 /** 302 * Test fork when the thread factory rejects creating a thread. 303 */ 304 @Test 305 void testForkRejectedExecutionException() throws Exception { 306 ThreadFactory factory = task -> null; 307 try (var scope = new StructuredTaskScope(null, factory)) { 308 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null)); 309 scope.join(); 310 } 311 } 312 313 /** 314 * Test join with no subtasks. 315 */ 316 @Test 317 void testJoinWithNoSubtasks() throws Exception { 318 try (var scope = new StructuredTaskScope()) { 319 scope.join(); 320 } 321 } 322 323 /** 324 * Test join with unfinished subtasks. 
325 */ 326 @ParameterizedTest 327 @MethodSource("factories") 328 void testJoinWithSubtasks(ThreadFactory factory) throws Exception { 329 try (var scope = new StructuredTaskScope(null, factory)) { 330 Subtask<String> subtask = scope.fork(() -> { 331 Thread.sleep(Duration.ofMillis(50)); 332 return "foo"; 333 }); 334 scope.join(); 335 assertEquals("foo", subtask.get()); 336 } 337 } 338 339 /** 340 * Test join is owner confined. 341 */ 342 @ParameterizedTest 343 @MethodSource("factories") 344 void testJoinConfined(ThreadFactory factory) throws Exception { 345 try (var scope = new StructuredTaskScope<Boolean>()) { 346 347 // thread in scope cannot join 348 Subtask<Boolean> subtask = scope.fork(() -> { 349 assertThrows(WrongThreadException.class, () -> { scope.join(); }); 350 return true; 351 }); 352 353 scope.join(); 354 355 assertTrue(subtask.get()); 356 357 // random thread cannot join 358 try (var pool = Executors.newSingleThreadExecutor()) { 359 Future<Void> future = pool.submit(() -> { 360 assertThrows(WrongThreadException.class, scope::join); 361 return null; 362 }); 363 future.get(); 364 } 365 } 366 } 367 368 /** 369 * Test join with interrupt status set. 370 */ 371 @ParameterizedTest 372 @MethodSource("factories") 373 void testInterruptJoin1(ThreadFactory factory) throws Exception { 374 try (var scope = new StructuredTaskScope(null, factory)) { 375 var latch = new CountDownLatch(1); 376 377 Subtask<String> subtask = scope.fork(() -> { 378 latch.await(); 379 return "foo"; 380 }); 381 382 // join should throw 383 Thread.currentThread().interrupt(); 384 try { 385 scope.join(); 386 fail("join did not throw"); 387 } catch (InterruptedException expected) { 388 assertFalse(Thread.interrupted()); // interrupt status should be clear 389 } finally { 390 // let task continue 391 latch.countDown(); 392 } 393 394 // join should complete 395 scope.join(); 396 assertEquals("foo", subtask.get()); 397 } 398 } 399 400 /** 401 * Test interrupt of thread blocked in join. 
402 */ 403 @ParameterizedTest 404 @MethodSource("factories") 405 void testInterruptJoin2(ThreadFactory factory) throws Exception { 406 try (var scope = new StructuredTaskScope(null, factory)) { 407 var latch = new CountDownLatch(1); 408 Subtask<String> subtask = scope.fork(() -> { 409 latch.await(); 410 return "foo"; 411 }); 412 413 // join should throw 414 scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.join"); 415 try { 416 scope.join(); 417 fail("join did not throw"); 418 } catch (InterruptedException expected) { 419 assertFalse(Thread.interrupted()); // interrupt status should be clear 420 } finally { 421 // let task continue 422 latch.countDown(); 423 } 424 425 // join should complete 426 scope.join(); 427 assertEquals("foo", subtask.get()); 428 } 429 } 430 431 /** 432 * Test join when scope is shutdown. 433 */ 434 @ParameterizedTest 435 @MethodSource("factories") 436 void testJoinWithShutdown1(ThreadFactory factory) throws Exception { 437 try (var scope = new StructuredTaskScope<String>(null, factory)) { 438 var interrupted = new CountDownLatch(1); 439 var finish = new CountDownLatch(1); 440 441 Subtask<String> subtask = scope.fork(() -> { 442 try { 443 Thread.sleep(Duration.ofDays(1)); 444 } catch (InterruptedException e) { 445 interrupted.countDown(); 446 } 447 finish.await(); 448 return "foo"; 449 }); 450 451 scope.shutdown(); // should interrupt task 452 453 interrupted.await(); 454 455 scope.join(); 456 457 // signal task to finish 458 finish.countDown(); 459 } 460 } 461 462 /** 463 * Test shutdown when owner is blocked in join. 464 */ 465 @ParameterizedTest 466 @MethodSource("factories") 467 void testJoinWithShutdown2(ThreadFactory factory) throws Exception { 468 class MyScope<T> extends StructuredTaskScope<T> { 469 MyScope(ThreadFactory factory) { 470 super(null, factory); 471 } 472 @Override 473 protected void handleComplete(Subtask<? 
extends T> subtask) { 474 shutdown(); 475 } 476 } 477 478 try (var scope = new MyScope<String>(factory)) { 479 Subtask<String> subtask1 = scope.fork(() -> { 480 Thread.sleep(Duration.ofMillis(50)); 481 return "foo"; 482 }); 483 Subtask<String> subtask2 = scope.fork(() -> { 484 Thread.sleep(Duration.ofDays(1)); 485 return "bar"; 486 }); 487 488 // join should wakeup when shutdown is called 489 scope.join(); 490 491 // task1 should have completed successfully 492 assertEquals(Subtask.State.SUCCESS, subtask1.state()); 493 assertEquals("foo", subtask1.get()); 494 assertThrows(IllegalStateException.class, subtask1::exception); 495 496 // task2 result/exception not available 497 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 498 assertThrows(IllegalStateException.class, subtask2::get); 499 assertThrows(IllegalStateException.class, subtask2::exception); 500 } 501 } 502 503 /** 504 * Test join after scope is closed. 505 */ 506 @Test 507 void testJoinAfterClose() throws Exception { 508 try (var scope = new StructuredTaskScope()) { 509 scope.join(); 510 scope.close(); 511 assertThrows(IllegalStateException.class, () -> scope.join()); 512 assertThrows(IllegalStateException.class, () -> scope.joinUntil(Instant.now())); 513 } 514 } 515 516 /** 517 * Test joinUntil, subtasks finish before deadline expires. 518 */ 519 @ParameterizedTest 520 @MethodSource("factories") 521 void testJoinUntil1(ThreadFactory factory) throws Exception { 522 try (var scope = new StructuredTaskScope<String>(null, factory)) { 523 Subtask<String> subtask = scope.fork(() -> { 524 try { 525 Thread.sleep(Duration.ofSeconds(2)); 526 } catch (InterruptedException e) { } 527 return "foo"; 528 }); 529 530 long startMillis = millisTime(); 531 scope.joinUntil(Instant.now().plusSeconds(30)); 532 expectDuration(startMillis, /*min*/1900, /*max*/20_000); 533 assertEquals("foo", subtask.get()); 534 } 535 } 536 537 /** 538 * Test joinUntil, deadline expires before subtasks finish. 
539 */ 540 @ParameterizedTest 541 @MethodSource("factories") 542 void testJoinUntil2(ThreadFactory factory) throws Exception { 543 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 544 Subtask<Void> subtask = scope.fork(() -> { 545 Thread.sleep(Duration.ofDays(1)); 546 return null; 547 }); 548 549 long startMillis = millisTime(); 550 try { 551 scope.joinUntil(Instant.now().plusSeconds(2)); 552 } catch (TimeoutException e) { 553 expectDuration(startMillis, /*min*/1900, /*max*/20_000); 554 } 555 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 556 } 557 } 558 559 /** 560 * Test joinUntil many times. 561 */ 562 @ParameterizedTest 563 @MethodSource("factories") 564 void testJoinUntil3(ThreadFactory factory) throws Exception { 565 try (var scope = new StructuredTaskScope<String>(null, factory)) { 566 Subtask<String> subtask = scope.fork(() -> { 567 Thread.sleep(Duration.ofDays(1)); 568 return null; 569 }); 570 571 for (int i = 0; i < 3; i++) { 572 try { 573 scope.joinUntil(Instant.now().plusMillis(50)); 574 fail("joinUntil did not throw"); 575 } catch (TimeoutException expected) { 576 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 577 } 578 } 579 } 580 } 581 582 /** 583 * Test joinUntil with a deadline that has already expired. 
584 */ 585 @ParameterizedTest 586 @MethodSource("factories") 587 void testJoinUntil4(ThreadFactory factory) throws Exception { 588 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 589 Subtask<Void> subtask = scope.fork(() -> { 590 Thread.sleep(Duration.ofDays(1)); 591 return null; 592 }); 593 594 // now 595 try { 596 scope.joinUntil(Instant.now()); 597 fail("joinUntil did not throw"); 598 } catch (TimeoutException expected) { 599 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 600 } 601 602 // in the past 603 try { 604 scope.joinUntil(Instant.now().minusSeconds(1)); 605 fail("joinUntil did not throw"); 606 } catch (TimeoutException expected) { 607 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 608 } 609 } 610 } 611 612 /** 613 * Test joinUntil with interrupt status set. 614 */ 615 @ParameterizedTest 616 @MethodSource("factories") 617 void testInterruptJoinUntil1(ThreadFactory factory) throws Exception { 618 try (var scope = new StructuredTaskScope<String>(null, factory)) { 619 var latch = new CountDownLatch(1); 620 621 Subtask<String> subtask = scope.fork(() -> { 622 latch.await(); 623 return "foo"; 624 }); 625 626 // joinUntil should throw 627 Thread.currentThread().interrupt(); 628 try { 629 scope.joinUntil(Instant.now().plusSeconds(30)); 630 fail("joinUntil did not throw"); 631 } catch (InterruptedException expected) { 632 assertFalse(Thread.interrupted()); // interrupt status should be clear 633 } finally { 634 // let task continue 635 latch.countDown(); 636 } 637 638 // join should complete 639 scope.join(); 640 assertEquals("foo", subtask.get()); 641 } 642 } 643 644 /** 645 * Test interrupt of thread blocked in joinUntil. 
646 */ 647 @ParameterizedTest 648 @MethodSource("factories") 649 void testInterruptJoinUntil2(ThreadFactory factory) throws Exception { 650 try (var scope = new StructuredTaskScope(null, factory)) { 651 var latch = new CountDownLatch(1); 652 653 Subtask<String> subtask = scope.fork(() -> { 654 latch.await(); 655 return "foo"; 656 }); 657 658 // joinUntil should throw 659 scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.joinUntil"); 660 try { 661 scope.joinUntil(Instant.now().plusSeconds(30)); 662 fail("joinUntil did not throw"); 663 } catch (InterruptedException expected) { 664 assertFalse(Thread.interrupted()); // interrupt status should be clear 665 } finally { 666 // let task continue 667 latch.countDown(); 668 } 669 670 // join should complete 671 scope.join(); 672 assertEquals("foo", subtask.get()); 673 } 674 } 675 676 /** 677 * Test that shutdown prevents new threads from starting. 678 */ 679 @Test 680 void testShutdownWithFork() throws Exception { 681 ThreadFactory factory = task -> null; 682 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 683 scope.shutdown(); 684 // should not invoke the ThreadFactory to create thread 685 Subtask<Void> subtask = scope.fork(() -> null); 686 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 687 assertThrows(IllegalStateException.class, subtask::get); 688 assertThrows(IllegalStateException.class, subtask::exception); 689 } 690 } 691 692 /** 693 * Test that shutdown interrupts unfinished subtasks. 
694 */ 695 @ParameterizedTest 696 @MethodSource("factories") 697 void testShutdownInterruptsThreads1(ThreadFactory factory) throws Exception { 698 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 699 var interrupted = new AtomicBoolean(); 700 var latch = new CountDownLatch(1); 701 var subtask = scope.fork(() -> { 702 try { 703 Thread.sleep(Duration.ofDays(1)); 704 } catch (InterruptedException e) { 705 interrupted.set(true); 706 } finally { 707 latch.countDown(); 708 } 709 return null; 710 }); 711 712 scope.shutdown(); 713 714 // wait for task to complete 715 latch.await(); 716 assertTrue(interrupted.get()); 717 718 scope.join(); 719 720 // subtask result/exception not available 721 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 722 assertThrows(IllegalStateException.class, subtask::get); 723 assertThrows(IllegalStateException.class, subtask::exception); 724 } 725 } 726 727 /** 728 * Test that shutdown does not interrupt current thread. 729 */ 730 @ParameterizedTest 731 @MethodSource("factories") 732 void testShutdownInterruptsThreads2(ThreadFactory factory) throws Exception { 733 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 734 var interrupted = new AtomicBoolean(); 735 var latch = new CountDownLatch(1); 736 var subtask = scope.fork(() -> { 737 try { 738 scope.shutdown(); 739 interrupted.set(Thread.currentThread().isInterrupted()); 740 } finally { 741 latch.countDown(); 742 } 743 return null; 744 }); 745 746 // wait for task to complete 747 latch.await(); 748 assertFalse(interrupted.get()); 749 750 scope.join(); 751 } 752 } 753 754 /** 755 * Test shutdown wakes join. 
756 */ 757 @ParameterizedTest 758 @MethodSource("factories") 759 void testShutdownWakesJoin(ThreadFactory factory) throws Exception { 760 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 761 var latch = new CountDownLatch(1); 762 scope.fork(() -> { 763 Thread.sleep(Duration.ofMillis(100)); // give time for join to block 764 scope.shutdown(); 765 latch.await(); 766 return null; 767 }); 768 769 scope.join(); 770 771 // join woke up, allow task to complete 772 latch.countDown(); 773 } 774 } 775 776 /** 777 * Test shutdown after scope is closed. 778 */ 779 @Test 780 void testShutdownAfterClose() throws Exception { 781 try (var scope = new StructuredTaskScope<Object>()) { 782 scope.join(); 783 scope.close(); 784 assertThrows(IllegalStateException.class, scope::shutdown); 785 } 786 } 787 788 /** 789 * Test shutdown is confined to threads in the scope "tree". 790 */ 791 @ParameterizedTest 792 @MethodSource("factories") 793 void testShutdownConfined(ThreadFactory factory) throws Exception { 794 try (var scope1 = new StructuredTaskScope<Boolean>(); 795 var scope2 = new StructuredTaskScope<Boolean>()) { 796 797 // thread in scope1 cannot shutdown scope2 798 Subtask<Boolean> subtask1 = scope1.fork(() -> { 799 assertThrows(WrongThreadException.class, scope2::shutdown); 800 return true; 801 }); 802 803 // wait for task in scope1 to complete to avoid racing with task in scope2 804 while (subtask1.state() == Subtask.State.UNAVAILABLE) { 805 Thread.sleep(10); 806 } 807 808 // thread in scope2 shutdown scope1 809 Subtask<Boolean> subtask2 = scope2.fork(() -> { 810 scope1.shutdown(); 811 return true; 812 }); 813 814 scope2.join(); 815 scope1.join(); 816 817 assertTrue(subtask1.get()); 818 assertTrue(subtask1.get()); 819 820 // random thread cannot shutdown 821 try (var pool = Executors.newSingleThreadExecutor()) { 822 Future<Void> future = pool.submit(() -> { 823 assertThrows(WrongThreadException.class, scope1::shutdown); 824 
assertThrows(WrongThreadException.class, scope2::shutdown); 825 return null; 826 }); 827 future.get(); 828 } 829 } 830 } 831 832 /** 833 * Test isShutdown. 834 */ 835 @Test 836 void testIsShutdown() { 837 try (var scope = new StructuredTaskScope<Object>()) { 838 assertFalse(scope.isShutdown()); // before shutdown 839 scope.shutdown(); 840 assertTrue(scope.isShutdown()); // after shutdown 841 scope.close(); 842 assertTrue(scope.isShutdown()); // after cose 843 } 844 } 845 846 /** 847 * Test close without join, no subtasks forked. 848 */ 849 @Test 850 void testCloseWithoutJoin1() { 851 try (var scope = new StructuredTaskScope<Object>()) { 852 // do nothing 853 } 854 } 855 856 /** 857 * Test close without join, unfinished subtasks. 858 */ 859 @ParameterizedTest 860 @MethodSource("factories") 861 void testCloseWithoutJoin2(ThreadFactory factory) { 862 try (var scope = new StructuredTaskScope<String>(null, factory)) { 863 Subtask<String> subtask = scope.fork(() -> { 864 Thread.sleep(Duration.ofDays(1)); 865 return null; 866 }); 867 assertThrows(IllegalStateException.class, scope::close); 868 869 // subtask result/exception not available 870 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 871 assertThrows(IllegalStateException.class, subtask::get); 872 assertThrows(IllegalStateException.class, subtask::exception); 873 } 874 } 875 876 /** 877 * Test close without join, unfinished subtasks forked after join. 
878 */ 879 @ParameterizedTest 880 @MethodSource("factories") 881 void testCloseWithoutJoin3(ThreadFactory factory) throws Exception { 882 try (var scope = new StructuredTaskScope(null, factory)) { 883 scope.fork(() -> "foo"); 884 scope.join(); 885 886 Subtask<String> subtask = scope.fork(() -> { 887 Thread.sleep(Duration.ofDays(1)); 888 return null; 889 }); 890 assertThrows(IllegalStateException.class, scope::close); 891 892 // subtask result/exception not available 893 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 894 assertThrows(IllegalStateException.class, subtask::get); 895 assertThrows(IllegalStateException.class, subtask::exception); 896 } 897 } 898 899 /** 900 * Test close after join throws. Close should not throw as join attempted. 901 */ 902 @ParameterizedTest 903 @MethodSource("factories") 904 void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception { 905 try (var scope = new StructuredTaskScope<Object>()) { 906 var subtask = scope.fork(() -> { 907 Thread.sleep(Duration.ofDays(1)); 908 return null; 909 }); 910 911 // join throws 912 Thread.currentThread().interrupt(); 913 assertThrows(InterruptedException.class, scope::join); 914 assertThrows(IllegalStateException.class, subtask::get); 915 } 916 } 917 918 /** 919 * Test close after joinUntil throws. Close should not throw as join attempted. 920 */ 921 @ParameterizedTest 922 @MethodSource("factories") 923 void testCloseAfterJoinUntilThrows(ThreadFactory factory) throws Exception { 924 try (var scope = new StructuredTaskScope<Object>()) { 925 var subtask = scope.fork(() -> { 926 Thread.sleep(Duration.ofDays(1)); 927 return null; 928 }); 929 930 // joinUntil throws 931 assertThrows(TimeoutException.class, () -> scope.joinUntil(Instant.now())); 932 assertThrows(IllegalStateException.class, subtask::get); 933 } 934 } 935 936 /** 937 * Test close is owner confined. 
938 */ 939 @ParameterizedTest 940 @MethodSource("factories") 941 void testCloseConfined(ThreadFactory factory) throws Exception { 942 try (var scope = new StructuredTaskScope<Boolean>()) { 943 944 // attempt to close from thread in scope 945 Subtask<Boolean> subtask = scope.fork(() -> { 946 assertThrows(WrongThreadException.class, scope::close); 947 return true; 948 }); 949 950 scope.join(); 951 assertTrue(subtask.get()); 952 953 // random thread cannot close scope 954 try (var pool = Executors.newCachedThreadPool(factory)) { 955 Future<Boolean> future = pool.submit(() -> { 956 assertThrows(WrongThreadException.class, scope::close); 957 return null; 958 }); 959 future.get(); 960 } 961 } 962 } 963 964 /** 965 * Test close with interrupt status set. 966 */ 967 @ParameterizedTest 968 @MethodSource("factories") 969 void testInterruptClose1(ThreadFactory factory) throws Exception { 970 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 971 var done = new AtomicBoolean(); 972 scope.fork(() -> { 973 try { 974 Thread.sleep(Duration.ofDays(1)); 975 } catch (InterruptedException e) { 976 // interrupted by shutdown, expected 977 } 978 Thread.sleep(Duration.ofMillis(100)); // force close to wait 979 done.set(true); 980 return null; 981 }); 982 983 scope.shutdown(); 984 scope.join(); 985 986 // invoke close with interrupt status set 987 Thread.currentThread().interrupt(); 988 try { 989 scope.close(); 990 } finally { 991 assertTrue(Thread.interrupted()); // clear interrupt status 992 assertTrue(done.get()); 993 } 994 } 995 } 996 997 /** 998 * Test interrupting thread waiting in close. 
999 */ 1000 @ParameterizedTest 1001 @MethodSource("factories") 1002 void testInterruptClose2(ThreadFactory factory) throws Exception { 1003 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 1004 var done = new AtomicBoolean(); 1005 Thread mainThread = Thread.currentThread(); 1006 scope.fork(() -> { 1007 try { 1008 Thread.sleep(Duration.ofDays(1)); 1009 } catch (InterruptedException e) { 1010 // interrupted by shutdown, expected 1011 } 1012 1013 // interrupt main thread when it blocks in close 1014 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScope.close"); 1015 1016 Thread.sleep(Duration.ofMillis(100)); // force close to wait 1017 done.set(true); 1018 return null; 1019 }); 1020 1021 scope.shutdown(); // interrupts task 1022 scope.join(); 1023 try { 1024 scope.close(); 1025 } finally { 1026 assertTrue(Thread.interrupted()); // clear interrupt status 1027 assertTrue(done.get()); 1028 } 1029 } 1030 } 1031 1032 /** 1033 * Test that closing an enclosing scope closes the thread flock of a nested scope. 1034 */ 1035 @Test 1036 void testCloseThrowsStructureViolation() throws Exception { 1037 try (var scope1 = new StructuredTaskScope<Object>()) { 1038 try (var scope2 = new StructuredTaskScope<Object>()) { 1039 1040 // join + close enclosing scope 1041 scope1.join(); 1042 try { 1043 scope1.close(); 1044 fail("close did not throw"); 1045 } catch (StructureViolationException expected) { } 1046 1047 // underlying flock should be closed, fork should return a cancelled task 1048 var executed = new AtomicBoolean(); 1049 Subtask<Void> subtask = scope2.fork(() -> { 1050 executed.set(true); 1051 return null; 1052 }); 1053 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1054 scope2.join(); 1055 assertFalse(executed.get()); 1056 } 1057 } 1058 } 1059 1060 /** 1061 * A StructuredTaskScope that collects the subtasks notified to the handleComplete method. 
1062 */ 1063 private static class CollectAll<T> extends StructuredTaskScope<T> { 1064 private final Set<Subtask<? extends T>> subtasks = ConcurrentHashMap.newKeySet(); 1065 1066 CollectAll(ThreadFactory factory) { 1067 super(null, factory); 1068 } 1069 1070 @Override 1071 protected void handleComplete(Subtask<? extends T> subtask) { 1072 subtasks.add(subtask); 1073 } 1074 1075 Set<Subtask<? extends T>> subtasks() { 1076 return subtasks; 1077 } 1078 1079 Subtask<? extends T> find(Callable<T> task) { 1080 return subtasks.stream() 1081 .filter(h -> task.equals(h.task())) 1082 .findAny() 1083 .orElseThrow(); 1084 } 1085 } 1086 1087 /** 1088 * Test that handleComplete method is invoked for tasks that complete before shutdown. 1089 */ 1090 @ParameterizedTest 1091 @MethodSource("factories") 1092 void testHandleCompleteBeforeShutdown(ThreadFactory factory) throws Exception { 1093 try (var scope = new CollectAll<String>(factory)) { 1094 Callable<String> task1 = () -> "foo"; 1095 Callable<String> task2 = () -> { throw new FooException(); }; 1096 scope.fork(task1); 1097 scope.fork(task2); 1098 scope.join(); 1099 1100 var subtask1 = scope.find(task1); 1101 assertEquals("foo", subtask1.get()); 1102 1103 var subtask2 = scope.find(task2); 1104 assertTrue(subtask2.exception() instanceof FooException); 1105 } 1106 } 1107 1108 /** 1109 * Test that handleComplete method is not invoked for tasks that finish after shutdown 1110 * or are forked after shutdown. 
1111 */ 1112 @ParameterizedTest 1113 @MethodSource("factories") 1114 void testHandleCompleteAfterShutdown(ThreadFactory factory) throws Exception { 1115 try (var scope = new CollectAll<String>(factory)) { 1116 Callable<String> task1 = () -> { 1117 try { 1118 Thread.sleep(Duration.ofDays(1)); 1119 } catch (InterruptedException ignore) { } 1120 return "foo"; 1121 }; 1122 Callable<String> task2 = () -> { 1123 Thread.sleep(Duration.ofDays(1)); 1124 return "bar"; 1125 }; 1126 Callable<String> task3 = () -> "baz"; 1127 1128 // forked before shutdown, will complete after shutdown 1129 var subtask1 = scope.fork(task1); 1130 var subtask2 = scope.fork(task2); 1131 1132 scope.shutdown(); 1133 1134 // forked after shutdown 1135 var subtask3 = scope.fork(task3); 1136 1137 scope.join(); 1138 1139 // handleComplete should not be called 1140 for (int i = 0; i < 3; i++) { 1141 assertEquals(0, scope.subtasks().size()); 1142 Thread.sleep(20); 1143 } 1144 1145 assertEquals(Subtask.State.UNAVAILABLE, subtask1.state()); 1146 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1147 assertEquals(Subtask.State.UNAVAILABLE, subtask3.state()); 1148 } 1149 } 1150 1151 /** 1152 * Test that the default handleComplete throws IllegalArgumentException if called 1153 * with a running task. 1154 */ 1155 @Test 1156 void testHandleCompleteThrows() throws Exception { 1157 class TestScope<T> extends StructuredTaskScope<T> { 1158 protected void handleComplete(Subtask<? 
extends T> subtask) { 1159 super.handleComplete(subtask); 1160 } 1161 } 1162 1163 try (var scope = new TestScope<String>()) { 1164 var subtask = scope.fork(() -> { 1165 Thread.sleep(Duration.ofDays(1)); 1166 return "foo"; 1167 }); 1168 1169 // running task 1170 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1171 assertThrows(IllegalArgumentException.class, () -> scope.handleComplete(subtask)); 1172 scope.shutdown(); 1173 1174 // null task 1175 assertThrows(NullPointerException.class, () -> scope.handleComplete(null)); 1176 1177 scope.join(); 1178 } 1179 } 1180 1181 /** 1182 * Test ensureOwnerAndJoined. 1183 */ 1184 @ParameterizedTest 1185 @MethodSource("factories") 1186 void testEnsureOwnerAndJoined(ThreadFactory factory) throws Exception { 1187 class MyScope<T> extends StructuredTaskScope<T> { 1188 MyScope(ThreadFactory factory) { 1189 super(null, factory); 1190 } 1191 void invokeEnsureOwnerAndJoined() { 1192 super.ensureOwnerAndJoined(); 1193 } 1194 } 1195 1196 try (var scope = new MyScope<Boolean>(factory)) { 1197 // owner thread, before join 1198 scope.fork(() -> true); 1199 assertThrows(IllegalStateException.class, () -> { 1200 scope.invokeEnsureOwnerAndJoined(); 1201 }); 1202 1203 // owner thread, after join 1204 scope.join(); 1205 scope.invokeEnsureOwnerAndJoined(); 1206 1207 // thread in scope cannot invoke ensureOwnerAndJoined 1208 Subtask<Boolean> subtask = scope.fork(() -> { 1209 assertThrows(WrongThreadException.class, () -> { 1210 scope.invokeEnsureOwnerAndJoined(); 1211 }); 1212 return true; 1213 }); 1214 scope.join(); 1215 assertTrue(subtask.get()); 1216 1217 // random thread cannot invoke ensureOwnerAndJoined 1218 try (var pool = Executors.newSingleThreadExecutor()) { 1219 Future<Void> future = pool.submit(() -> { 1220 assertThrows(WrongThreadException.class, () -> { 1221 scope.invokeEnsureOwnerAndJoined(); 1222 }); 1223 return null; 1224 }); 1225 future.get(); 1226 } 1227 } 1228 } 1229 1230 /** 1231 * Test ensureOwnerAndJoined after the 
task scope has been closed. 1232 */ 1233 @ParameterizedTest 1234 @MethodSource("factories") 1235 void testEnsureOwnerAndJoinedAfterClose(ThreadFactory factory) throws Exception { 1236 class MyScope<T> extends StructuredTaskScope<T> { 1237 MyScope(ThreadFactory factory) { 1238 super(null, factory); 1239 } 1240 public void invokeEnsureOwnerAndJoined() { 1241 super.ensureOwnerAndJoined(); 1242 } 1243 } 1244 1245 // ensureOwnerAndJoined after close, join invoked 1246 try (var scope = new MyScope<String>(factory)) { 1247 scope.fork(() -> "foo"); 1248 scope.join(); 1249 scope.close(); 1250 scope.invokeEnsureOwnerAndJoined(); // should not throw 1251 } 1252 1253 // ensureOwnerAndJoined after close, join not invoked 1254 try (var scope = new MyScope<String>(factory)) { 1255 scope.fork(() -> "foo"); 1256 assertThrows(IllegalStateException.class, scope::close); 1257 scope.invokeEnsureOwnerAndJoined(); // should not throw 1258 } 1259 } 1260 1261 1262 /** 1263 * Test toString. 1264 */ 1265 @Test 1266 void testToString() throws Exception { 1267 ThreadFactory factory = Thread.ofVirtual().factory(); 1268 try (var scope = new StructuredTaskScope<Object>("duke", factory)) { 1269 // open 1270 assertTrue(scope.toString().contains("duke")); 1271 1272 // shutdown 1273 scope.shutdown(); 1274 assertTrue(scope.toString().contains("duke")); 1275 1276 // closed 1277 scope.join(); 1278 scope.close(); 1279 assertTrue(scope.toString().contains("duke")); 1280 } 1281 } 1282 1283 /** 1284 * Test Subtask with task that completes successfully. 
1285 */ 1286 @ParameterizedTest 1287 @MethodSource("factories") 1288 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception { 1289 try (var scope = new StructuredTaskScope<String>(null, factory)) { 1290 Callable<String> task = () -> "foo"; 1291 Subtask<String> subtask = scope.fork(task); 1292 1293 // before join, owner thread 1294 assertEquals(task, subtask.task()); 1295 assertThrows(IllegalStateException.class, subtask::get); 1296 assertThrows(IllegalStateException.class, subtask::exception); 1297 1298 scope.join(); 1299 1300 // after join 1301 assertEquals(task, subtask.task()); 1302 assertEquals(Subtask.State.SUCCESS, subtask.state()); 1303 assertEquals("foo", subtask.get()); 1304 assertThrows(IllegalStateException.class, subtask::exception); 1305 } 1306 } 1307 1308 /** 1309 * Test Subtask with task that fails. 1310 */ 1311 @ParameterizedTest 1312 @MethodSource("factories") 1313 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception { 1314 try (var scope = new StructuredTaskScope<String>(null, factory)) { 1315 Callable<String> task = () -> { throw new FooException(); }; 1316 Subtask<String> subtask = scope.fork(task); 1317 1318 // before join, owner thread 1319 assertEquals(task, subtask.task()); 1320 assertThrows(IllegalStateException.class, subtask::get); 1321 assertThrows(IllegalStateException.class, subtask::exception); 1322 1323 scope.join(); 1324 1325 // after join 1326 assertEquals(task, subtask.task()); 1327 assertEquals(Subtask.State.FAILED, subtask.state()); 1328 assertThrows(IllegalStateException.class, subtask::get); 1329 assertTrue(subtask.exception() instanceof FooException); 1330 } 1331 } 1332 1333 /** 1334 * Test Subtask with a task that has not completed. 
1335 */ 1336 @ParameterizedTest 1337 @MethodSource("factories") 1338 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception { 1339 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 1340 Callable<Void> task = () -> { 1341 Thread.sleep(Duration.ofDays(1)); 1342 return null; 1343 }; 1344 Subtask<Void> subtask = scope.fork(task); 1345 1346 // before join 1347 assertEquals(task, subtask.task()); 1348 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1349 assertThrows(IllegalStateException.class, subtask::get); 1350 assertThrows(IllegalStateException.class, subtask::exception); 1351 1352 // attempt join, join throws 1353 Thread.currentThread().interrupt(); 1354 assertThrows(InterruptedException.class, scope::join); 1355 1356 // after join 1357 assertEquals(task, subtask.task()); 1358 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1359 assertThrows(IllegalStateException.class, subtask::get); 1360 assertThrows(IllegalStateException.class, subtask::exception); 1361 } 1362 } 1363 1364 /** 1365 * Test Subtask when forked after shutdown. 1366 */ 1367 @ParameterizedTest 1368 @MethodSource("factories") 1369 void testSubtaskWhenShutdown(ThreadFactory factory) throws Exception { 1370 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 1371 Callable<Void> task = () -> { 1372 Thread.sleep(Duration.ofDays(1)); 1373 return null; 1374 }; 1375 1376 scope.shutdown(); 1377 1378 // fork after shutdown 1379 Subtask<Void> subtask = scope.fork(task); 1380 assertEquals(task, subtask.task()); 1381 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1382 assertThrows(IllegalStateException.class, subtask::get); 1383 assertThrows(IllegalStateException.class, subtask::exception); 1384 } 1385 } 1386 1387 /** 1388 * Test Subtask::toString. 
1389 */ 1390 @Test 1391 void testSubtaskToString() throws Exception { 1392 try (var scope = new StructuredTaskScope<Object>()) { 1393 // success 1394 var subtask1 = scope.fork(() -> "foo"); 1395 scope.join(); 1396 assertTrue(subtask1.toString().contains("Completed successfully")); 1397 1398 // failed 1399 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1400 scope.join(); 1401 assertTrue(subtask2.toString().contains("Failed")); 1402 1403 // not completed 1404 Callable<Void> sleepForDay = () -> { 1405 Thread.sleep(Duration.ofDays(1)); 1406 return null; 1407 }; 1408 var subtask3 = scope.fork(sleepForDay); 1409 assertTrue(subtask3.toString().contains("Unavailable")); 1410 1411 scope.shutdown(); 1412 1413 // forked after shutdown 1414 var subtask4 = scope.fork(sleepForDay); 1415 assertTrue(subtask4.toString().contains("Unavailable")); 1416 1417 scope.join(); 1418 } 1419 } 1420 1421 /** 1422 * Test ShutdownOnSuccess with no completed tasks. 1423 */ 1424 @Test 1425 void testShutdownOnSuccess1() throws Exception { 1426 try (var scope = new ShutdownOnSuccess<Object>()) { 1427 assertThrows(IllegalStateException.class, () -> scope.result()); 1428 assertThrows(IllegalStateException.class, () -> scope.result(e -> null)); 1429 } 1430 } 1431 1432 /** 1433 * Test ShutdownOnSuccess with tasks that complete successfully. 1434 */ 1435 @ParameterizedTest 1436 @MethodSource("factories") 1437 void testShutdownOnSuccess2(ThreadFactory factory) throws Exception { 1438 try (var scope = new ShutdownOnSuccess<String>(null, factory)) { 1439 scope.fork(() -> "foo"); 1440 scope.join(); // ensures foo completes first 1441 scope.fork(() -> "bar"); 1442 scope.join(); 1443 assertEquals("foo", scope.result()); 1444 assertEquals("foo", scope.result(e -> null)); 1445 } 1446 } 1447 1448 /** 1449 * Test ShutdownOnSuccess with a task that completes successfully with a null result. 
1450 */ 1451 @ParameterizedTest 1452 @MethodSource("factories") 1453 void testShutdownOnSuccess3(ThreadFactory factory) throws Exception { 1454 try (var scope = new ShutdownOnSuccess<Object>(null, factory)) { 1455 scope.fork(() -> null); 1456 scope.join(); 1457 assertNull(scope.result()); 1458 assertNull(scope.result(e -> null)); 1459 } 1460 } 1461 1462 /** 1463 * Test ShutdownOnSuccess with tasks that complete succcessfully and tasks that fail. 1464 */ 1465 @ParameterizedTest 1466 @MethodSource("factories") 1467 void testShutdownOnSuccess4(ThreadFactory factory) throws Exception { 1468 try (var scope = new ShutdownOnSuccess<String>(null, factory)) { 1469 scope.fork(() -> "foo"); 1470 scope.fork(() -> { throw new ArithmeticException(); }); 1471 scope.join(); 1472 assertEquals("foo", scope.result()); 1473 assertEquals("foo", scope.result(e -> null)); 1474 } 1475 } 1476 1477 /** 1478 * Test ShutdownOnSuccess with a task that fails. 1479 */ 1480 @ParameterizedTest 1481 @MethodSource("factories") 1482 void testShutdownOnSuccess5(ThreadFactory factory) throws Exception { 1483 try (var scope = new ShutdownOnSuccess<Object>(null, factory)) { 1484 scope.fork(() -> { throw new ArithmeticException(); }); 1485 scope.join(); 1486 Throwable ex = assertThrows(ExecutionException.class, () -> scope.result()); 1487 assertTrue(ex.getCause() instanceof ArithmeticException); 1488 ex = assertThrows(FooException.class, () -> scope.result(e -> new FooException(e))); 1489 assertTrue(ex.getCause() instanceof ArithmeticException); 1490 } 1491 } 1492 1493 /** 1494 * Test ShutdownOnSuccess methods are confined to the owner. 
1495 */ 1496 @ParameterizedTest 1497 @MethodSource("factories") 1498 void testShutdownOnSuccessConfined(ThreadFactory factory) throws Exception { 1499 // owner before join 1500 try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) { 1501 scope.fork(() -> { throw new FooException(); }); 1502 assertThrows(IllegalStateException.class, scope::result); 1503 assertThrows(IllegalStateException.class, () -> { 1504 scope.result(e -> new RuntimeException(e)); 1505 }); 1506 scope.join(); 1507 } 1508 1509 // non-owner 1510 try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) { 1511 Subtask<Boolean> subtask = scope.fork(() -> { 1512 assertThrows(WrongThreadException.class, scope::result); 1513 assertThrows(WrongThreadException.class, () -> { 1514 scope.result(e -> new RuntimeException(e)); 1515 }); 1516 return true; 1517 }); 1518 scope.join(); 1519 assertTrue(subtask.get()); 1520 } 1521 } 1522 1523 /** 1524 * Test ShutdownOnFailure with no completed tasks. 1525 */ 1526 @Test 1527 void testShutdownOnFailure1() throws Throwable { 1528 try (var scope = new ShutdownOnFailure()) { 1529 assertTrue(scope.exception().isEmpty()); 1530 scope.throwIfFailed(); 1531 scope.throwIfFailed(e -> new FooException(e)); 1532 } 1533 } 1534 1535 /** 1536 * Test ShutdownOnFailure with tasks that complete successfully. 1537 */ 1538 @ParameterizedTest 1539 @MethodSource("factories") 1540 void testShutdownOnFailure2(ThreadFactory factory) throws Throwable { 1541 try (var scope = new ShutdownOnFailure(null, factory)) { 1542 scope.fork(() -> "foo"); 1543 scope.fork(() -> "bar"); 1544 scope.join(); 1545 1546 // no exception 1547 assertTrue(scope.exception().isEmpty()); 1548 scope.throwIfFailed(); 1549 scope.throwIfFailed(e -> new FooException(e)); 1550 } 1551 } 1552 1553 /** 1554 * Test ShutdownOnFailure with tasks that complete succcessfully and tasks that fail. 
1555 */ 1556 @ParameterizedTest 1557 @MethodSource("factories") 1558 void testShutdownOnFailure3(ThreadFactory factory) throws Throwable { 1559 try (var scope = new ShutdownOnFailure(null, factory)) { 1560 1561 // one task completes successfully, the other fails 1562 scope.fork(() -> "foo"); 1563 scope.fork(() -> { throw new ArithmeticException(); }); 1564 scope.join(); 1565 1566 Throwable ex = scope.exception().orElse(null); 1567 assertTrue(ex instanceof ArithmeticException); 1568 1569 ex = assertThrows(ExecutionException.class, () -> scope.throwIfFailed()); 1570 assertTrue(ex.getCause() instanceof ArithmeticException); 1571 1572 ex = assertThrows(FooException.class, 1573 () -> scope.throwIfFailed(e -> new FooException(e))); 1574 assertTrue(ex.getCause() instanceof ArithmeticException); 1575 } 1576 } 1577 1578 /** 1579 * Test ShutdownOnFailure methods are confined to the owner. 1580 */ 1581 @ParameterizedTest 1582 @MethodSource("factories") 1583 void testShutdownOnFailureConfined(ThreadFactory factory) throws Exception { 1584 // owner before join 1585 try (var scope = new ShutdownOnFailure(null, factory)) { 1586 scope.fork(() -> "foo"); 1587 assertThrows(IllegalStateException.class, scope::exception); 1588 assertThrows(IllegalStateException.class, scope::throwIfFailed); 1589 assertThrows(IllegalStateException.class, () -> { 1590 scope.throwIfFailed(e -> new RuntimeException(e)); 1591 }); 1592 scope.join(); 1593 } 1594 1595 // non-owner 1596 try (var scope = new ShutdownOnFailure(null, factory)) { 1597 Subtask<Boolean> subtask = scope.fork(() -> { 1598 assertThrows(WrongThreadException.class, scope::exception); 1599 assertThrows(WrongThreadException.class, scope::throwIfFailed); 1600 assertThrows(WrongThreadException.class, () -> { 1601 scope.throwIfFailed(e -> new RuntimeException(e)); 1602 }); 1603 return true; 1604 }); 1605 scope.join(); 1606 assertTrue(subtask.get()); 1607 } 1608 } 1609 1610 /** 1611 * Test for NullPointerException. 
1612 */ 1613 @Test 1614 void testNulls() throws Exception { 1615 assertThrows(NullPointerException.class, () -> new StructuredTaskScope("", null)); 1616 try (var scope = new StructuredTaskScope<Object>()) { 1617 assertThrows(NullPointerException.class, () -> scope.fork(null)); 1618 assertThrows(NullPointerException.class, () -> scope.joinUntil(null)); 1619 } 1620 1621 assertThrows(NullPointerException.class, () -> new ShutdownOnSuccess<Object>("", null)); 1622 try (var scope = new ShutdownOnSuccess<Object>()) { 1623 assertThrows(NullPointerException.class, () -> scope.fork(null)); 1624 assertThrows(NullPointerException.class, () -> scope.joinUntil(null)); 1625 assertThrows(NullPointerException.class, () -> scope.result(null)); 1626 } 1627 1628 assertThrows(NullPointerException.class, () -> new ShutdownOnFailure("", null)); 1629 try (var scope = new ShutdownOnFailure()) { 1630 assertThrows(NullPointerException.class, () -> scope.fork(null)); 1631 assertThrows(NullPointerException.class, () -> scope.joinUntil(null)); 1632 assertThrows(NullPointerException.class, () -> scope.throwIfFailed(null)); 1633 } 1634 } 1635 1636 /** 1637 * A runtime exception for tests. 1638 */ 1639 private static class FooException extends RuntimeException { 1640 FooException() { } 1641 FooException(Throwable cause) { super(cause); } 1642 } 1643 1644 /** 1645 * Returns the current time in milliseconds. 
1646 */ 1647 private long millisTime() { 1648 long now = System.nanoTime(); 1649 return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS); 1650 } 1651 1652 /** 1653 * Check the duration of a task 1654 * @param start start time, in milliseconds 1655 * @param min minimum expected duration, in milliseconds 1656 * @param max maximum expected duration, in milliseconds 1657 * @return the duration (now - start), in milliseconds 1658 */ 1659 private long expectDuration(long start, long min, long max) { 1660 long duration = millisTime() - start; 1661 assertTrue(duration >= min, 1662 "Duration " + duration + "ms, expected >= " + min + "ms"); 1663 assertTrue(duration <= max, 1664 "Duration " + duration + "ms, expected <= " + max + "ms"); 1665 return duration; 1666 } 1667 1668 /** 1669 * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}". 1670 * {@code c} is the fully qualified class name and {@code m} is the method name. 1671 */ 1672 private void interruptThreadAt(Thread target, String location) throws InterruptedException { 1673 int index = location.lastIndexOf('.'); 1674 String className = location.substring(0, index); 1675 String methodName = location.substring(index + 1); 1676 1677 boolean found = false; 1678 while (!found) { 1679 Thread.State state = target.getState(); 1680 assertTrue(state != TERMINATED); 1681 if ((state == WAITING || state == TIMED_WAITING) 1682 && contains(target.getStackTrace(), className, methodName)) { 1683 found = true; 1684 } else { 1685 Thread.sleep(20); 1686 } 1687 } 1688 target.interrupt(); 1689 } 1690 1691 /** 1692 * Schedules the current thread to be interrupted when it waits (timed or untimed) 1693 * at the given location. 
*/
    private void scheduleInterruptAt(String location) {
        // capture the caller's thread now, the submitted task runs on another thread
        Thread caller = Thread.currentThread();
        scheduler.submit(() -> {
            interruptThreadAt(caller, location);
            return null;
        });
    }

    /**
     * Returns true if the given stack trace contains an element for the given class
     * and method name.
     */
    private boolean contains(StackTraceElement[] stack, String className, String methodName) {
        for (StackTraceElement frame : stack) {
            if (className.equals(frame.getClassName())
                    && methodName.equals(frame.getMethodName())) {
                return true;
            }
        }
        return false;
    }
}