1 /* 2 * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=platform 26 * @bug 8284199 8296779 8306647 27 * @summary Basic tests for StructuredTaskScope 28 * @enablePreview 29 * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest 30 */ 31 32 /* 33 * @test id=virtual 34 * @enablePreview 35 * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest 36 */ 37 38 import java.time.Duration; 39 import java.io.IOException; 40 import java.time.Instant; 41 import java.util.Arrays; 42 import java.util.ArrayList; 43 import java.util.List; 44 import java.util.Set; 45 import java.util.concurrent.Callable; 46 import java.util.concurrent.ConcurrentHashMap; 47 import java.util.concurrent.CountDownLatch; 48 import java.util.concurrent.Executors; 49 import java.util.concurrent.ExecutionException; 50 import java.util.concurrent.Future; 51 import java.util.concurrent.ThreadFactory; 52 import java.util.concurrent.TimeoutException; 53 import java.util.concurrent.TimeUnit; 54 import java.util.concurrent.RejectedExecutionException; 55 import java.util.concurrent.ScheduledExecutorService; 56 import java.util.concurrent.StructuredTaskScope; 57 import java.util.concurrent.StructuredTaskScope.Subtask; 58 import java.util.concurrent.StructuredTaskScope.ShutdownOnSuccess; 59 import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure; 60 import java.util.concurrent.StructureViolationException; 61 import java.util.concurrent.atomic.AtomicBoolean; 62 import java.util.concurrent.atomic.AtomicInteger; 63 import java.util.concurrent.atomic.AtomicReference; 64 import java.util.function.Supplier; 65 import java.util.stream.Stream; 66 import static java.lang.Thread.State.*; 67 68 import org.junit.jupiter.api.Test; 69 import org.junit.jupiter.api.BeforeAll; 70 import org.junit.jupiter.api.AfterAll; 71 import org.junit.jupiter.params.ParameterizedTest; 72 import org.junit.jupiter.params.provider.MethodSource; 73 import static org.junit.jupiter.api.Assertions.*; 74 75 class StructuredTaskScopeTest { 76 private static ScheduledExecutorService scheduler; 77 private static List<ThreadFactory> threadFactories; 78 79 @BeforeAll 80 static void setup() throws Exception { 81 scheduler = Executors.newSingleThreadScheduledExecutor(); 82 83 // thread factories 84 String value = System.getProperty("threadFactory"); 85 List<ThreadFactory> list = new ArrayList<>(); 86 if (value == null || value.equals("platform")) 87 list.add(Thread.ofPlatform().factory()); 88 if (value == null || value.equals("virtual")) 89 list.add(Thread.ofVirtual().factory()); 90 assertTrue(list.size() > 0, "No thread factories for tests"); 91 threadFactories = list; 92 } 93 94 @AfterAll 95 static void shutdown() { 96 scheduler.shutdown(); 97 } 98 99 private static Stream<ThreadFactory> factories() { 100 return threadFactories.stream(); 101 } 102 103 /** 104 * Test that fork creates a new thread for each task. 105 */ 106 @ParameterizedTest 107 @MethodSource("factories") 108 void testForkCreatesThread(ThreadFactory factory) throws Exception { 109 Set<Long> tids = ConcurrentHashMap.newKeySet(); 110 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 111 for (int i = 0; i < 100; i++) { 112 scope.fork(() -> { 113 tids.add(Thread.currentThread().threadId()); 114 return null; 115 }); 116 } 117 scope.join(); 118 } 119 assertEquals(100, tids.size()); 120 } 121 122 /** 123 * Test that fork creates a new virtual thread for each task. 124 */ 125 @Test 126 void testForkCreateVirtualThread() throws Exception { 127 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 128 try (var scope = new StructuredTaskScope<Object>()) { 129 for (int i = 0; i < 100; i++) { 130 scope.fork(() -> { 131 threads.add(Thread.currentThread()); 132 return null; 133 }); 134 } 135 scope.join(); 136 } 137 assertEquals(100, threads.size()); 138 threads.forEach(t -> assertTrue(t.isVirtual())); 139 } 140 141 /** 142 * Test that fork creates a new thread with the given thread factory. 143 */ 144 @ParameterizedTest 145 @MethodSource("factories") 146 void testForkUsesFactory(ThreadFactory factory) throws Exception { 147 var count = new AtomicInteger(); 148 ThreadFactory countingFactory = task -> { 149 count.incrementAndGet(); 150 return factory.newThread(task); 151 }; 152 try (var scope = new StructuredTaskScope<Object>(null, countingFactory)) { 153 for (int i = 0; i < 100; i++) { 154 scope.fork(() -> null); 155 } 156 scope.join(); 157 } 158 assertEquals(100, count.get()); 159 } 160 161 /** 162 * Test fork is confined to threads in the scope "tree". 163 */ 164 @ParameterizedTest 165 @MethodSource("factories") 166 void testForkConfined(ThreadFactory factory) throws Exception { 167 try (var scope1 = new StructuredTaskScope<Boolean>(); 168 var scope2 = new StructuredTaskScope<Boolean>()) { 169 170 // thread in scope1 cannot fork thread in scope2 171 Subtask<Boolean> subtask1 = scope1.fork(() -> { 172 assertThrows(WrongThreadException.class, () -> { 173 scope2.fork(() -> null); 174 }); 175 return true; 176 }); 177 178 // thread in scope2 can fork thread in scope1 179 Subtask<Boolean> subtask2 = scope2.fork(() -> { 180 scope1.fork(() -> null); 181 return true; 182 }); 183 184 scope2.join(); 185 scope1.join(); 186 187 assertTrue(subtask1.get()); 188 assertTrue(subtask2.get()); 189 190 // random thread cannot fork 191 try (var pool = Executors.newSingleThreadExecutor()) { 192 Future<Void> future = pool.submit(() -> { 193 assertThrows(WrongThreadException.class, () -> { 194 scope1.fork(() -> null); 195 }); 196 assertThrows(WrongThreadException.class, () -> { 197 scope2.fork(() -> null); 198 }); 199 return null; 200 }); 201 future.get(); 202 } 203 } 204 } 205 206 /** 207 * Test fork after join completes. 208 */ 209 @ParameterizedTest 210 @MethodSource("factories") 211 void testForkAfterJoin(ThreadFactory factory) throws Exception { 212 try (var scope = new StructuredTaskScope<String>(null, factory)) { 213 // round 1 214 var subtask1 = scope.fork(() -> "foo"); 215 assertThrows(IllegalStateException.class, subtask1::get); 216 scope.join(); 217 assertEquals("foo", subtask1.get()); 218 219 // round 2 220 var subtask2 = scope.fork(() -> "bar"); 221 assertEquals("foo", subtask1.get()); 222 assertThrows(IllegalStateException.class, subtask2::get); 223 scope.join(); 224 assertEquals("foo", subtask1.get()); 225 assertEquals("bar", subtask2.get()); 226 227 // round 3 228 var subtask3 = scope.fork(() -> "baz"); 229 assertEquals("foo", subtask1.get()); 230 assertEquals("bar", subtask2.get()); 231 assertThrows(IllegalStateException.class, subtask3::get); 232 scope.join(); 233 assertEquals("foo", subtask1.get()); 234 assertEquals("bar", subtask2.get()); 235 assertEquals("baz", subtask3.get()); 236 } 237 } 238 239 /** 240 * Test fork after join throws. 241 */ 242 @ParameterizedTest 243 @MethodSource("factories") 244 void testForkAfterJoinThrows(ThreadFactory factory) throws Exception { 245 try (var scope = new StructuredTaskScope<String>(null, factory)) { 246 var latch = new CountDownLatch(1); 247 var subtask1 = scope.fork(() -> { 248 latch.await(); 249 return "foo"; 250 }); 251 252 // join throws 253 Thread.currentThread().interrupt(); 254 assertThrows(InterruptedException.class, scope::join); 255 256 // allow subtask1 to finish 257 latch.countDown(); 258 259 // continue to fork 260 var subtask2 = scope.fork(() -> "bar"); 261 assertThrows(IllegalStateException.class, subtask1::get); 262 assertThrows(IllegalStateException.class, subtask2::get); 263 scope.join(); 264 assertEquals("foo", subtask1.get()); 265 assertEquals("bar", subtask2.get()); 266 } 267 } 268 269 /** 270 * Test fork after scope is shutdown. 271 */ 272 @ParameterizedTest 273 @MethodSource("factories") 274 void testForkAfterShutdown(ThreadFactory factory) throws Exception { 275 var executed = new AtomicBoolean(); 276 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 277 scope.shutdown(); 278 Subtask<String> subtask = scope.fork(() -> { 279 executed.set(true); 280 return null; 281 }); 282 scope.join(); 283 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 284 assertThrows(IllegalStateException.class, subtask::get); 285 assertThrows(IllegalStateException.class, subtask::exception); 286 } 287 assertFalse(executed.get()); 288 } 289 290 /** 291 * Test fork after scope is closed. 292 */ 293 @ParameterizedTest 294 @MethodSource("factories") 295 void testForkAfterClose(ThreadFactory factory) throws Exception { 296 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 297 scope.close(); 298 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null)); 299 } 300 } 301 302 /** 303 * Test fork when the thread factory rejects creating a thread. 304 */ 305 @Test 306 void testForkRejectedExecutionException() throws Exception { 307 ThreadFactory factory = task -> null; 308 try (var scope = new StructuredTaskScope(null, factory)) { 309 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null)); 310 scope.join(); 311 } 312 } 313 314 /** 315 * Test join with no subtasks. 316 */ 317 @Test 318 void testJoinWithNoSubtasks() throws Exception { 319 try (var scope = new StructuredTaskScope()) { 320 scope.join(); 321 } 322 } 323 324 /** 325 * Test join with unfinished subtasks. 326 */ 327 @ParameterizedTest 328 @MethodSource("factories") 329 void testJoinWithSubtasks(ThreadFactory factory) throws Exception { 330 try (var scope = new StructuredTaskScope(null, factory)) { 331 Subtask<String> subtask = scope.fork(() -> { 332 Thread.sleep(Duration.ofMillis(50)); 333 return "foo"; 334 }); 335 scope.join(); 336 assertEquals("foo", subtask.get()); 337 } 338 } 339 340 /** 341 * Test join is owner confined. 342 */ 343 @ParameterizedTest 344 @MethodSource("factories") 345 void testJoinConfined(ThreadFactory factory) throws Exception { 346 try (var scope = new StructuredTaskScope<Boolean>()) { 347 348 // thread in scope cannot join 349 Subtask<Boolean> subtask = scope.fork(() -> { 350 assertThrows(WrongThreadException.class, () -> { scope.join(); }); 351 return true; 352 }); 353 354 scope.join(); 355 356 assertTrue(subtask.get()); 357 358 // random thread cannot join 359 try (var pool = Executors.newSingleThreadExecutor()) { 360 Future<Void> future = pool.submit(() -> { 361 assertThrows(WrongThreadException.class, scope::join); 362 return null; 363 }); 364 future.get(); 365 } 366 } 367 } 368 369 /** 370 * Test join with interrupt status set. 371 */ 372 @ParameterizedTest 373 @MethodSource("factories") 374 void testInterruptJoin1(ThreadFactory factory) throws Exception { 375 try (var scope = new StructuredTaskScope(null, factory)) { 376 var latch = new CountDownLatch(1); 377 378 Subtask<String> subtask = scope.fork(() -> { 379 latch.await(); 380 return "foo"; 381 }); 382 383 // join should throw 384 Thread.currentThread().interrupt(); 385 try { 386 scope.join(); 387 fail("join did not throw"); 388 } catch (InterruptedException expected) { 389 assertFalse(Thread.interrupted()); // interrupt status should be clear 390 } finally { 391 // let task continue 392 latch.countDown(); 393 } 394 395 // join should complete 396 scope.join(); 397 assertEquals("foo", subtask.get()); 398 } 399 } 400 401 /** 402 * Test interrupt of thread blocked in join. 403 */ 404 @ParameterizedTest 405 @MethodSource("factories") 406 void testInterruptJoin2(ThreadFactory factory) throws Exception { 407 try (var scope = new StructuredTaskScope(null, factory)) { 408 var latch = new CountDownLatch(1); 409 Subtask<String> subtask = scope.fork(() -> { 410 latch.await(); 411 return "foo"; 412 }); 413 414 // join should throw 415 scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.join"); 416 try { 417 scope.join(); 418 fail("join did not throw"); 419 } catch (InterruptedException expected) { 420 assertFalse(Thread.interrupted()); // interrupt status should be clear 421 } finally { 422 // let task continue 423 latch.countDown(); 424 } 425 426 // join should complete 427 scope.join(); 428 assertEquals("foo", subtask.get()); 429 } 430 } 431 432 /** 433 * Test join when scope is shutdown. 434 */ 435 @ParameterizedTest 436 @MethodSource("factories") 437 void testJoinWithShutdown1(ThreadFactory factory) throws Exception { 438 try (var scope = new StructuredTaskScope<String>(null, factory)) { 439 var interrupted = new CountDownLatch(1); 440 var finish = new CountDownLatch(1); 441 442 Subtask<String> subtask = scope.fork(() -> { 443 try { 444 Thread.sleep(Duration.ofDays(1)); 445 } catch (InterruptedException e) { 446 interrupted.countDown(); 447 } 448 finish.await(); 449 return "foo"; 450 }); 451 452 scope.shutdown(); // should interrupt task 453 454 interrupted.await(); 455 456 scope.join(); 457 458 // signal task to finish 459 finish.countDown(); 460 } 461 } 462 463 /** 464 * Test shutdown when owner is blocked in join. 465 */ 466 @ParameterizedTest 467 @MethodSource("factories") 468 void testJoinWithShutdown2(ThreadFactory factory) throws Exception { 469 class MyScope<T> extends StructuredTaskScope<T> { 470 MyScope(ThreadFactory factory) { 471 super(null, factory); 472 } 473 @Override 474 protected void handleComplete(Subtask<? extends T> subtask) { 475 shutdown(); 476 } 477 } 478 479 try (var scope = new MyScope<String>(factory)) { 480 Subtask<String> subtask1 = scope.fork(() -> { 481 Thread.sleep(Duration.ofMillis(50)); 482 return "foo"; 483 }); 484 Subtask<String> subtask2 = scope.fork(() -> { 485 Thread.sleep(Duration.ofDays(1)); 486 return "bar"; 487 }); 488 489 // join should wakeup when shutdown is called 490 scope.join(); 491 492 // task1 should have completed successfully 493 assertEquals(Subtask.State.SUCCESS, subtask1.state()); 494 assertEquals("foo", subtask1.get()); 495 assertThrows(IllegalStateException.class, subtask1::exception); 496 497 // task2 result/exception not available 498 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 499 assertThrows(IllegalStateException.class, subtask2::get); 500 assertThrows(IllegalStateException.class, subtask2::exception); 501 } 502 } 503 504 /** 505 * Test join after scope is closed. 506 */ 507 @Test 508 void testJoinAfterClose() throws Exception { 509 try (var scope = new StructuredTaskScope()) { 510 scope.join(); 511 scope.close(); 512 assertThrows(IllegalStateException.class, () -> scope.join()); 513 assertThrows(IllegalStateException.class, () -> scope.joinUntil(Instant.now())); 514 } 515 } 516 517 /** 518 * Test joinUntil, subtasks finish before deadline expires. 519 */ 520 @ParameterizedTest 521 @MethodSource("factories") 522 void testJoinUntil1(ThreadFactory factory) throws Exception { 523 try (var scope = new StructuredTaskScope<String>(null, factory)) { 524 Subtask<String> subtask = scope.fork(() -> { 525 try { 526 Thread.sleep(Duration.ofSeconds(2)); 527 } catch (InterruptedException e) { } 528 return "foo"; 529 }); 530 531 long startMillis = millisTime(); 532 scope.joinUntil(Instant.now().plusSeconds(30)); 533 expectDuration(startMillis, /*min*/1900, /*max*/20_000); 534 assertEquals("foo", subtask.get()); 535 } 536 } 537 538 /** 539 * Test joinUntil, deadline expires before subtasks finish. 540 */ 541 @ParameterizedTest 542 @MethodSource("factories") 543 void testJoinUntil2(ThreadFactory factory) throws Exception { 544 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 545 Subtask<Void> subtask = scope.fork(() -> { 546 Thread.sleep(Duration.ofDays(1)); 547 return null; 548 }); 549 550 long startMillis = millisTime(); 551 try { 552 scope.joinUntil(Instant.now().plusSeconds(2)); 553 } catch (TimeoutException e) { 554 expectDuration(startMillis, /*min*/1900, /*max*/20_000); 555 } 556 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 557 } 558 } 559 560 /** 561 * Test joinUntil many times. 562 */ 563 @ParameterizedTest 564 @MethodSource("factories") 565 void testJoinUntil3(ThreadFactory factory) throws Exception { 566 try (var scope = new StructuredTaskScope<String>(null, factory)) { 567 Subtask<String> subtask = scope.fork(() -> { 568 Thread.sleep(Duration.ofDays(1)); 569 return null; 570 }); 571 572 for (int i = 0; i < 3; i++) { 573 try { 574 scope.joinUntil(Instant.now().plusMillis(50)); 575 fail("joinUntil did not throw"); 576 } catch (TimeoutException expected) { 577 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 578 } 579 } 580 } 581 } 582 583 /** 584 * Test joinUntil with a deadline that has already expired. 585 */ 586 @ParameterizedTest 587 @MethodSource("factories") 588 void testJoinUntil4(ThreadFactory factory) throws Exception { 589 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 590 Subtask<Void> subtask = scope.fork(() -> { 591 Thread.sleep(Duration.ofDays(1)); 592 return null; 593 }); 594 595 // now 596 try { 597 scope.joinUntil(Instant.now()); 598 fail("joinUntil did not throw"); 599 } catch (TimeoutException expected) { 600 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 601 } 602 603 // in the past 604 try { 605 scope.joinUntil(Instant.now().minusSeconds(1)); 606 fail("joinUntil did not throw"); 607 } catch (TimeoutException expected) { 608 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 609 } 610 } 611 } 612 613 /** 614 * Test joinUntil with interrupt status set. 615 */ 616 @ParameterizedTest 617 @MethodSource("factories") 618 void testInterruptJoinUntil1(ThreadFactory factory) throws Exception { 619 try (var scope = new StructuredTaskScope<String>(null, factory)) { 620 var latch = new CountDownLatch(1); 621 622 Subtask<String> subtask = scope.fork(() -> { 623 latch.await(); 624 return "foo"; 625 }); 626 627 // joinUntil should throw 628 Thread.currentThread().interrupt(); 629 try { 630 scope.joinUntil(Instant.now().plusSeconds(30)); 631 fail("joinUntil did not throw"); 632 } catch (InterruptedException expected) { 633 assertFalse(Thread.interrupted()); // interrupt status should be clear 634 } finally { 635 // let task continue 636 latch.countDown(); 637 } 638 639 // join should complete 640 scope.join(); 641 assertEquals("foo", subtask.get()); 642 } 643 } 644 645 /** 646 * Test interrupt of thread blocked in joinUntil. 647 */ 648 @ParameterizedTest 649 @MethodSource("factories") 650 void testInterruptJoinUntil2(ThreadFactory factory) throws Exception { 651 try (var scope = new StructuredTaskScope(null, factory)) { 652 var latch = new CountDownLatch(1); 653 654 Subtask<String> subtask = scope.fork(() -> { 655 latch.await(); 656 return "foo"; 657 }); 658 659 // joinUntil should throw 660 scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.joinUntil"); 661 try { 662 scope.joinUntil(Instant.now().plusSeconds(30)); 663 fail("joinUntil did not throw"); 664 } catch (InterruptedException expected) { 665 assertFalse(Thread.interrupted()); // interrupt status should be clear 666 } finally { 667 // let task continue 668 latch.countDown(); 669 } 670 671 // join should complete 672 scope.join(); 673 assertEquals("foo", subtask.get()); 674 } 675 } 676 677 /** 678 * Test that shutdown interrupts unfinished subtasks. 679 */ 680 @ParameterizedTest 681 @MethodSource("factories") 682 void testShutdownInterruptsThreads1(ThreadFactory factory) throws Exception { 683 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 684 var interrupted = new AtomicBoolean(); 685 var latch = new CountDownLatch(1); 686 var subtask = scope.fork(() -> { 687 try { 688 Thread.sleep(Duration.ofDays(1)); 689 } catch (InterruptedException e) { 690 interrupted.set(true); 691 } finally { 692 latch.countDown(); 693 } 694 return null; 695 }); 696 697 scope.shutdown(); 698 699 // wait for task to complete 700 latch.await(); 701 assertTrue(interrupted.get()); 702 703 scope.join(); 704 705 // subtask result/exception not available 706 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 707 assertThrows(IllegalStateException.class, subtask::get); 708 assertThrows(IllegalStateException.class, subtask::exception); 709 } 710 } 711 712 /** 713 * Test that shutdown does not interrupt current thread. 714 */ 715 @ParameterizedTest 716 @MethodSource("factories") 717 void testShutdownInterruptsThreads2(ThreadFactory factory) throws Exception { 718 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 719 var interrupted = new AtomicBoolean(); 720 var latch = new CountDownLatch(1); 721 var subtask = scope.fork(() -> { 722 try { 723 scope.shutdown(); 724 interrupted.set(Thread.currentThread().isInterrupted()); 725 } finally { 726 latch.countDown(); 727 } 728 return null; 729 }); 730 731 // wait for task to complete 732 latch.await(); 733 assertFalse(interrupted.get()); 734 735 scope.join(); 736 } 737 } 738 739 /** 740 * Test shutdown wakes join. 741 */ 742 @ParameterizedTest 743 @MethodSource("factories") 744 void testShutdownWakesJoin(ThreadFactory factory) throws Exception { 745 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 746 var latch = new CountDownLatch(1); 747 scope.fork(() -> { 748 Thread.sleep(Duration.ofMillis(100)); // give time for join to block 749 scope.shutdown(); 750 latch.await(); 751 return null; 752 }); 753 754 scope.join(); 755 756 // join woke up, allow task to complete 757 latch.countDown(); 758 } 759 } 760 761 /** 762 * Test shutdown after scope is closed. 763 */ 764 @Test 765 void testShutdownAfterClose() throws Exception { 766 try (var scope = new StructuredTaskScope<Object>()) { 767 scope.join(); 768 scope.close(); 769 assertThrows(IllegalStateException.class, scope::shutdown); 770 } 771 } 772 773 /** 774 * Test shutdown is confined to threads in the scope "tree". 775 */ 776 @ParameterizedTest 777 @MethodSource("factories") 778 void testShutdownConfined(ThreadFactory factory) throws Exception { 779 try (var scope1 = new StructuredTaskScope<Boolean>(); 780 var scope2 = new StructuredTaskScope<Boolean>()) { 781 782 // thread in scope1 cannot shutdown scope2 783 Subtask<Boolean> subtask1 = scope1.fork(() -> { 784 assertThrows(WrongThreadException.class, scope2::shutdown); 785 return true; 786 }); 787 788 // wait for task in scope1 to complete to avoid racing with task in scope2 789 while (subtask1.state() == Subtask.State.UNAVAILABLE) { 790 Thread.sleep(10); 791 } 792 793 // thread in scope2 shutdown scope1 794 Subtask<Boolean> subtask2 = scope2.fork(() -> { 795 scope1.shutdown(); 796 return true; 797 }); 798 799 scope2.join(); 800 scope1.join(); 801 802 assertTrue(subtask1.get()); 803 assertTrue(subtask1.get()); 804 805 // random thread cannot shutdown 806 try (var pool = Executors.newSingleThreadExecutor()) { 807 Future<Void> future = pool.submit(() -> { 808 assertThrows(WrongThreadException.class, scope1::shutdown); 809 assertThrows(WrongThreadException.class, scope2::shutdown); 810 return null; 811 }); 812 future.get(); 813 } 814 } 815 } 816 817 /** 818 * Test isShutdown. 819 */ 820 @Test 821 void testIsShutdown() { 822 try (var scope = new StructuredTaskScope<Object>()) { 823 assertFalse(scope.isShutdown()); // before shutdown 824 scope.shutdown(); 825 assertTrue(scope.isShutdown()); // after shutdown 826 scope.close(); 827 assertTrue(scope.isShutdown()); // after cose 828 } 829 } 830 831 /** 832 * Test close without join, no subtasks forked. 833 */ 834 @Test 835 void testCloseWithoutJoin1() { 836 try (var scope = new StructuredTaskScope<Object>()) { 837 // do nothing 838 } 839 } 840 841 /** 842 * Test close without join, unfinished subtasks. 843 */ 844 @ParameterizedTest 845 @MethodSource("factories") 846 void testCloseWithoutJoin2(ThreadFactory factory) { 847 try (var scope = new StructuredTaskScope<String>(null, factory)) { 848 Subtask<String> subtask = scope.fork(() -> { 849 Thread.sleep(Duration.ofDays(1)); 850 return null; 851 }); 852 assertThrows(IllegalStateException.class, scope::close); 853 854 // subtask result/exception not available 855 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 856 assertThrows(IllegalStateException.class, subtask::get); 857 assertThrows(IllegalStateException.class, subtask::exception); 858 } 859 } 860 861 /** 862 * Test close without join, unfinished subtasks forked after join. 863 */ 864 @ParameterizedTest 865 @MethodSource("factories") 866 void testCloseWithoutJoin3(ThreadFactory factory) throws Exception { 867 try (var scope = new StructuredTaskScope(null, factory)) { 868 scope.fork(() -> "foo"); 869 scope.join(); 870 871 Subtask<String> subtask = scope.fork(() -> { 872 Thread.sleep(Duration.ofDays(1)); 873 return null; 874 }); 875 assertThrows(IllegalStateException.class, scope::close); 876 877 // subtask result/exception not available 878 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 879 assertThrows(IllegalStateException.class, subtask::get); 880 assertThrows(IllegalStateException.class, subtask::exception); 881 } 882 } 883 884 /** 885 * Test close after join throws. Close should not throw as join attempted. 886 */ 887 @ParameterizedTest 888 @MethodSource("factories") 889 void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception { 890 try (var scope = new StructuredTaskScope<Object>()) { 891 var subtask = scope.fork(() -> { 892 Thread.sleep(Duration.ofDays(1)); 893 return null; 894 }); 895 896 // join throws 897 Thread.currentThread().interrupt(); 898 assertThrows(InterruptedException.class, scope::join); 899 assertThrows(IllegalStateException.class, subtask::get); 900 } 901 } 902 903 /** 904 * Test close after joinUntil throws. Close should not throw as join attempted. 905 */ 906 @ParameterizedTest 907 @MethodSource("factories") 908 void testCloseAfterJoinUntilThrows(ThreadFactory factory) throws Exception { 909 try (var scope = new StructuredTaskScope<Object>()) { 910 var subtask = scope.fork(() -> { 911 Thread.sleep(Duration.ofDays(1)); 912 return null; 913 }); 914 915 // joinUntil throws 916 assertThrows(TimeoutException.class, () -> scope.joinUntil(Instant.now())); 917 assertThrows(IllegalStateException.class, subtask::get); 918 } 919 } 920 921 /** 922 * Test close is owner confined. 923 */ 924 @ParameterizedTest 925 @MethodSource("factories") 926 void testCloseConfined(ThreadFactory factory) throws Exception { 927 try (var scope = new StructuredTaskScope<Boolean>()) { 928 929 // attempt to close from thread in scope 930 Subtask<Boolean> subtask = scope.fork(() -> { 931 assertThrows(WrongThreadException.class, scope::close); 932 return true; 933 }); 934 935 scope.join(); 936 assertTrue(subtask.get()); 937 938 // random thread cannot close scope 939 try (var pool = Executors.newCachedThreadPool(factory)) { 940 Future<Boolean> future = pool.submit(() -> { 941 assertThrows(WrongThreadException.class, scope::close); 942 return null; 943 }); 944 future.get(); 945 } 946 } 947 } 948 949 /** 950 * Test close with interrupt status set. 951 */ 952 @ParameterizedTest 953 @MethodSource("factories") 954 void testInterruptClose1(ThreadFactory factory) throws Exception { 955 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 956 var done = new AtomicBoolean(); 957 scope.fork(() -> { 958 try { 959 Thread.sleep(Duration.ofDays(1)); 960 } catch (InterruptedException e) { 961 // interrupted by shutdown, expected 962 } 963 Thread.sleep(Duration.ofMillis(100)); // force close to wait 964 done.set(true); 965 return null; 966 }); 967 968 scope.shutdown(); 969 scope.join(); 970 971 // invoke close with interrupt status set 972 Thread.currentThread().interrupt(); 973 try { 974 scope.close(); 975 } finally { 976 assertTrue(Thread.interrupted()); // clear interrupt status 977 assertTrue(done.get()); 978 } 979 } 980 } 981 982 /** 983 * Test interrupting thread waiting in close. 984 */ 985 @ParameterizedTest 986 @MethodSource("factories") 987 void testInterruptClose2(ThreadFactory factory) throws Exception { 988 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 989 var done = new AtomicBoolean(); 990 Thread mainThread = Thread.currentThread(); 991 scope.fork(() -> { 992 try { 993 Thread.sleep(Duration.ofDays(1)); 994 } catch (InterruptedException e) { 995 // interrupted by shutdown, expected 996 } 997 998 // interrupt main thread when it blocks in close 999 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScope.close"); 1000 1001 Thread.sleep(Duration.ofMillis(100)); // force close to wait 1002 done.set(true); 1003 return null; 1004 }); 1005 1006 scope.shutdown(); // interrupts task 1007 scope.join(); 1008 try { 1009 scope.close(); 1010 } finally { 1011 assertTrue(Thread.interrupted()); // clear interrupt status 1012 assertTrue(done.get()); 1013 } 1014 } 1015 } 1016 1017 /** 1018 * Test that closing an enclosing scope closes the thread flock of a nested scope. 1019 */ 1020 @Test 1021 void testCloseThrowsStructureViolation() throws Exception { 1022 try (var scope1 = new StructuredTaskScope<Object>()) { 1023 try (var scope2 = new StructuredTaskScope<Object>()) { 1024 1025 // join + close enclosing scope 1026 scope1.join(); 1027 try { 1028 scope1.close(); 1029 fail("close did not throw"); 1030 } catch (StructureViolationException expected) { } 1031 1032 // underlying flock should be closed, fork should return a cancelled task 1033 var executed = new AtomicBoolean(); 1034 Subtask<Void> subtask = scope2.fork(() -> { 1035 executed.set(true); 1036 return null; 1037 }); 1038 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1039 scope2.join(); 1040 assertFalse(executed.get()); 1041 } 1042 } 1043 } 1044 1045 /** 1046 * A StructuredTaskScope that collects the subtasks notified to the handleComplete method. 1047 */ 1048 private static class CollectAll<T> extends StructuredTaskScope<T> { 1049 private final Set<Subtask<? extends T>> subtasks = ConcurrentHashMap.newKeySet(); 1050 1051 CollectAll(ThreadFactory factory) { 1052 super(null, factory); 1053 } 1054 1055 @Override 1056 protected void handleComplete(Subtask<? extends T> subtask) { 1057 subtasks.add(subtask); 1058 } 1059 1060 Set<Subtask<? extends T>> subtasks() { 1061 return subtasks; 1062 } 1063 1064 Subtask<? extends T> find(Callable<T> task) { 1065 return subtasks.stream() 1066 .filter(h -> task.equals(h.task())) 1067 .findAny() 1068 .orElseThrow(); 1069 } 1070 } 1071 1072 /** 1073 * Test that handleComplete method is invoked for tasks that complete before shutdown. 1074 */ 1075 @ParameterizedTest 1076 @MethodSource("factories") 1077 void testHandleCompleteBeforeShutdown(ThreadFactory factory) throws Exception { 1078 try (var scope = new CollectAll<String>(factory)) { 1079 Callable<String> task1 = () -> "foo"; 1080 Callable<String> task2 = () -> { throw new FooException(); }; 1081 scope.fork(task1); 1082 scope.fork(task2); 1083 scope.join(); 1084 1085 var subtask1 = scope.find(task1); 1086 assertEquals("foo", subtask1.get()); 1087 1088 var subtask2 = scope.find(task2); 1089 assertTrue(subtask2.exception() instanceof FooException); 1090 } 1091 } 1092 1093 /** 1094 * Test that handleComplete method is not invoked for tasks that finish after shutdown 1095 * or are forked after shutdown. 1096 */ 1097 @ParameterizedTest 1098 @MethodSource("factories") 1099 void testHandleCompleteAfterShutdown(ThreadFactory factory) throws Exception { 1100 try (var scope = new CollectAll<String>(factory)) { 1101 Callable<String> task1 = () -> { 1102 try { 1103 Thread.sleep(Duration.ofDays(1)); 1104 } catch (InterruptedException ignore) { } 1105 return "foo"; 1106 }; 1107 Callable<String> task2 = () -> { 1108 Thread.sleep(Duration.ofDays(1)); 1109 return "bar"; 1110 }; 1111 Callable<String> task3 = () -> "baz"; 1112 1113 // forked before shutdown, will complete after shutdown 1114 var subtask1 = scope.fork(task1); 1115 var subtask2 = scope.fork(task2); 1116 1117 scope.shutdown(); 1118 1119 // forked after shutdown 1120 var subtask3 = scope.fork(task3); 1121 1122 scope.join(); 1123 1124 // handleComplete should not be called 1125 for (int i = 0; i < 3; i++) { 1126 assertEquals(0, scope.subtasks().size()); 1127 Thread.sleep(20); 1128 } 1129 1130 assertEquals(Subtask.State.UNAVAILABLE, subtask1.state()); 1131 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1132 assertEquals(Subtask.State.UNAVAILABLE, subtask3.state()); 1133 } 1134 } 1135 1136 /** 1137 * Test that the default handleComplete throws IllegalArgumentException if called 1138 * with a running task. 1139 */ 1140 @Test 1141 void testHandleCompleteThrows() throws Exception { 1142 class TestScope<T> extends StructuredTaskScope<T> { 1143 protected void handleComplete(Subtask<? extends T> subtask) { 1144 super.handleComplete(subtask); 1145 } 1146 } 1147 1148 try (var scope = new TestScope<String>()) { 1149 var subtask = scope.fork(() -> { 1150 Thread.sleep(Duration.ofDays(1)); 1151 return "foo"; 1152 }); 1153 1154 // running task 1155 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1156 assertThrows(IllegalArgumentException.class, () -> scope.handleComplete(subtask)); 1157 scope.shutdown(); 1158 1159 // null task 1160 assertThrows(NullPointerException.class, () -> scope.handleComplete(null)); 1161 1162 scope.join(); 1163 } 1164 } 1165 1166 /** 1167 * Test ensureOwnerAndJoined. 1168 */ 1169 @ParameterizedTest 1170 @MethodSource("factories") 1171 void testEnsureOwnerAndJoined(ThreadFactory factory) throws Exception { 1172 class MyScope<T> extends StructuredTaskScope<T> { 1173 MyScope(ThreadFactory factory) { 1174 super(null, factory); 1175 } 1176 void invokeEnsureOwnerAndJoined() { 1177 super.ensureOwnerAndJoined(); 1178 } 1179 } 1180 1181 try (var scope = new MyScope<Boolean>(factory)) { 1182 // owner thread, before join 1183 scope.fork(() -> true); 1184 assertThrows(IllegalStateException.class, () -> { 1185 scope.invokeEnsureOwnerAndJoined(); 1186 }); 1187 1188 // owner thread, after join 1189 scope.join(); 1190 scope.invokeEnsureOwnerAndJoined(); 1191 1192 // thread in scope cannot invoke ensureOwnerAndJoined 1193 Subtask<Boolean> subtask = scope.fork(() -> { 1194 assertThrows(WrongThreadException.class, () -> { 1195 scope.invokeEnsureOwnerAndJoined(); 1196 }); 1197 return true; 1198 }); 1199 scope.join(); 1200 assertTrue(subtask.get()); 1201 1202 // random thread cannot invoke ensureOwnerAndJoined 1203 try (var pool = Executors.newSingleThreadExecutor()) { 1204 Future<Void> future = pool.submit(() -> { 1205 assertThrows(WrongThreadException.class, () -> { 1206 scope.invokeEnsureOwnerAndJoined(); 1207 }); 1208 return null; 1209 }); 1210 future.get(); 1211 } 1212 } 1213 } 1214 1215 /** 1216 * Test ensureOwnerAndJoined after the task scope has been closed. 1217 */ 1218 @ParameterizedTest 1219 @MethodSource("factories") 1220 void testEnsureOwnerAndJoinedAfterClose(ThreadFactory factory) throws Exception { 1221 class MyScope<T> extends StructuredTaskScope<T> { 1222 MyScope(ThreadFactory factory) { 1223 super(null, factory); 1224 } 1225 public void invokeEnsureOwnerAndJoined() { 1226 super.ensureOwnerAndJoined(); 1227 } 1228 } 1229 1230 // ensureOwnerAndJoined after close, join invoked 1231 try (var scope = new MyScope<String>(factory)) { 1232 scope.fork(() -> "foo"); 1233 scope.join(); 1234 scope.close(); 1235 scope.invokeEnsureOwnerAndJoined(); // should not throw 1236 } 1237 1238 // ensureOwnerAndJoined after close, join not invoked 1239 try (var scope = new MyScope<String>(factory)) { 1240 scope.fork(() -> "foo"); 1241 assertThrows(IllegalStateException.class, scope::close); 1242 scope.invokeEnsureOwnerAndJoined(); // should not throw 1243 } 1244 } 1245 1246 1247 /** 1248 * Test toString. 1249 */ 1250 @Test 1251 void testToString() throws Exception { 1252 ThreadFactory factory = Thread.ofVirtual().factory(); 1253 try (var scope = new StructuredTaskScope<Object>("duke", factory)) { 1254 // open 1255 assertTrue(scope.toString().contains("duke")); 1256 1257 // shutdown 1258 scope.shutdown(); 1259 assertTrue(scope.toString().contains("duke")); 1260 1261 // closed 1262 scope.join(); 1263 scope.close(); 1264 assertTrue(scope.toString().contains("duke")); 1265 } 1266 } 1267 1268 /** 1269 * Test Subtask with task that completes successfully. 1270 */ 1271 @ParameterizedTest 1272 @MethodSource("factories") 1273 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception { 1274 try (var scope = new StructuredTaskScope<String>(null, factory)) { 1275 Callable<String> task = () -> "foo"; 1276 Subtask<String> subtask = scope.fork(task); 1277 1278 // before join, owner thread 1279 assertEquals(task, subtask.task()); 1280 assertThrows(IllegalStateException.class, subtask::get); 1281 assertThrows(IllegalStateException.class, subtask::exception); 1282 1283 scope.join(); 1284 1285 // after join 1286 assertEquals(task, subtask.task()); 1287 assertEquals(Subtask.State.SUCCESS, subtask.state()); 1288 assertEquals("foo", subtask.get()); 1289 assertThrows(IllegalStateException.class, subtask::exception); 1290 } 1291 } 1292 1293 /** 1294 * Test Subtask with task that fails. 1295 */ 1296 @ParameterizedTest 1297 @MethodSource("factories") 1298 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception { 1299 try (var scope = new StructuredTaskScope<String>(null, factory)) { 1300 Callable<String> task = () -> { throw new FooException(); }; 1301 Subtask<String> subtask = scope.fork(task); 1302 1303 // before join, owner thread 1304 assertEquals(task, subtask.task()); 1305 assertThrows(IllegalStateException.class, subtask::get); 1306 assertThrows(IllegalStateException.class, subtask::exception); 1307 1308 scope.join(); 1309 1310 // after join 1311 assertEquals(task, subtask.task()); 1312 assertEquals(Subtask.State.FAILED, subtask.state()); 1313 assertThrows(IllegalStateException.class, subtask::get); 1314 assertTrue(subtask.exception() instanceof FooException); 1315 } 1316 } 1317 1318 /** 1319 * Test Subtask with a task that has not completed. 1320 */ 1321 @ParameterizedTest 1322 @MethodSource("factories") 1323 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception { 1324 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 1325 Callable<Void> task = () -> { 1326 Thread.sleep(Duration.ofDays(1)); 1327 return null; 1328 }; 1329 Subtask<Void> subtask = scope.fork(task); 1330 1331 // before join 1332 assertEquals(task, subtask.task()); 1333 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1334 assertThrows(IllegalStateException.class, subtask::get); 1335 assertThrows(IllegalStateException.class, subtask::exception); 1336 1337 // attempt join, join throws 1338 Thread.currentThread().interrupt(); 1339 assertThrows(InterruptedException.class, scope::join); 1340 1341 // after join 1342 assertEquals(task, subtask.task()); 1343 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1344 assertThrows(IllegalStateException.class, subtask::get); 1345 assertThrows(IllegalStateException.class, subtask::exception); 1346 } 1347 } 1348 1349 /** 1350 * Test Subtask when forked after shutdown. 1351 */ 1352 @ParameterizedTest 1353 @MethodSource("factories") 1354 void testSubtaskWhenShutdown(ThreadFactory factory) throws Exception { 1355 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 1356 Callable<Void> task = () -> { 1357 Thread.sleep(Duration.ofDays(1)); 1358 return null; 1359 }; 1360 1361 scope.shutdown(); 1362 1363 // fork after shutdown 1364 Subtask<Void> subtask = scope.fork(task); 1365 scope.join(); 1366 assertEquals(task, subtask.task()); 1367 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1368 assertThrows(IllegalStateException.class, subtask::get); 1369 assertThrows(IllegalStateException.class, subtask::exception); 1370 } 1371 } 1372 1373 /** 1374 * Test Subtask::toString. 1375 */ 1376 @Test 1377 void testSubtaskToString() throws Exception { 1378 try (var scope = new StructuredTaskScope<Object>()) { 1379 // success 1380 var subtask1 = scope.fork(() -> "foo"); 1381 scope.join(); 1382 assertTrue(subtask1.toString().contains("Completed successfully")); 1383 1384 // failed 1385 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1386 scope.join(); 1387 assertTrue(subtask2.toString().contains("Failed")); 1388 1389 // not completed 1390 Callable<Void> sleepForDay = () -> { 1391 Thread.sleep(Duration.ofDays(1)); 1392 return null; 1393 }; 1394 var subtask3 = scope.fork(sleepForDay); 1395 assertTrue(subtask3.toString().contains("Unavailable")); 1396 1397 scope.shutdown(); 1398 1399 // forked after shutdown 1400 var subtask4 = scope.fork(sleepForDay); 1401 assertTrue(subtask4.toString().contains("Unavailable")); 1402 1403 scope.join(); 1404 } 1405 } 1406 1407 /** 1408 * Test ShutdownOnSuccess with no completed tasks. 1409 */ 1410 @Test 1411 void testShutdownOnSuccess1() throws Exception { 1412 try (var scope = new ShutdownOnSuccess<Object>()) { 1413 assertThrows(IllegalStateException.class, () -> scope.result()); 1414 assertThrows(IllegalStateException.class, () -> scope.result(e -> null)); 1415 } 1416 } 1417 1418 /** 1419 * Test ShutdownOnSuccess with tasks that complete successfully. 1420 */ 1421 @ParameterizedTest 1422 @MethodSource("factories") 1423 void testShutdownOnSuccess2(ThreadFactory factory) throws Exception { 1424 try (var scope = new ShutdownOnSuccess<String>(null, factory)) { 1425 scope.fork(() -> "foo"); 1426 scope.join(); // ensures foo completes first 1427 scope.fork(() -> "bar"); 1428 scope.join(); 1429 assertEquals("foo", scope.result()); 1430 assertEquals("foo", scope.result(e -> null)); 1431 } 1432 } 1433 1434 /** 1435 * Test ShutdownOnSuccess with a task that completes successfully with a null result. 1436 */ 1437 @ParameterizedTest 1438 @MethodSource("factories") 1439 void testShutdownOnSuccess3(ThreadFactory factory) throws Exception { 1440 try (var scope = new ShutdownOnSuccess<Object>(null, factory)) { 1441 scope.fork(() -> null); 1442 scope.join(); 1443 assertNull(scope.result()); 1444 assertNull(scope.result(e -> null)); 1445 } 1446 } 1447 1448 /** 1449 * Test ShutdownOnSuccess with tasks that complete succcessfully and tasks that fail. 1450 */ 1451 @ParameterizedTest 1452 @MethodSource("factories") 1453 void testShutdownOnSuccess4(ThreadFactory factory) throws Exception { 1454 try (var scope = new ShutdownOnSuccess<String>(null, factory)) { 1455 scope.fork(() -> "foo"); 1456 scope.fork(() -> { throw new ArithmeticException(); }); 1457 scope.join(); 1458 assertEquals("foo", scope.result()); 1459 assertEquals("foo", scope.result(e -> null)); 1460 } 1461 } 1462 1463 /** 1464 * Test ShutdownOnSuccess with a task that fails. 1465 */ 1466 @ParameterizedTest 1467 @MethodSource("factories") 1468 void testShutdownOnSuccess5(ThreadFactory factory) throws Exception { 1469 try (var scope = new ShutdownOnSuccess<Object>(null, factory)) { 1470 scope.fork(() -> { throw new ArithmeticException(); }); 1471 scope.join(); 1472 Throwable ex = assertThrows(ExecutionException.class, () -> scope.result()); 1473 assertTrue(ex.getCause() instanceof ArithmeticException); 1474 ex = assertThrows(FooException.class, () -> scope.result(e -> new FooException(e))); 1475 assertTrue(ex.getCause() instanceof ArithmeticException); 1476 } 1477 } 1478 1479 /** 1480 * Test ShutdownOnSuccess methods are confined to the owner. 1481 */ 1482 @ParameterizedTest 1483 @MethodSource("factories") 1484 void testShutdownOnSuccessConfined(ThreadFactory factory) throws Exception { 1485 // owner before join 1486 try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) { 1487 scope.fork(() -> { throw new FooException(); }); 1488 assertThrows(IllegalStateException.class, scope::result); 1489 assertThrows(IllegalStateException.class, () -> { 1490 scope.result(e -> new RuntimeException(e)); 1491 }); 1492 scope.join(); 1493 } 1494 1495 // non-owner 1496 try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) { 1497 Subtask<Boolean> subtask = scope.fork(() -> { 1498 assertThrows(WrongThreadException.class, scope::result); 1499 assertThrows(WrongThreadException.class, () -> { 1500 scope.result(e -> new RuntimeException(e)); 1501 }); 1502 return true; 1503 }); 1504 scope.join(); 1505 assertTrue(subtask.get()); 1506 } 1507 } 1508 1509 /** 1510 * Test ShutdownOnFailure with no completed tasks. 1511 */ 1512 @Test 1513 void testShutdownOnFailure1() throws Throwable { 1514 try (var scope = new ShutdownOnFailure()) { 1515 assertTrue(scope.exception().isEmpty()); 1516 scope.throwIfFailed(); 1517 scope.throwIfFailed(e -> new FooException(e)); 1518 } 1519 } 1520 1521 /** 1522 * Test ShutdownOnFailure with tasks that complete successfully. 1523 */ 1524 @ParameterizedTest 1525 @MethodSource("factories") 1526 void testShutdownOnFailure2(ThreadFactory factory) throws Throwable { 1527 try (var scope = new ShutdownOnFailure(null, factory)) { 1528 scope.fork(() -> "foo"); 1529 scope.fork(() -> "bar"); 1530 scope.join(); 1531 1532 // no exception 1533 assertTrue(scope.exception().isEmpty()); 1534 scope.throwIfFailed(); 1535 scope.throwIfFailed(e -> new FooException(e)); 1536 } 1537 } 1538 1539 /** 1540 * Test ShutdownOnFailure with tasks that complete succcessfully and tasks that fail. 1541 */ 1542 @ParameterizedTest 1543 @MethodSource("factories") 1544 void testShutdownOnFailure3(ThreadFactory factory) throws Throwable { 1545 try (var scope = new ShutdownOnFailure(null, factory)) { 1546 1547 // one task completes successfully, the other fails 1548 scope.fork(() -> "foo"); 1549 scope.fork(() -> { throw new ArithmeticException(); }); 1550 scope.join(); 1551 1552 Throwable ex = scope.exception().orElse(null); 1553 assertTrue(ex instanceof ArithmeticException); 1554 1555 ex = assertThrows(ExecutionException.class, () -> scope.throwIfFailed()); 1556 assertTrue(ex.getCause() instanceof ArithmeticException); 1557 1558 ex = assertThrows(FooException.class, 1559 () -> scope.throwIfFailed(e -> new FooException(e))); 1560 assertTrue(ex.getCause() instanceof ArithmeticException); 1561 } 1562 } 1563 1564 /** 1565 * Test ShutdownOnFailure methods are confined to the owner. 1566 */ 1567 @ParameterizedTest 1568 @MethodSource("factories") 1569 void testShutdownOnFailureConfined(ThreadFactory factory) throws Exception { 1570 // owner before join 1571 try (var scope = new ShutdownOnFailure(null, factory)) { 1572 scope.fork(() -> "foo"); 1573 assertThrows(IllegalStateException.class, scope::exception); 1574 assertThrows(IllegalStateException.class, scope::throwIfFailed); 1575 assertThrows(IllegalStateException.class, () -> { 1576 scope.throwIfFailed(e -> new RuntimeException(e)); 1577 }); 1578 scope.join(); 1579 } 1580 1581 // non-owner 1582 try (var scope = new ShutdownOnFailure(null, factory)) { 1583 Subtask<Boolean> subtask = scope.fork(() -> { 1584 assertThrows(WrongThreadException.class, scope::exception); 1585 assertThrows(WrongThreadException.class, scope::throwIfFailed); 1586 assertThrows(WrongThreadException.class, () -> { 1587 scope.throwIfFailed(e -> new RuntimeException(e)); 1588 }); 1589 return true; 1590 }); 1591 scope.join(); 1592 assertTrue(subtask.get()); 1593 } 1594 } 1595 1596 /** 1597 * Test for NullPointerException. 1598 */ 1599 @Test 1600 void testNulls() throws Exception { 1601 assertThrows(NullPointerException.class, () -> new StructuredTaskScope("", null)); 1602 try (var scope = new StructuredTaskScope<Object>()) { 1603 assertThrows(NullPointerException.class, () -> scope.fork(null)); 1604 assertThrows(NullPointerException.class, () -> scope.joinUntil(null)); 1605 } 1606 1607 assertThrows(NullPointerException.class, () -> new ShutdownOnSuccess<Object>("", null)); 1608 try (var scope = new ShutdownOnSuccess<Object>()) { 1609 assertThrows(NullPointerException.class, () -> scope.fork(null)); 1610 assertThrows(NullPointerException.class, () -> scope.joinUntil(null)); 1611 assertThrows(NullPointerException.class, () -> scope.result(null)); 1612 } 1613 1614 assertThrows(NullPointerException.class, () -> new ShutdownOnFailure("", null)); 1615 try (var scope = new ShutdownOnFailure()) { 1616 assertThrows(NullPointerException.class, () -> scope.fork(null)); 1617 assertThrows(NullPointerException.class, () -> scope.joinUntil(null)); 1618 assertThrows(NullPointerException.class, () -> scope.throwIfFailed(null)); 1619 } 1620 } 1621 1622 /** 1623 * A runtime exception for tests. 1624 */ 1625 private static class FooException extends RuntimeException { 1626 FooException() { } 1627 FooException(Throwable cause) { super(cause); } 1628 } 1629 1630 /** 1631 * Returns the current time in milliseconds. 1632 */ 1633 private long millisTime() { 1634 long now = System.nanoTime(); 1635 return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS); 1636 } 1637 1638 /** 1639 * Check the duration of a task 1640 * @param start start time, in milliseconds 1641 * @param min minimum expected duration, in milliseconds 1642 * @param max maximum expected duration, in milliseconds 1643 * @return the duration (now - start), in milliseconds 1644 */ 1645 private long expectDuration(long start, long min, long max) { 1646 long duration = millisTime() - start; 1647 assertTrue(duration >= min, 1648 "Duration " + duration + "ms, expected >= " + min + "ms"); 1649 assertTrue(duration <= max, 1650 "Duration " + duration + "ms, expected <= " + max + "ms"); 1651 return duration; 1652 } 1653 1654 /** 1655 * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}". 1656 * {@code c} is the fully qualified class name and {@code m} is the method name. 1657 */ 1658 private void interruptThreadAt(Thread target, String location) throws InterruptedException { 1659 int index = location.lastIndexOf('.'); 1660 String className = location.substring(0, index); 1661 String methodName = location.substring(index + 1); 1662 1663 boolean found = false; 1664 while (!found) { 1665 Thread.State state = target.getState(); 1666 assertTrue(state != TERMINATED); 1667 if ((state == WAITING || state == TIMED_WAITING) 1668 && contains(target.getStackTrace(), className, methodName)) { 1669 found = true; 1670 } else { 1671 Thread.sleep(20); 1672 } 1673 } 1674 target.interrupt(); 1675 } 1676 1677 /** 1678 * Schedules the current thread to be interrupted when it waits (timed or untimed) 1679 * at the given location. 1680 */ 1681 private void scheduleInterruptAt(String location) { 1682 Thread target = Thread.currentThread(); 1683 scheduler.submit(() -> { 1684 interruptThreadAt(target, location); 1685 return null; 1686 }); 1687 } 1688 1689 /** 1690 * Returns true if the given stack trace contains an element for the given class 1691 * and method name. 1692 */ 1693 private boolean contains(StackTraceElement[] stack, String className, String methodName) { 1694 return Arrays.stream(stack) 1695 .anyMatch(e -> className.equals(e.getClassName()) 1696 && methodName.equals(e.getMethodName())); 1697 } 1698 }