1 /* 2 * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=platform 26 * @bug 8284199 8296779 8306647 27 * @summary Basic tests for StructuredTaskScope 28 * @enablePreview 29 * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest 30 */ 31 32 /* 33 * @test id=virtual 34 * @enablePreview 35 * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest 36 */ 37 38 import java.time.Duration; 39 import java.io.IOException; 40 import java.time.Instant; 41 import java.util.Arrays; 42 import java.util.ArrayList; 43 import java.util.List; 44 import java.util.Set; 45 import java.util.concurrent.Callable; 46 import java.util.concurrent.ConcurrentHashMap; 47 import java.util.concurrent.CountDownLatch; 48 import java.util.concurrent.Executors; 49 import java.util.concurrent.ExecutionException; 50 import java.util.concurrent.Future; 51 import java.util.concurrent.ThreadFactory; 52 import java.util.concurrent.TimeoutException; 53 import java.util.concurrent.TimeUnit; 54 import java.util.concurrent.RejectedExecutionException; 55 import java.util.concurrent.ScheduledExecutorService; 56 import java.util.concurrent.StructuredTaskScope; 57 import java.util.concurrent.StructuredTaskScope.Subtask; 58 import java.util.concurrent.StructuredTaskScope.ShutdownOnSuccess; 59 import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure; 60 import java.util.concurrent.StructureViolationException; 61 import java.util.concurrent.atomic.AtomicBoolean; 62 import java.util.concurrent.atomic.AtomicInteger; 63 import java.util.concurrent.atomic.AtomicReference; 64 import java.util.function.Supplier; 65 import java.util.stream.Stream; 66 import static java.lang.Thread.State.*; 67 68 import org.junit.jupiter.api.Test; 69 import org.junit.jupiter.api.BeforeAll; 70 import org.junit.jupiter.api.AfterAll; 71 import org.junit.jupiter.params.ParameterizedTest; 72 import org.junit.jupiter.params.provider.MethodSource; 73 import static org.junit.jupiter.api.Assertions.*; 74 75 class StructuredTaskScopeTest { 76 private static ScheduledExecutorService scheduler; 77 private static List<ThreadFactory> threadFactories; 78 79 @BeforeAll 80 static void setup() throws Exception { 81 scheduler = Executors.newSingleThreadScheduledExecutor(); 82 83 // thread factories 84 String value = System.getProperty("threadFactory"); 85 List<ThreadFactory> list = new ArrayList<>(); 86 if (value == null || value.equals("platform")) 87 list.add(Thread.ofPlatform().factory()); 88 if (value == null || value.equals("virtual")) 89 list.add(Thread.ofVirtual().factory()); 90 assertTrue(list.size() > 0, "No thread factories for tests"); 91 threadFactories = list; 92 } 93 94 @AfterAll 95 static void shutdown() { 96 scheduler.shutdown(); 97 } 98 99 private static Stream<ThreadFactory> factories() { 100 return threadFactories.stream(); 101 } 102 103 /** 104 * Test that fork creates a new thread for each task. 105 */ 106 @ParameterizedTest 107 @MethodSource("factories") 108 void testForkCreatesThread(ThreadFactory factory) throws Exception { 109 Set<Long> tids = ConcurrentHashMap.newKeySet(); 110 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 111 for (int i = 0; i < 100; i++) { 112 scope.fork(() -> { 113 tids.add(Thread.currentThread().threadId()); 114 return null; 115 }); 116 } 117 scope.join(); 118 } 119 assertEquals(100, tids.size()); 120 } 121 122 /** 123 * Test that fork creates a new virtual thread for each task. 124 */ 125 @Test 126 void testForkCreateVirtualThread() throws Exception { 127 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 128 try (var scope = new StructuredTaskScope<Object>()) { 129 for (int i = 0; i < 100; i++) { 130 scope.fork(() -> { 131 threads.add(Thread.currentThread()); 132 return null; 133 }); 134 } 135 scope.join(); 136 } 137 assertEquals(100, threads.size()); 138 threads.forEach(t -> assertTrue(t.isVirtual())); 139 } 140 141 /** 142 * Test that fork creates a new thread with the given thread factory. 143 */ 144 @ParameterizedTest 145 @MethodSource("factories") 146 void testForkUsesFactory(ThreadFactory factory) throws Exception { 147 var count = new AtomicInteger(); 148 ThreadFactory countingFactory = task -> { 149 count.incrementAndGet(); 150 return factory.newThread(task); 151 }; 152 try (var scope = new StructuredTaskScope<Object>(null, countingFactory)) { 153 for (int i = 0; i < 100; i++) { 154 scope.fork(() -> null); 155 } 156 scope.join(); 157 } 158 assertEquals(100, count.get()); 159 } 160 161 /** 162 * Test fork is confined to threads in the scope "tree". 163 */ 164 @ParameterizedTest 165 @MethodSource("factories") 166 void testForkConfined(ThreadFactory factory) throws Exception { 167 try (var scope1 = new StructuredTaskScope<Boolean>(); 168 var scope2 = new StructuredTaskScope<Boolean>()) { 169 170 // thread in scope1 cannot fork thread in scope2 171 Subtask<Boolean> subtask1 = scope1.fork(() -> { 172 assertThrows(WrongThreadException.class, () -> { 173 scope2.fork(() -> null); 174 }); 175 return true; 176 }); 177 178 // thread in scope2 can fork thread in scope1 179 Subtask<Boolean> subtask2 = scope2.fork(() -> { 180 scope1.fork(() -> null); 181 return true; 182 }); 183 184 scope2.join(); 185 scope1.join(); 186 187 assertTrue(subtask1.get()); 188 assertTrue(subtask2.get()); 189 190 // random thread cannot fork 191 try (var pool = Executors.newSingleThreadExecutor()) { 192 Future<Void> future = pool.submit(() -> { 193 assertThrows(WrongThreadException.class, () -> { 194 scope1.fork(() -> null); 195 }); 196 assertThrows(WrongThreadException.class, () -> { 197 scope2.fork(() -> null); 198 }); 199 return null; 200 }); 201 future.get(); 202 } 203 } 204 } 205 206 /** 207 * Test fork after join completes. 208 */ 209 @ParameterizedTest 210 @MethodSource("factories") 211 void testForkAfterJoin(ThreadFactory factory) throws Exception { 212 try (var scope = new StructuredTaskScope<String>(null, factory)) { 213 // round 1 214 var subtask1 = scope.fork(() -> "foo"); 215 scope.join(); 216 assertEquals("foo", subtask1.get()); 217 218 // round 2 219 var subtask2 = scope.fork(() -> "bar"); 220 assertEquals("foo", subtask1.get()); 221 scope.join(); 222 assertEquals("foo", subtask1.get()); 223 assertEquals("bar", subtask2.get()); 224 225 // round 3 226 var subtask3 = scope.fork(() -> "baz"); 227 assertEquals("foo", subtask1.get()); 228 assertEquals("bar", subtask2.get()); 229 scope.join(); 230 assertEquals("foo", subtask1.get()); 231 assertEquals("bar", subtask2.get()); 232 assertEquals("baz", subtask3.get()); 233 } 234 } 235 236 /** 237 * Test fork after join throws. 238 */ 239 @ParameterizedTest 240 @MethodSource("factories") 241 void testForkAfterJoinThrows(ThreadFactory factory) throws Exception { 242 try (var scope = new StructuredTaskScope<String>(null, factory)) { 243 var latch = new CountDownLatch(1); 244 var subtask1 = scope.fork(() -> { 245 latch.await(); 246 return "foo"; 247 }); 248 249 // join throws 250 Thread.currentThread().interrupt(); 251 assertThrows(InterruptedException.class, scope::join); 252 253 // allow subtask1 to finish 254 latch.countDown(); 255 256 // continue to fork 257 var subtask2 = scope.fork(() -> "bar"); 258 scope.join(); 259 assertEquals("foo", subtask1.get()); 260 assertEquals("bar", subtask2.get()); 261 } 262 } 263 264 /** 265 * Test fork after scope is shutdown. 266 */ 267 @ParameterizedTest 268 @MethodSource("factories") 269 void testForkAfterShutdown(ThreadFactory factory) throws Exception { 270 var executed = new AtomicBoolean(); 271 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 272 scope.shutdown(); 273 Subtask<String> subtask = scope.fork(() -> { 274 executed.set(true); 275 return null; 276 }); 277 scope.join(); 278 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 279 assertThrows(IllegalStateException.class, subtask::get); 280 assertThrows(IllegalStateException.class, subtask::exception); 281 } 282 assertFalse(executed.get()); 283 } 284 285 /** 286 * Test fork after scope is closed. 287 */ 288 @ParameterizedTest 289 @MethodSource("factories") 290 void testForkAfterClose(ThreadFactory factory) throws Exception { 291 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 292 scope.close(); 293 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null)); 294 } 295 } 296 297 /** 298 * Test fork when the thread factory rejects creating a thread. 299 */ 300 @Test 301 void testForkRejectedExecutionException() throws Exception { 302 ThreadFactory factory = task -> null; 303 try (var scope = new StructuredTaskScope(null, factory)) { 304 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null)); 305 scope.join(); 306 } 307 } 308 309 /** 310 * Test join with no subtasks. 311 */ 312 @Test 313 void testJoinWithNoSubtasks() throws Exception { 314 try (var scope = new StructuredTaskScope()) { 315 scope.join(); 316 } 317 } 318 319 /** 320 * Test join with unfinished subtasks. 321 */ 322 @ParameterizedTest 323 @MethodSource("factories") 324 void testJoinWithSubtasks(ThreadFactory factory) throws Exception { 325 try (var scope = new StructuredTaskScope(null, factory)) { 326 Subtask<String> subtask = scope.fork(() -> { 327 Thread.sleep(Duration.ofMillis(50)); 328 return "foo"; 329 }); 330 scope.join(); 331 assertEquals("foo", subtask.get()); 332 } 333 } 334 335 /** 336 * Test join is owner confined. 337 */ 338 @ParameterizedTest 339 @MethodSource("factories") 340 void testJoinConfined(ThreadFactory factory) throws Exception { 341 try (var scope = new StructuredTaskScope<Boolean>()) { 342 343 // thread in scope cannot join 344 Subtask<Boolean> subtask = scope.fork(() -> { 345 assertThrows(WrongThreadException.class, () -> { scope.join(); }); 346 return true; 347 }); 348 349 scope.join(); 350 351 assertTrue(subtask.get()); 352 353 // random thread cannot join 354 try (var pool = Executors.newSingleThreadExecutor()) { 355 Future<Void> future = pool.submit(() -> { 356 assertThrows(WrongThreadException.class, scope::join); 357 return null; 358 }); 359 future.get(); 360 } 361 } 362 } 363 364 /** 365 * Test join with interrupt status set. 366 */ 367 @ParameterizedTest 368 @MethodSource("factories") 369 void testInterruptJoin1(ThreadFactory factory) throws Exception { 370 try (var scope = new StructuredTaskScope(null, factory)) { 371 var latch = new CountDownLatch(1); 372 373 Subtask<String> subtask = scope.fork(() -> { 374 latch.await(); 375 return "foo"; 376 }); 377 378 // join should throw 379 Thread.currentThread().interrupt(); 380 try { 381 scope.join(); 382 fail("join did not throw"); 383 } catch (InterruptedException expected) { 384 assertFalse(Thread.interrupted()); // interrupt status should be clear 385 } finally { 386 // let task continue 387 latch.countDown(); 388 } 389 390 // join should complete 391 scope.join(); 392 assertEquals("foo", subtask.get()); 393 } 394 } 395 396 /** 397 * Test interrupt of thread blocked in join. 398 */ 399 @ParameterizedTest 400 @MethodSource("factories") 401 void testInterruptJoin2(ThreadFactory factory) throws Exception { 402 try (var scope = new StructuredTaskScope(null, factory)) { 403 var latch = new CountDownLatch(1); 404 Subtask<String> subtask = scope.fork(() -> { 405 latch.await(); 406 return "foo"; 407 }); 408 409 // join should throw 410 scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.join"); 411 try { 412 scope.join(); 413 fail("join did not throw"); 414 } catch (InterruptedException expected) { 415 assertFalse(Thread.interrupted()); // interrupt status should be clear 416 } finally { 417 // let task continue 418 latch.countDown(); 419 } 420 421 // join should complete 422 scope.join(); 423 assertEquals("foo", subtask.get()); 424 } 425 } 426 427 /** 428 * Test join when scope is shutdown. 429 */ 430 @ParameterizedTest 431 @MethodSource("factories") 432 void testJoinWithShutdown1(ThreadFactory factory) throws Exception { 433 try (var scope = new StructuredTaskScope<String>(null, factory)) { 434 var interrupted = new CountDownLatch(1); 435 var finish = new CountDownLatch(1); 436 437 Subtask<String> subtask = scope.fork(() -> { 438 try { 439 Thread.sleep(Duration.ofDays(1)); 440 } catch (InterruptedException e) { 441 interrupted.countDown(); 442 } 443 finish.await(); 444 return "foo"; 445 }); 446 447 scope.shutdown(); // should interrupt task 448 449 interrupted.await(); 450 451 scope.join(); 452 453 // signal task to finish 454 finish.countDown(); 455 } 456 } 457 458 /** 459 * Test shutdown when owner is blocked in join. 460 */ 461 @ParameterizedTest 462 @MethodSource("factories") 463 void testJoinWithShutdown2(ThreadFactory factory) throws Exception { 464 class MyScope<T> extends StructuredTaskScope<T> { 465 MyScope(ThreadFactory factory) { 466 super(null, factory); 467 } 468 @Override 469 protected void handleComplete(Subtask<? extends T> subtask) { 470 shutdown(); 471 } 472 } 473 474 try (var scope = new MyScope<String>(factory)) { 475 Subtask<String> subtask1 = scope.fork(() -> { 476 Thread.sleep(Duration.ofMillis(50)); 477 return "foo"; 478 }); 479 Subtask<String> subtask2 = scope.fork(() -> { 480 Thread.sleep(Duration.ofDays(1)); 481 return "bar"; 482 }); 483 484 // join should wakeup when shutdown is called 485 scope.join(); 486 487 // task1 should have completed successfully 488 assertEquals(Subtask.State.SUCCESS, subtask1.state()); 489 assertEquals("foo", subtask1.get()); 490 assertThrows(IllegalStateException.class, subtask1::exception); 491 492 // task2 result/exception not available 493 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 494 assertThrows(IllegalStateException.class, subtask2::get); 495 assertThrows(IllegalStateException.class, subtask2::exception); 496 } 497 } 498 499 /** 500 * Test join after scope is closed. 501 */ 502 @Test 503 void testJoinAfterClose() throws Exception { 504 try (var scope = new StructuredTaskScope()) { 505 scope.join(); 506 scope.close(); 507 assertThrows(IllegalStateException.class, () -> scope.join()); 508 assertThrows(IllegalStateException.class, () -> scope.joinUntil(Instant.now())); 509 } 510 } 511 512 /** 513 * Test joinUntil, subtasks finish before deadline expires. 514 */ 515 @ParameterizedTest 516 @MethodSource("factories") 517 void testJoinUntil1(ThreadFactory factory) throws Exception { 518 try (var scope = new StructuredTaskScope<String>(null, factory)) { 519 Subtask<String> subtask = scope.fork(() -> { 520 try { 521 Thread.sleep(Duration.ofSeconds(2)); 522 } catch (InterruptedException e) { } 523 return "foo"; 524 }); 525 526 long startMillis = millisTime(); 527 scope.joinUntil(Instant.now().plusSeconds(30)); 528 expectDuration(startMillis, /*min*/1900, /*max*/20_000); 529 assertEquals("foo", subtask.get()); 530 } 531 } 532 533 /** 534 * Test joinUntil, deadline expires before subtasks finish. 535 */ 536 @ParameterizedTest 537 @MethodSource("factories") 538 void testJoinUntil2(ThreadFactory factory) throws Exception { 539 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 540 Subtask<Void> subtask = scope.fork(() -> { 541 Thread.sleep(Duration.ofDays(1)); 542 return null; 543 }); 544 545 long startMillis = millisTime(); 546 try { 547 scope.joinUntil(Instant.now().plusSeconds(2)); 548 } catch (TimeoutException e) { 549 expectDuration(startMillis, /*min*/1900, /*max*/20_000); 550 } 551 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 552 } 553 } 554 555 /** 556 * Test joinUntil many times. 557 */ 558 @ParameterizedTest 559 @MethodSource("factories") 560 void testJoinUntil3(ThreadFactory factory) throws Exception { 561 try (var scope = new StructuredTaskScope<String>(null, factory)) { 562 Subtask<String> subtask = scope.fork(() -> { 563 Thread.sleep(Duration.ofDays(1)); 564 return null; 565 }); 566 567 for (int i = 0; i < 3; i++) { 568 try { 569 scope.joinUntil(Instant.now().plusMillis(50)); 570 fail("joinUntil did not throw"); 571 } catch (TimeoutException expected) { 572 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 573 } 574 } 575 } 576 } 577 578 /** 579 * Test joinUntil with a deadline that has already expired. 580 */ 581 @ParameterizedTest 582 @MethodSource("factories") 583 void testJoinUntil4(ThreadFactory factory) throws Exception { 584 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 585 Subtask<Void> subtask = scope.fork(() -> { 586 Thread.sleep(Duration.ofDays(1)); 587 return null; 588 }); 589 590 // now 591 try { 592 scope.joinUntil(Instant.now()); 593 fail("joinUntil did not throw"); 594 } catch (TimeoutException expected) { 595 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 596 } 597 598 // in the past 599 try { 600 scope.joinUntil(Instant.now().minusSeconds(1)); 601 fail("joinUntil did not throw"); 602 } catch (TimeoutException expected) { 603 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 604 } 605 } 606 } 607 608 /** 609 * Test joinUntil with interrupt status set. 610 */ 611 @ParameterizedTest 612 @MethodSource("factories") 613 void testInterruptJoinUntil1(ThreadFactory factory) throws Exception { 614 try (var scope = new StructuredTaskScope<String>(null, factory)) { 615 var latch = new CountDownLatch(1); 616 617 Subtask<String> subtask = scope.fork(() -> { 618 latch.await(); 619 return "foo"; 620 }); 621 622 // joinUntil should throw 623 Thread.currentThread().interrupt(); 624 try { 625 scope.joinUntil(Instant.now().plusSeconds(30)); 626 fail("joinUntil did not throw"); 627 } catch (InterruptedException expected) { 628 assertFalse(Thread.interrupted()); // interrupt status should be clear 629 } finally { 630 // let task continue 631 latch.countDown(); 632 } 633 634 // join should complete 635 scope.join(); 636 assertEquals("foo", subtask.get()); 637 } 638 } 639 640 /** 641 * Test interrupt of thread blocked in joinUntil. 642 */ 643 @ParameterizedTest 644 @MethodSource("factories") 645 void testInterruptJoinUntil2(ThreadFactory factory) throws Exception { 646 try (var scope = new StructuredTaskScope(null, factory)) { 647 var latch = new CountDownLatch(1); 648 649 Subtask<String> subtask = scope.fork(() -> { 650 latch.await(); 651 return "foo"; 652 }); 653 654 // joinUntil should throw 655 scheduleInterruptAt("java.util.concurrent.StructuredTaskScope.joinUntil"); 656 try { 657 scope.joinUntil(Instant.now().plusSeconds(30)); 658 fail("joinUntil did not throw"); 659 } catch (InterruptedException expected) { 660 assertFalse(Thread.interrupted()); // interrupt status should be clear 661 } finally { 662 // let task continue 663 latch.countDown(); 664 } 665 666 // join should complete 667 scope.join(); 668 assertEquals("foo", subtask.get()); 669 } 670 } 671 672 /** 673 * Test that shutdown interrupts unfinished subtasks. 674 */ 675 @ParameterizedTest 676 @MethodSource("factories") 677 void testShutdownInterruptsThreads1(ThreadFactory factory) throws Exception { 678 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 679 var interrupted = new AtomicBoolean(); 680 var latch = new CountDownLatch(1); 681 var subtask = scope.fork(() -> { 682 try { 683 Thread.sleep(Duration.ofDays(1)); 684 } catch (InterruptedException e) { 685 interrupted.set(true); 686 } finally { 687 latch.countDown(); 688 } 689 return null; 690 }); 691 692 scope.shutdown(); 693 694 // wait for task to complete 695 latch.await(); 696 assertTrue(interrupted.get()); 697 698 scope.join(); 699 700 // subtask result/exception not available 701 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 702 assertThrows(IllegalStateException.class, subtask::get); 703 assertThrows(IllegalStateException.class, subtask::exception); 704 } 705 } 706 707 /** 708 * Test that shutdown does not interrupt current thread. 709 */ 710 @ParameterizedTest 711 @MethodSource("factories") 712 void testShutdownInterruptsThreads2(ThreadFactory factory) throws Exception { 713 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 714 var interrupted = new AtomicBoolean(); 715 var latch = new CountDownLatch(1); 716 var subtask = scope.fork(() -> { 717 try { 718 scope.shutdown(); 719 interrupted.set(Thread.currentThread().isInterrupted()); 720 } finally { 721 latch.countDown(); 722 } 723 return null; 724 }); 725 726 // wait for task to complete 727 latch.await(); 728 assertFalse(interrupted.get()); 729 730 scope.join(); 731 } 732 } 733 734 /** 735 * Test shutdown wakes join. 736 */ 737 @ParameterizedTest 738 @MethodSource("factories") 739 void testShutdownWakesJoin(ThreadFactory factory) throws Exception { 740 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 741 var latch = new CountDownLatch(1); 742 scope.fork(() -> { 743 Thread.sleep(Duration.ofMillis(100)); // give time for join to block 744 scope.shutdown(); 745 latch.await(); 746 return null; 747 }); 748 749 scope.join(); 750 751 // join woke up, allow task to complete 752 latch.countDown(); 753 } 754 } 755 756 /** 757 * Test shutdown after scope is closed. 758 */ 759 @Test 760 void testShutdownAfterClose() throws Exception { 761 try (var scope = new StructuredTaskScope<Object>()) { 762 scope.join(); 763 scope.close(); 764 assertThrows(IllegalStateException.class, scope::shutdown); 765 } 766 } 767 768 /** 769 * Test shutdown is confined to threads in the scope "tree". 770 */ 771 @ParameterizedTest 772 @MethodSource("factories") 773 void testShutdownConfined(ThreadFactory factory) throws Exception { 774 try (var scope1 = new StructuredTaskScope<Boolean>(); 775 var scope2 = new StructuredTaskScope<Boolean>()) { 776 777 // thread in scope1 cannot shutdown scope2 778 Subtask<Boolean> subtask1 = scope1.fork(() -> { 779 assertThrows(WrongThreadException.class, scope2::shutdown); 780 return true; 781 }); 782 783 // wait for task in scope1 to complete to avoid racing with task in scope2 784 while (subtask1.state() == Subtask.State.UNAVAILABLE) { 785 Thread.sleep(10); 786 } 787 788 // thread in scope2 shutdown scope1 789 Subtask<Boolean> subtask2 = scope2.fork(() -> { 790 scope1.shutdown(); 791 return true; 792 }); 793 794 scope2.join(); 795 scope1.join(); 796 797 assertTrue(subtask1.get()); 798 assertTrue(subtask1.get()); 799 800 // random thread cannot shutdown 801 try (var pool = Executors.newSingleThreadExecutor()) { 802 Future<Void> future = pool.submit(() -> { 803 assertThrows(WrongThreadException.class, scope1::shutdown); 804 assertThrows(WrongThreadException.class, scope2::shutdown); 805 return null; 806 }); 807 future.get(); 808 } 809 } 810 } 811 812 /** 813 * Test isShutdown. 814 */ 815 @Test 816 void testIsShutdown() { 817 try (var scope = new StructuredTaskScope<Object>()) { 818 assertFalse(scope.isShutdown()); // before shutdown 819 scope.shutdown(); 820 assertTrue(scope.isShutdown()); // after shutdown 821 scope.close(); 822 assertTrue(scope.isShutdown()); // after cose 823 } 824 } 825 826 /** 827 * Test close without join, no subtasks forked. 828 */ 829 @Test 830 void testCloseWithoutJoin1() { 831 try (var scope = new StructuredTaskScope<Object>()) { 832 // do nothing 833 } 834 } 835 836 /** 837 * Test close without join, unfinished subtasks. 838 */ 839 @ParameterizedTest 840 @MethodSource("factories") 841 void testCloseWithoutJoin2(ThreadFactory factory) { 842 try (var scope = new StructuredTaskScope<String>(null, factory)) { 843 Subtask<String> subtask = scope.fork(() -> { 844 Thread.sleep(Duration.ofDays(1)); 845 return null; 846 }); 847 assertThrows(IllegalStateException.class, scope::close); 848 849 // subtask result/exception not available 850 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 851 assertThrows(IllegalStateException.class, subtask::get); 852 assertThrows(IllegalStateException.class, subtask::exception); 853 } 854 } 855 856 /** 857 * Test close without join, unfinished subtasks forked after join. 858 */ 859 @ParameterizedTest 860 @MethodSource("factories") 861 void testCloseWithoutJoin3(ThreadFactory factory) throws Exception { 862 try (var scope = new StructuredTaskScope(null, factory)) { 863 scope.fork(() -> "foo"); 864 scope.join(); 865 866 Subtask<String> subtask = scope.fork(() -> { 867 Thread.sleep(Duration.ofDays(1)); 868 return null; 869 }); 870 assertThrows(IllegalStateException.class, scope::close); 871 872 // subtask result/exception not available 873 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 874 assertThrows(IllegalStateException.class, subtask::get); 875 assertThrows(IllegalStateException.class, subtask::exception); 876 } 877 } 878 879 /** 880 * Test close after join throws. Close should not throw as join attempted. 881 */ 882 @ParameterizedTest 883 @MethodSource("factories") 884 void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception { 885 try (var scope = new StructuredTaskScope<Object>()) { 886 var subtask = scope.fork(() -> { 887 Thread.sleep(Duration.ofDays(1)); 888 return null; 889 }); 890 891 // join throws 892 Thread.currentThread().interrupt(); 893 assertThrows(InterruptedException.class, scope::join); 894 assertThrows(IllegalStateException.class, subtask::get); 895 } 896 } 897 898 /** 899 * Test close after joinUntil throws. Close should not throw as join attempted. 900 */ 901 @ParameterizedTest 902 @MethodSource("factories") 903 void testCloseAfterJoinUntilThrows(ThreadFactory factory) throws Exception { 904 try (var scope = new StructuredTaskScope<Object>()) { 905 var subtask = scope.fork(() -> { 906 Thread.sleep(Duration.ofDays(1)); 907 return null; 908 }); 909 910 // joinUntil throws 911 assertThrows(TimeoutException.class, () -> scope.joinUntil(Instant.now())); 912 assertThrows(IllegalStateException.class, subtask::get); 913 } 914 } 915 916 /** 917 * Test close is owner confined. 918 */ 919 @ParameterizedTest 920 @MethodSource("factories") 921 void testCloseConfined(ThreadFactory factory) throws Exception { 922 try (var scope = new StructuredTaskScope<Boolean>()) { 923 924 // attempt to close from thread in scope 925 Subtask<Boolean> subtask = scope.fork(() -> { 926 assertThrows(WrongThreadException.class, scope::close); 927 return true; 928 }); 929 930 scope.join(); 931 assertTrue(subtask.get()); 932 933 // random thread cannot close scope 934 try (var pool = Executors.newCachedThreadPool(factory)) { 935 Future<Boolean> future = pool.submit(() -> { 936 assertThrows(WrongThreadException.class, scope::close); 937 return null; 938 }); 939 future.get(); 940 } 941 } 942 } 943 944 /** 945 * Test close with interrupt status set. 946 */ 947 @ParameterizedTest 948 @MethodSource("factories") 949 void testInterruptClose1(ThreadFactory factory) throws Exception { 950 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 951 var done = new AtomicBoolean(); 952 scope.fork(() -> { 953 try { 954 Thread.sleep(Duration.ofDays(1)); 955 } catch (InterruptedException e) { 956 // interrupted by shutdown, expected 957 } 958 Thread.sleep(Duration.ofMillis(100)); // force close to wait 959 done.set(true); 960 return null; 961 }); 962 963 scope.shutdown(); 964 scope.join(); 965 966 // invoke close with interrupt status set 967 Thread.currentThread().interrupt(); 968 try { 969 scope.close(); 970 } finally { 971 assertTrue(Thread.interrupted()); // clear interrupt status 972 assertTrue(done.get()); 973 } 974 } 975 } 976 977 /** 978 * Test interrupting thread waiting in close. 979 */ 980 @ParameterizedTest 981 @MethodSource("factories") 982 void testInterruptClose2(ThreadFactory factory) throws Exception { 983 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 984 var done = new AtomicBoolean(); 985 Thread mainThread = Thread.currentThread(); 986 scope.fork(() -> { 987 try { 988 Thread.sleep(Duration.ofDays(1)); 989 } catch (InterruptedException e) { 990 // interrupted by shutdown, expected 991 } 992 993 // interrupt main thread when it blocks in close 994 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScope.close"); 995 996 Thread.sleep(Duration.ofMillis(100)); // force close to wait 997 done.set(true); 998 return null; 999 }); 1000 1001 scope.shutdown(); // interrupts task 1002 scope.join(); 1003 try { 1004 scope.close(); 1005 } finally { 1006 assertTrue(Thread.interrupted()); // clear interrupt status 1007 assertTrue(done.get()); 1008 } 1009 } 1010 } 1011 1012 /** 1013 * Test that closing an enclosing scope closes the thread flock of a nested scope. 1014 */ 1015 @Test 1016 void testCloseThrowsStructureViolation() throws Exception { 1017 try (var scope1 = new StructuredTaskScope<Object>()) { 1018 try (var scope2 = new StructuredTaskScope<Object>()) { 1019 1020 // join + close enclosing scope 1021 scope1.join(); 1022 try { 1023 scope1.close(); 1024 fail("close did not throw"); 1025 } catch (StructureViolationException expected) { } 1026 1027 // underlying flock should be closed, fork should return a cancelled task 1028 var executed = new AtomicBoolean(); 1029 Subtask<Void> subtask = scope2.fork(() -> { 1030 executed.set(true); 1031 return null; 1032 }); 1033 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1034 scope2.join(); 1035 assertFalse(executed.get()); 1036 } 1037 } 1038 } 1039 1040 /** 1041 * A StructuredTaskScope that collects the subtasks notified to the handleComplete method. 1042 */ 1043 private static class CollectAll<T> extends StructuredTaskScope<T> { 1044 private final Set<Subtask<? extends T>> subtasks = ConcurrentHashMap.newKeySet(); 1045 1046 CollectAll(ThreadFactory factory) { 1047 super(null, factory); 1048 } 1049 1050 @Override 1051 protected void handleComplete(Subtask<? extends T> subtask) { 1052 subtasks.add(subtask); 1053 } 1054 1055 Set<Subtask<? extends T>> subtasks() { 1056 return subtasks; 1057 } 1058 1059 Subtask<? extends T> find(Callable<T> task) { 1060 return subtasks.stream() 1061 .filter(h -> task.equals(h.task())) 1062 .findAny() 1063 .orElseThrow(); 1064 } 1065 } 1066 1067 /** 1068 * Test that handleComplete method is invoked for tasks that complete before shutdown. 1069 */ 1070 @ParameterizedTest 1071 @MethodSource("factories") 1072 void testHandleCompleteBeforeShutdown(ThreadFactory factory) throws Exception { 1073 try (var scope = new CollectAll<String>(factory)) { 1074 Callable<String> task1 = () -> "foo"; 1075 Callable<String> task2 = () -> { throw new FooException(); }; 1076 scope.fork(task1); 1077 scope.fork(task2); 1078 scope.join(); 1079 1080 var subtask1 = scope.find(task1); 1081 assertEquals("foo", subtask1.get()); 1082 1083 var subtask2 = scope.find(task2); 1084 assertTrue(subtask2.exception() instanceof FooException); 1085 } 1086 } 1087 1088 /** 1089 * Test that handleComplete method is not invoked for tasks that finish after shutdown 1090 * or are forked after shutdown. 1091 */ 1092 @ParameterizedTest 1093 @MethodSource("factories") 1094 void testHandleCompleteAfterShutdown(ThreadFactory factory) throws Exception { 1095 try (var scope = new CollectAll<String>(factory)) { 1096 Callable<String> task1 = () -> { 1097 try { 1098 Thread.sleep(Duration.ofDays(1)); 1099 } catch (InterruptedException ignore) { } 1100 return "foo"; 1101 }; 1102 Callable<String> task2 = () -> { 1103 Thread.sleep(Duration.ofDays(1)); 1104 return "bar"; 1105 }; 1106 Callable<String> task3 = () -> "baz"; 1107 1108 // forked before shutdown, will complete after shutdown 1109 var subtask1 = scope.fork(task1); 1110 var subtask2 = scope.fork(task2); 1111 1112 scope.shutdown(); 1113 1114 // forked after shutdown 1115 var subtask3 = scope.fork(task3); 1116 1117 scope.join(); 1118 1119 // handleComplete should not be called 1120 for (int i = 0; i < 3; i++) { 1121 assertEquals(0, scope.subtasks().size()); 1122 Thread.sleep(20); 1123 } 1124 1125 assertEquals(Subtask.State.UNAVAILABLE, subtask1.state()); 1126 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1127 assertEquals(Subtask.State.UNAVAILABLE, subtask3.state()); 1128 } 1129 } 1130 1131 /** 1132 * Test that the default handleComplete throws IllegalArgumentException if called 1133 * with a running task. 1134 */ 1135 @Test 1136 void testHandleCompleteThrows() throws Exception { 1137 class TestScope<T> extends StructuredTaskScope<T> { 1138 protected void handleComplete(Subtask<? extends T> subtask) { 1139 super.handleComplete(subtask); 1140 } 1141 } 1142 1143 try (var scope = new TestScope<String>()) { 1144 var subtask = scope.fork(() -> { 1145 Thread.sleep(Duration.ofDays(1)); 1146 return "foo"; 1147 }); 1148 1149 // running task 1150 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1151 assertThrows(IllegalArgumentException.class, () -> scope.handleComplete(subtask)); 1152 scope.shutdown(); 1153 1154 // null task 1155 assertThrows(NullPointerException.class, () -> scope.handleComplete(null)); 1156 1157 scope.join(); 1158 } 1159 } 1160 1161 /** 1162 * Test ensureOwnerAndJoined. 1163 */ 1164 @ParameterizedTest 1165 @MethodSource("factories") 1166 void testEnsureOwnerAndJoined(ThreadFactory factory) throws Exception { 1167 class MyScope<T> extends StructuredTaskScope<T> { 1168 MyScope(ThreadFactory factory) { 1169 super(null, factory); 1170 } 1171 void invokeEnsureOwnerAndJoined() { 1172 super.ensureOwnerAndJoined(); 1173 } 1174 } 1175 1176 try (var scope = new MyScope<Boolean>(factory)) { 1177 // owner thread, before join 1178 scope.fork(() -> true); 1179 assertThrows(IllegalStateException.class, () -> { 1180 scope.invokeEnsureOwnerAndJoined(); 1181 }); 1182 1183 // owner thread, after join 1184 scope.join(); 1185 scope.invokeEnsureOwnerAndJoined(); 1186 1187 // thread in scope cannot invoke ensureOwnerAndJoined 1188 Subtask<Boolean> subtask = scope.fork(() -> { 1189 assertThrows(WrongThreadException.class, () -> { 1190 scope.invokeEnsureOwnerAndJoined(); 1191 }); 1192 return true; 1193 }); 1194 scope.join(); 1195 assertTrue(subtask.get()); 1196 1197 // random thread cannot invoke ensureOwnerAndJoined 1198 try (var pool = Executors.newSingleThreadExecutor()) { 1199 Future<Void> future = pool.submit(() -> { 1200 assertThrows(WrongThreadException.class, () -> { 1201 scope.invokeEnsureOwnerAndJoined(); 1202 }); 1203 return null; 1204 }); 1205 future.get(); 1206 } 1207 } 1208 } 1209 1210 /** 1211 * Test ensureOwnerAndJoined after the task scope has been closed. 1212 */ 1213 @ParameterizedTest 1214 @MethodSource("factories") 1215 void testEnsureOwnerAndJoinedAfterClose(ThreadFactory factory) throws Exception { 1216 class MyScope<T> extends StructuredTaskScope<T> { 1217 MyScope(ThreadFactory factory) { 1218 super(null, factory); 1219 } 1220 public void invokeEnsureOwnerAndJoined() { 1221 super.ensureOwnerAndJoined(); 1222 } 1223 } 1224 1225 // ensureOwnerAndJoined after close, join invoked 1226 try (var scope = new MyScope<String>(factory)) { 1227 scope.fork(() -> "foo"); 1228 scope.join(); 1229 scope.close(); 1230 scope.invokeEnsureOwnerAndJoined(); // should not throw 1231 } 1232 1233 // ensureOwnerAndJoined after close, join not invoked 1234 try (var scope = new MyScope<String>(factory)) { 1235 scope.fork(() -> "foo"); 1236 assertThrows(IllegalStateException.class, scope::close); 1237 scope.invokeEnsureOwnerAndJoined(); // should not throw 1238 } 1239 } 1240 1241 1242 /** 1243 * Test toString. 1244 */ 1245 @Test 1246 void testToString() throws Exception { 1247 ThreadFactory factory = Thread.ofVirtual().factory(); 1248 try (var scope = new StructuredTaskScope<Object>("duke", factory)) { 1249 // open 1250 assertTrue(scope.toString().contains("duke")); 1251 1252 // shutdown 1253 scope.shutdown(); 1254 assertTrue(scope.toString().contains("duke")); 1255 1256 // closed 1257 scope.join(); 1258 scope.close(); 1259 assertTrue(scope.toString().contains("duke")); 1260 } 1261 } 1262 1263 /** 1264 * Test Subtask with task that completes successfully. 1265 */ 1266 @ParameterizedTest 1267 @MethodSource("factories") 1268 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception { 1269 try (var scope = new StructuredTaskScope<String>(null, factory)) { 1270 Callable<String> task = () -> "foo"; 1271 Subtask<String> subtask = scope.fork(task); 1272 assertEquals(task, subtask.task()); 1273 scope.join(); 1274 1275 // after join 1276 assertEquals(task, subtask.task()); 1277 assertEquals(Subtask.State.SUCCESS, subtask.state()); 1278 assertEquals("foo", subtask.get()); 1279 assertThrows(IllegalStateException.class, subtask::exception); 1280 } 1281 } 1282 1283 /** 1284 * Test Subtask with task that fails. 1285 */ 1286 @ParameterizedTest 1287 @MethodSource("factories") 1288 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception { 1289 try (var scope = new StructuredTaskScope<String>(null, factory)) { 1290 Callable<String> task = () -> { throw new FooException(); }; 1291 Subtask<String> subtask = scope.fork(task); 1292 assertEquals(task, subtask.task()); 1293 scope.join(); 1294 1295 // after join 1296 assertEquals(task, subtask.task()); 1297 assertEquals(Subtask.State.FAILED, subtask.state()); 1298 assertThrows(IllegalStateException.class, subtask::get); 1299 assertTrue(subtask.exception() instanceof FooException); 1300 } 1301 } 1302 1303 /** 1304 * Test Subtask with a task that has not completed. 1305 */ 1306 @ParameterizedTest 1307 @MethodSource("factories") 1308 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception { 1309 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 1310 Callable<Void> task = () -> { 1311 Thread.sleep(Duration.ofDays(1)); 1312 return null; 1313 }; 1314 Subtask<Void> subtask = scope.fork(task); 1315 1316 // before join 1317 assertEquals(task, subtask.task()); 1318 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1319 assertThrows(IllegalStateException.class, subtask::get); 1320 assertThrows(IllegalStateException.class, subtask::exception); 1321 1322 // attempt join, join throws 1323 Thread.currentThread().interrupt(); 1324 assertThrows(InterruptedException.class, scope::join); 1325 1326 // after join 1327 assertEquals(task, subtask.task()); 1328 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1329 assertThrows(IllegalStateException.class, subtask::get); 1330 assertThrows(IllegalStateException.class, subtask::exception); 1331 } 1332 } 1333 1334 /** 1335 * Test Subtask when forked after shutdown. 1336 */ 1337 @ParameterizedTest 1338 @MethodSource("factories") 1339 void testSubtaskWhenShutdown(ThreadFactory factory) throws Exception { 1340 try (var scope = new StructuredTaskScope<Object>(null, factory)) { 1341 Callable<Void> task = () -> { 1342 Thread.sleep(Duration.ofDays(1)); 1343 return null; 1344 }; 1345 1346 scope.shutdown(); 1347 1348 // fork after shutdown 1349 Subtask<Void> subtask = scope.fork(task); 1350 scope.join(); 1351 assertEquals(task, subtask.task()); 1352 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1353 assertThrows(IllegalStateException.class, subtask::get); 1354 assertThrows(IllegalStateException.class, subtask::exception); 1355 } 1356 } 1357 1358 /** 1359 * Test Subtask::toString. 1360 */ 1361 @Test 1362 void testSubtaskToString() throws Exception { 1363 try (var scope = new StructuredTaskScope<Object>()) { 1364 // success 1365 var subtask1 = scope.fork(() -> "foo"); 1366 scope.join(); 1367 assertTrue(subtask1.toString().contains("Completed successfully")); 1368 1369 // failed 1370 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1371 scope.join(); 1372 assertTrue(subtask2.toString().contains("Failed")); 1373 1374 // not completed 1375 Callable<Void> sleepForDay = () -> { 1376 Thread.sleep(Duration.ofDays(1)); 1377 return null; 1378 }; 1379 var subtask3 = scope.fork(sleepForDay); 1380 assertTrue(subtask3.toString().contains("Unavailable")); 1381 1382 scope.shutdown(); 1383 1384 // forked after shutdown 1385 var subtask4 = scope.fork(sleepForDay); 1386 assertTrue(subtask4.toString().contains("Unavailable")); 1387 1388 scope.join(); 1389 } 1390 } 1391 1392 /** 1393 * Test ShutdownOnSuccess with no completed tasks. 1394 */ 1395 @Test 1396 void testShutdownOnSuccess1() throws Exception { 1397 try (var scope = new ShutdownOnSuccess<Object>()) { 1398 assertThrows(IllegalStateException.class, () -> scope.result()); 1399 assertThrows(IllegalStateException.class, () -> scope.result(e -> null)); 1400 } 1401 } 1402 1403 /** 1404 * Test ShutdownOnSuccess with tasks that complete successfully. 1405 */ 1406 @ParameterizedTest 1407 @MethodSource("factories") 1408 void testShutdownOnSuccess2(ThreadFactory factory) throws Exception { 1409 try (var scope = new ShutdownOnSuccess<String>(null, factory)) { 1410 scope.fork(() -> "foo"); 1411 scope.join(); // ensures foo completes first 1412 scope.fork(() -> "bar"); 1413 scope.join(); 1414 assertEquals("foo", scope.result()); 1415 assertEquals("foo", scope.result(e -> null)); 1416 } 1417 } 1418 1419 /** 1420 * Test ShutdownOnSuccess with a task that completes successfully with a null result. 1421 */ 1422 @ParameterizedTest 1423 @MethodSource("factories") 1424 void testShutdownOnSuccess3(ThreadFactory factory) throws Exception { 1425 try (var scope = new ShutdownOnSuccess<Object>(null, factory)) { 1426 scope.fork(() -> null); 1427 scope.join(); 1428 assertNull(scope.result()); 1429 assertNull(scope.result(e -> null)); 1430 } 1431 } 1432 1433 /** 1434 * Test ShutdownOnSuccess with tasks that complete succcessfully and tasks that fail. 1435 */ 1436 @ParameterizedTest 1437 @MethodSource("factories") 1438 void testShutdownOnSuccess4(ThreadFactory factory) throws Exception { 1439 try (var scope = new ShutdownOnSuccess<String>(null, factory)) { 1440 scope.fork(() -> "foo"); 1441 scope.fork(() -> { throw new ArithmeticException(); }); 1442 scope.join(); 1443 assertEquals("foo", scope.result()); 1444 assertEquals("foo", scope.result(e -> null)); 1445 } 1446 } 1447 1448 /** 1449 * Test ShutdownOnSuccess with a task that fails. 1450 */ 1451 @ParameterizedTest 1452 @MethodSource("factories") 1453 void testShutdownOnSuccess5(ThreadFactory factory) throws Exception { 1454 try (var scope = new ShutdownOnSuccess<Object>(null, factory)) { 1455 scope.fork(() -> { throw new ArithmeticException(); }); 1456 scope.join(); 1457 Throwable ex = assertThrows(ExecutionException.class, () -> scope.result()); 1458 assertTrue(ex.getCause() instanceof ArithmeticException); 1459 ex = assertThrows(FooException.class, () -> scope.result(e -> new FooException(e))); 1460 assertTrue(ex.getCause() instanceof ArithmeticException); 1461 } 1462 } 1463 1464 /** 1465 * Test ShutdownOnSuccess methods are confined to the owner. 1466 */ 1467 @ParameterizedTest 1468 @MethodSource("factories") 1469 void testShutdownOnSuccessConfined(ThreadFactory factory) throws Exception { 1470 // owner before join 1471 try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) { 1472 scope.fork(() -> { throw new FooException(); }); 1473 assertThrows(IllegalStateException.class, scope::result); 1474 assertThrows(IllegalStateException.class, () -> { 1475 scope.result(e -> new RuntimeException(e)); 1476 }); 1477 scope.join(); 1478 } 1479 1480 // non-owner 1481 try (var scope = new ShutdownOnSuccess<Boolean>(null, factory)) { 1482 Subtask<Boolean> subtask = scope.fork(() -> { 1483 assertThrows(WrongThreadException.class, scope::result); 1484 assertThrows(WrongThreadException.class, () -> { 1485 scope.result(e -> new RuntimeException(e)); 1486 }); 1487 return true; 1488 }); 1489 scope.join(); 1490 assertTrue(subtask.get()); 1491 } 1492 } 1493 1494 /** 1495 * Test ShutdownOnFailure with no completed tasks. 1496 */ 1497 @Test 1498 void testShutdownOnFailure1() throws Throwable { 1499 try (var scope = new ShutdownOnFailure()) { 1500 assertTrue(scope.exception().isEmpty()); 1501 scope.throwIfFailed(); 1502 scope.throwIfFailed(e -> new FooException(e)); 1503 } 1504 } 1505 1506 /** 1507 * Test ShutdownOnFailure with tasks that complete successfully. 1508 */ 1509 @ParameterizedTest 1510 @MethodSource("factories") 1511 void testShutdownOnFailure2(ThreadFactory factory) throws Throwable { 1512 try (var scope = new ShutdownOnFailure(null, factory)) { 1513 scope.fork(() -> "foo"); 1514 scope.fork(() -> "bar"); 1515 scope.join(); 1516 1517 // no exception 1518 assertTrue(scope.exception().isEmpty()); 1519 scope.throwIfFailed(); 1520 scope.throwIfFailed(e -> new FooException(e)); 1521 } 1522 } 1523 1524 /** 1525 * Test ShutdownOnFailure with tasks that complete succcessfully and tasks that fail. 1526 */ 1527 @ParameterizedTest 1528 @MethodSource("factories") 1529 void testShutdownOnFailure3(ThreadFactory factory) throws Throwable { 1530 try (var scope = new ShutdownOnFailure(null, factory)) { 1531 1532 // one task completes successfully, the other fails 1533 scope.fork(() -> "foo"); 1534 scope.fork(() -> { throw new ArithmeticException(); }); 1535 scope.join(); 1536 1537 Throwable ex = scope.exception().orElse(null); 1538 assertTrue(ex instanceof ArithmeticException); 1539 1540 ex = assertThrows(ExecutionException.class, () -> scope.throwIfFailed()); 1541 assertTrue(ex.getCause() instanceof ArithmeticException); 1542 1543 ex = assertThrows(FooException.class, 1544 () -> scope.throwIfFailed(e -> new FooException(e))); 1545 assertTrue(ex.getCause() instanceof ArithmeticException); 1546 } 1547 } 1548 1549 /** 1550 * Test ShutdownOnFailure methods are confined to the owner. 1551 */ 1552 @ParameterizedTest 1553 @MethodSource("factories") 1554 void testShutdownOnFailureConfined(ThreadFactory factory) throws Exception { 1555 // owner before join 1556 try (var scope = new ShutdownOnFailure(null, factory)) { 1557 scope.fork(() -> "foo"); 1558 assertThrows(IllegalStateException.class, scope::exception); 1559 assertThrows(IllegalStateException.class, scope::throwIfFailed); 1560 assertThrows(IllegalStateException.class, () -> { 1561 scope.throwIfFailed(e -> new RuntimeException(e)); 1562 }); 1563 scope.join(); 1564 } 1565 1566 // non-owner 1567 try (var scope = new ShutdownOnFailure(null, factory)) { 1568 Subtask<Boolean> subtask = scope.fork(() -> { 1569 assertThrows(WrongThreadException.class, scope::exception); 1570 assertThrows(WrongThreadException.class, scope::throwIfFailed); 1571 assertThrows(WrongThreadException.class, () -> { 1572 scope.throwIfFailed(e -> new RuntimeException(e)); 1573 }); 1574 return true; 1575 }); 1576 scope.join(); 1577 assertTrue(subtask.get()); 1578 } 1579 } 1580 1581 /** 1582 * Test for NullPointerException. 1583 */ 1584 @Test 1585 void testNulls() throws Exception { 1586 assertThrows(NullPointerException.class, () -> new StructuredTaskScope("", null)); 1587 try (var scope = new StructuredTaskScope<Object>()) { 1588 assertThrows(NullPointerException.class, () -> scope.fork(null)); 1589 assertThrows(NullPointerException.class, () -> scope.joinUntil(null)); 1590 } 1591 1592 assertThrows(NullPointerException.class, () -> new ShutdownOnSuccess<Object>("", null)); 1593 try (var scope = new ShutdownOnSuccess<Object>()) { 1594 assertThrows(NullPointerException.class, () -> scope.fork(null)); 1595 assertThrows(NullPointerException.class, () -> scope.joinUntil(null)); 1596 assertThrows(NullPointerException.class, () -> scope.result(null)); 1597 } 1598 1599 assertThrows(NullPointerException.class, () -> new ShutdownOnFailure("", null)); 1600 try (var scope = new ShutdownOnFailure()) { 1601 assertThrows(NullPointerException.class, () -> scope.fork(null)); 1602 assertThrows(NullPointerException.class, () -> scope.joinUntil(null)); 1603 assertThrows(NullPointerException.class, () -> scope.throwIfFailed(null)); 1604 } 1605 } 1606 1607 /** 1608 * A runtime exception for tests. 1609 */ 1610 private static class FooException extends RuntimeException { 1611 FooException() { } 1612 FooException(Throwable cause) { super(cause); } 1613 } 1614 1615 /** 1616 * Returns the current time in milliseconds. 1617 */ 1618 private long millisTime() { 1619 long now = System.nanoTime(); 1620 return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS); 1621 } 1622 1623 /** 1624 * Check the duration of a task 1625 * @param start start time, in milliseconds 1626 * @param min minimum expected duration, in milliseconds 1627 * @param max maximum expected duration, in milliseconds 1628 * @return the duration (now - start), in milliseconds 1629 */ 1630 private long expectDuration(long start, long min, long max) { 1631 long duration = millisTime() - start; 1632 assertTrue(duration >= min, 1633 "Duration " + duration + "ms, expected >= " + min + "ms"); 1634 assertTrue(duration <= max, 1635 "Duration " + duration + "ms, expected <= " + max + "ms"); 1636 return duration; 1637 } 1638 1639 /** 1640 * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}". 1641 * {@code c} is the fully qualified class name and {@code m} is the method name. 1642 */ 1643 private void interruptThreadAt(Thread target, String location) throws InterruptedException { 1644 int index = location.lastIndexOf('.'); 1645 String className = location.substring(0, index); 1646 String methodName = location.substring(index + 1); 1647 1648 boolean found = false; 1649 while (!found) { 1650 Thread.State state = target.getState(); 1651 assertTrue(state != TERMINATED); 1652 if ((state == WAITING || state == TIMED_WAITING) 1653 && contains(target.getStackTrace(), className, methodName)) { 1654 found = true; 1655 } else { 1656 Thread.sleep(20); 1657 } 1658 } 1659 target.interrupt(); 1660 } 1661 1662 /** 1663 * Schedules the current thread to be interrupted when it waits (timed or untimed) 1664 * at the given location. 1665 */ 1666 private void scheduleInterruptAt(String location) { 1667 Thread target = Thread.currentThread(); 1668 scheduler.submit(() -> { 1669 interruptThreadAt(target, location); 1670 return null; 1671 }); 1672 } 1673 1674 /** 1675 * Returns true if the given stack trace contains an element for the given class 1676 * and method name. 1677 */ 1678 private boolean contains(StackTraceElement[] stack, String className, String methodName) { 1679 return Arrays.stream(stack) 1680 .anyMatch(e -> className.equals(e.getClassName()) 1681 && methodName.equals(e.getMethodName())); 1682 } 1683 }