1 /* 2 * Copyright (c) 2021, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test id=platform 26 * @bug 8284199 8296779 8306647 27 * @summary Basic tests for StructuredTaskScope 28 * @enablePreview 29 * @run junit/othervm -DthreadFactory=platform StructuredTaskScopeTest 30 */ 31 32 /* 33 * @test id=virtual 34 * @enablePreview 35 * @run junit/othervm -DthreadFactory=virtual StructuredTaskScopeTest 36 */ 37 38 import java.time.Duration; 39 import java.util.Arrays; 40 import java.util.ArrayList; 41 import java.util.List; 42 import java.util.Set; 43 import java.util.NoSuchElementException; 44 import java.util.concurrent.Callable; 45 import java.util.concurrent.ConcurrentHashMap; 46 import java.util.concurrent.CountDownLatch; 47 import java.util.concurrent.Executors; 48 import java.util.concurrent.Future; 49 import java.util.concurrent.LinkedTransferQueue; 50 import java.util.concurrent.ThreadFactory; 51 import java.util.concurrent.TimeUnit; 52 import java.util.concurrent.RejectedExecutionException; 53 import java.util.concurrent.ScheduledExecutorService; 54 import java.util.concurrent.StructuredTaskScope; 55 import java.util.concurrent.StructuredTaskScope.TimeoutException; 56 import java.util.concurrent.StructuredTaskScope.Configuration; 57 import java.util.concurrent.StructuredTaskScope.FailedException; 58 import java.util.concurrent.StructuredTaskScope.Joiner; 59 import java.util.concurrent.StructuredTaskScope.Subtask; 60 import java.util.concurrent.StructureViolationException; 61 import java.util.concurrent.atomic.AtomicBoolean; 62 import java.util.concurrent.atomic.AtomicInteger; 63 import java.util.concurrent.atomic.AtomicReference; 64 import java.util.function.Predicate; 65 import java.util.function.UnaryOperator; 66 import java.util.stream.Stream; 67 import static java.lang.Thread.State.*; 68 69 import org.junit.jupiter.api.Test; 70 import org.junit.jupiter.api.BeforeAll; 71 import org.junit.jupiter.api.AfterAll; 72 import org.junit.jupiter.params.ParameterizedTest; 73 import org.junit.jupiter.params.provider.MethodSource; 74 import static org.junit.jupiter.api.Assertions.*; 75 76 class StructuredTaskScopeTest { 77 private static ScheduledExecutorService scheduler; 78 private static List<ThreadFactory> threadFactories; 79 80 @BeforeAll 81 static void setup() throws Exception { 82 scheduler = Executors.newSingleThreadScheduledExecutor(); 83 84 // thread factories 85 String value = System.getProperty("threadFactory"); 86 List<ThreadFactory> list = new ArrayList<>(); 87 if (value == null || value.equals("platform")) 88 list.add(Thread.ofPlatform().factory()); 89 if (value == null || value.equals("virtual")) 90 list.add(Thread.ofVirtual().factory()); 91 assertTrue(list.size() > 0, "No thread factories for tests"); 92 threadFactories = list; 93 } 94 95 @AfterAll 96 static void shutdown() { 97 scheduler.shutdown(); 98 } 99 100 private static Stream<ThreadFactory> factories() { 101 return threadFactories.stream(); 102 } 103 104 /** 105 * Test that fork creates virtual threads when no ThreadFactory is configured. 106 */ 107 @Test 108 void testForkCreatesVirtualThread() throws Exception { 109 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 110 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 111 for (int i = 0; i < 50; i++) { 112 // runnable 113 scope.fork(() -> { 114 threads.add(Thread.currentThread()); 115 }); 116 117 // callable 118 scope.fork(() -> { 119 threads.add(Thread.currentThread()); 120 return null; 121 }); 122 } 123 scope.join(); 124 } 125 assertEquals(100, threads.size()); 126 threads.forEach(t -> assertTrue(t.isVirtual())); 127 } 128 129 /** 130 * Test that fork create threads with the configured ThreadFactory. 131 */ 132 @ParameterizedTest 133 @MethodSource("factories") 134 void testForkUsesThreadFactory(ThreadFactory factory) throws Exception { 135 // TheadFactory that keeps reference to all threads it creates 136 class RecordingThreadFactory implements ThreadFactory { 137 final ThreadFactory delegate; 138 final Set<Thread> threads = ConcurrentHashMap.newKeySet(); 139 RecordingThreadFactory(ThreadFactory delegate) { 140 this.delegate = delegate; 141 } 142 @Override 143 public Thread newThread(Runnable task) { 144 Thread thread = delegate.newThread(task); 145 threads.add(thread); 146 return thread; 147 } 148 Set<Thread> threads() { 149 return threads; 150 } 151 } 152 var recordingThreadFactory = new RecordingThreadFactory(factory); 153 Set<Thread> threads = ConcurrentHashMap.newKeySet(); 154 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 155 cf -> cf.withThreadFactory(recordingThreadFactory))) { 156 157 for (int i = 0; i < 50; i++) { 158 // runnable 159 scope.fork(() -> { 160 threads.add(Thread.currentThread()); 161 }); 162 163 // callable 164 scope.fork(() -> { 165 threads.add(Thread.currentThread()); 166 return null; 167 }); 168 } 169 scope.join(); 170 } 171 assertEquals(100, threads.size()); 172 assertEquals(recordingThreadFactory.threads(), threads); 173 } 174 175 /** 176 * Test fork method is owner confined. 177 */ 178 @ParameterizedTest 179 @MethodSource("factories") 180 void testForkConfined(ThreadFactory factory) throws Exception { 181 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 182 cf -> cf.withThreadFactory(factory))) { 183 184 // random thread cannot fork 185 try (var pool = Executors.newSingleThreadExecutor()) { 186 Future<Void> future = pool.submit(() -> { 187 assertThrows(WrongThreadException.class, () -> { 188 scope.fork(() -> null); 189 }); 190 return null; 191 }); 192 future.get(); 193 } 194 195 // subtask cannot fork 196 Subtask<Boolean> subtask = scope.fork(() -> { 197 assertThrows(WrongThreadException.class, () -> { 198 scope.fork(() -> null); 199 }); 200 return true; 201 }); 202 scope.join(); 203 assertTrue(subtask.get()); 204 } 205 } 206 207 /** 208 * Test fork after join, no subtasks forked before join. 209 */ 210 @ParameterizedTest 211 @MethodSource("factories") 212 void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception { 213 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 214 cf -> cf.withThreadFactory(factory))) { 215 scope.join(); 216 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 217 } 218 } 219 220 /** 221 * Test fork after join, subtasks forked before join. 222 */ 223 @ParameterizedTest 224 @MethodSource("factories") 225 void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception { 226 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 227 cf -> cf.withThreadFactory(factory))) { 228 scope.fork(() -> "foo"); 229 scope.join(); 230 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 231 } 232 } 233 234 /** 235 * Test fork after join interrupted. 236 */ 237 @ParameterizedTest 238 @MethodSource("factories") 239 void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception { 240 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 241 cf -> cf.withThreadFactory(factory))) { 242 var subtask1 = scope.fork(() -> { 243 Thread.sleep(Duration.ofDays(1)); 244 return "foo"; 245 }); 246 247 // join throws 248 Thread.currentThread().interrupt(); 249 assertThrows(InterruptedException.class, scope::join); 250 251 // fork should throw 252 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 253 } 254 } 255 256 /** 257 * Test fork after join timeout. 258 */ 259 @ParameterizedTest 260 @MethodSource("factories") 261 void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception { 262 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 263 cf -> cf.withThreadFactory(factory) 264 .withTimeout(Duration.ofMillis(100)))) { 265 awaitCancelled(scope); 266 267 // join throws 268 assertThrows(TimeoutException.class, scope::join); 269 270 // fork should throw 271 assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); 272 } 273 } 274 275 /** 276 * Test fork after task scope is cancelled. This test uses a custom Joiner to 277 * cancel execution. 278 */ 279 @ParameterizedTest 280 @MethodSource("factories") 281 void testForkAfterCancel2(ThreadFactory factory) throws Exception { 282 var countingThreadFactory = new CountingThreadFactory(factory); 283 var testJoiner = new CancelAfterOneJoiner<String>(); 284 285 try (var scope = StructuredTaskScope.open(testJoiner, 286 cf -> cf.withThreadFactory(countingThreadFactory))) { 287 288 // fork subtask, the scope should be cancelled when the subtask completes 289 var subtask1 = scope.fork(() -> "foo"); 290 awaitCancelled(scope); 291 292 assertEquals(1, countingThreadFactory.threadCount()); 293 assertEquals(1, testJoiner.onForkCount()); 294 assertEquals(1, testJoiner.onCompleteCount()); 295 296 // fork second subtask, it should not run 297 var subtask2 = scope.fork(() -> "bar"); 298 299 // onFork should be invoked, newThread and onComplete should not be invoked 300 assertEquals(1, countingThreadFactory.threadCount()); 301 assertEquals(2, testJoiner.onForkCount()); 302 assertEquals(1, testJoiner.onCompleteCount()); 303 304 scope.join(); 305 306 assertEquals(1, countingThreadFactory.threadCount()); 307 assertEquals(2, testJoiner.onForkCount()); 308 assertEquals(1, testJoiner.onCompleteCount()); 309 assertEquals("foo", subtask1.get()); 310 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 311 } 312 } 313 314 /** 315 * Test fork after task scope is closed. 316 */ 317 @ParameterizedTest 318 @MethodSource("factories") 319 void testForkAfterClose(ThreadFactory factory) { 320 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 321 cf -> cf.withThreadFactory(factory))) { 322 scope.close(); 323 assertThrows(IllegalStateException.class, () -> scope.fork(() -> null)); 324 } 325 } 326 327 /** 328 * Test fork with a ThreadFactory that rejects creating a thread. 329 */ 330 @Test 331 void testForkRejectedExecutionException() { 332 ThreadFactory factory = task -> null; 333 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 334 cf -> cf.withThreadFactory(factory))) { 335 assertThrows(RejectedExecutionException.class, () -> scope.fork(() -> null)); 336 } 337 } 338 339 /** 340 * Test join with no subtasks. 341 */ 342 @Test 343 void testJoinWithNoSubtasks() throws Exception { 344 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 345 scope.join(); 346 } 347 } 348 349 /** 350 * Test join with a remaining subtask. 351 */ 352 @ParameterizedTest 353 @MethodSource("factories") 354 void testJoinWithRemainingSubtasks(ThreadFactory factory) throws Exception { 355 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 356 cf -> cf.withThreadFactory(factory))) { 357 Subtask<String> subtask = scope.fork(() -> { 358 Thread.sleep(Duration.ofMillis(100)); 359 return "foo"; 360 }); 361 scope.join(); 362 assertEquals("foo", subtask.get()); 363 } 364 } 365 366 /** 367 * Test join after join completed with a result. 368 */ 369 @Test 370 void testJoinAfterJoin1() throws Exception { 371 var results = new LinkedTransferQueue<>(List.of("foo", "bar", "baz")); 372 Joiner<Object, String> joiner = results::take; 373 try (var scope = StructuredTaskScope.open(joiner)) { 374 scope.fork(() -> "foo"); 375 assertEquals("foo", scope.join()); 376 377 // join already called 378 for (int i = 0 ; i < 3; i++) { 379 assertThrows(IllegalStateException.class, scope::join); 380 } 381 } 382 } 383 384 /** 385 * Test join after join completed with an exception. 386 */ 387 @Test 388 void testJoinAfterJoin2() throws Exception { 389 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { 390 scope.fork(() -> { throw new FooException(); }); 391 Throwable ex = assertThrows(FailedException.class, scope::join); 392 assertTrue(ex.getCause() instanceof FooException); 393 394 // join already called 395 for (int i = 0 ; i < 3; i++) { 396 assertThrows(IllegalStateException.class, scope::join); 397 } 398 } 399 } 400 401 /** 402 * Test join after join completed with a timeout. 403 */ 404 @Test 405 void testJoinAfterJoinInterrupted() throws Exception { 406 try (var scope = StructuredTaskScope.open()) { 407 var latch = new CountDownLatch(1); 408 var subtask = scope.fork(() -> { 409 latch.await(); 410 return "foo"; 411 }); 412 413 // join throws InterruptedException 414 Thread.currentThread().interrupt(); 415 assertThrows(InterruptedException.class, scope::join); 416 417 latch.countDown(); 418 419 // retry join to get result 420 scope.join(); 421 assertEquals("foo", subtask.get()); 422 423 // retry after otbaining result 424 assertThrows(IllegalStateException.class, scope::join); 425 } 426 } 427 428 /** 429 * Test join after join completed with a timeout. 430 */ 431 @Test 432 void testJoinAfterJoinTimeout() throws Exception { 433 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), 434 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 435 // wait for scope to be cancelled by timeout 436 awaitCancelled(scope); 437 assertThrows(TimeoutException.class, scope::join); 438 439 // join already called 440 for (int i = 0 ; i < 3; i++) { 441 assertThrows(IllegalStateException.class, scope::join); 442 } 443 } 444 } 445 446 /** 447 * Test join invoked from Joiner.onTimeout. 448 */ 449 @Test 450 void testJoinInOnTimeout() throws Exception { 451 Thread owner = Thread.currentThread(); 452 var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>(); 453 454 var joiner = new Joiner<String, Void>() { 455 @Override 456 public void onTimeout() { 457 assertTrue(Thread.currentThread() == owner); 458 var scope = scopeRef.get(); 459 assertThrows(IllegalStateException.class, scope::join); 460 } 461 @Override 462 public Void result() { 463 return null; 464 } 465 }; 466 467 try (var scope = StructuredTaskScope.open(joiner, 468 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 469 awaitCancelled(scope); 470 scopeRef.set(scope); 471 scope.join(); // invokes onTimeout 472 } 473 } 474 475 /** 476 * Test join method is owner confined. 477 */ 478 @ParameterizedTest 479 @MethodSource("factories") 480 void testJoinConfined(ThreadFactory factory) throws Exception { 481 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 482 cf -> cf.withThreadFactory(factory))) { 483 484 // random thread cannot join 485 try (var pool = Executors.newSingleThreadExecutor()) { 486 Future<Void> future = pool.submit(() -> { 487 assertThrows(WrongThreadException.class, scope::join); 488 return null; 489 }); 490 future.get(); 491 } 492 493 // subtask cannot join 494 Subtask<Boolean> subtask = scope.fork(() -> { 495 assertThrows(WrongThreadException.class, () -> { scope.join(); }); 496 return true; 497 }); 498 scope.join(); 499 assertTrue(subtask.get()); 500 } 501 } 502 503 /** 504 * Test join with interrupt status set. 505 */ 506 @ParameterizedTest 507 @MethodSource("factories") 508 void testInterruptJoin1(ThreadFactory factory) throws Exception { 509 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 510 cf -> cf.withThreadFactory(factory))) { 511 512 Subtask<String> subtask = scope.fork(() -> { 513 Thread.sleep(Duration.ofDays(1)); 514 return "foo"; 515 }); 516 517 // join should throw 518 Thread.currentThread().interrupt(); 519 try { 520 scope.join(); 521 fail("join did not throw"); 522 } catch (InterruptedException expected) { 523 assertFalse(Thread.interrupted()); // interrupt status should be cleared 524 } 525 } 526 } 527 528 /** 529 * Test interrupt of thread blocked in join. 530 */ 531 @ParameterizedTest 532 @MethodSource("factories") 533 void testInterruptJoin2(ThreadFactory factory) throws Exception { 534 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 535 cf -> cf.withThreadFactory(factory))) { 536 Subtask<String> subtask = scope.fork(() -> { 537 Thread.sleep(Duration.ofDays(1)); 538 return "foo"; 539 }); 540 541 // interrupt main thread when it blocks in join 542 scheduleInterruptAt("java.util.concurrent.StructuredTaskScopeImpl.join"); 543 try { 544 scope.join(); 545 fail("join did not throw"); 546 } catch (InterruptedException expected) { 547 assertFalse(Thread.interrupted()); // interrupt status should be clear 548 } 549 } 550 } 551 552 /** 553 * Test join when scope is cancelled. 554 */ 555 @ParameterizedTest 556 @MethodSource("factories") 557 void testJoinWhenCancelled(ThreadFactory factory) throws Exception { 558 var countingThreadFactory = new CountingThreadFactory(factory); 559 var testJoiner = new CancelAfterOneJoiner<String>(); 560 561 try (var scope = StructuredTaskScope.open(testJoiner, 562 cf -> cf.withThreadFactory(countingThreadFactory))) { 563 564 // fork subtask, the scope should be cancelled when the subtask completes 565 var subtask1 = scope.fork(() -> "foo"); 566 awaitCancelled(scope); 567 568 // fork second subtask, it should not run 569 var subtask2 = scope.fork(() -> { 570 Thread.sleep(Duration.ofDays(1)); 571 return "bar"; 572 }); 573 574 scope.join(); 575 576 assertEquals("foo", subtask1.get()); 577 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 578 } 579 } 580 581 /** 582 * Test join after scope is closed. 583 */ 584 @Test 585 void testJoinAfterClose() throws Exception { 586 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 587 scope.close(); 588 assertThrows(IllegalStateException.class, () -> scope.join()); 589 } 590 } 591 592 /** 593 * Test join with timeout, subtasks finish before timeout expires. 594 */ 595 @ParameterizedTest 596 @MethodSource("factories") 597 void testJoinWithTimeout1(ThreadFactory factory) throws Exception { 598 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 599 cf -> cf.withThreadFactory(factory) 600 .withTimeout(Duration.ofDays(1)))) { 601 602 Subtask<String> subtask = scope.fork(() -> { 603 Thread.sleep(Duration.ofSeconds(1)); 604 return "foo"; 605 }); 606 607 scope.join(); 608 609 assertFalse(scope.isCancelled()); 610 assertEquals("foo", subtask.get()); 611 } 612 } 613 614 /** 615 * Test join with timeout, timeout expires before subtasks finish. 616 */ 617 @ParameterizedTest 618 @MethodSource("factories") 619 void testJoinWithTimeout2(ThreadFactory factory) throws Exception { 620 long startMillis = millisTime(); 621 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 622 cf -> cf.withThreadFactory(factory) 623 .withTimeout(Duration.ofSeconds(2)))) { 624 625 Subtask<Void> subtask = scope.fork(() -> { 626 Thread.sleep(Duration.ofDays(1)); 627 return null; 628 }); 629 630 assertThrows(TimeoutException.class, scope::join); 631 expectDuration(startMillis, /*min*/1900, /*max*/20_000); 632 633 assertTrue(scope.isCancelled()); 634 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 635 } 636 } 637 638 /** 639 * Test join with timeout that has already expired. 640 */ 641 @ParameterizedTest 642 @MethodSource("factories") 643 void testJoinWithTimeout3(ThreadFactory factory) throws Exception { 644 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 645 cf -> cf.withThreadFactory(factory) 646 .withTimeout(Duration.ofSeconds(-1)))) { 647 648 Subtask<Void> subtask = scope.fork(() -> { 649 Thread.sleep(Duration.ofDays(1)); 650 return null; 651 }); 652 653 assertThrows(TimeoutException.class, scope::join); 654 655 assertTrue(scope.isCancelled()); 656 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 657 } 658 } 659 660 /** 661 * Test that cancelling execution interrupts unfinished threads. This test uses 662 * a custom Joiner to cancel execution. 663 */ 664 @ParameterizedTest 665 @MethodSource("factories") 666 void testCancelInterruptsThreads2(ThreadFactory factory) throws Exception { 667 var testJoiner = new CancelAfterOneJoiner<String>(); 668 669 try (var scope = StructuredTaskScope.open(testJoiner, 670 cf -> cf.withThreadFactory(factory))) { 671 672 // fork subtask1 that runs for a long time 673 var started = new CountDownLatch(1); 674 var interrupted = new CountDownLatch(1); 675 var subtask1 = scope.fork(() -> { 676 started.countDown(); 677 try { 678 Thread.sleep(Duration.ofDays(1)); 679 } catch (InterruptedException e) { 680 interrupted.countDown(); 681 } 682 }); 683 started.await(); 684 685 // fork subtask2, the scope should be cancelled when the subtask completes 686 var subtask2 = scope.fork(() -> "bar"); 687 awaitCancelled(scope); 688 689 // subtask1 should be interrupted 690 interrupted.await(); 691 692 scope.join(); 693 assertEquals(Subtask.State.UNAVAILABLE, subtask1.state()); 694 assertEquals("bar", subtask2.get()); 695 } 696 } 697 698 /** 699 * Test that timeout interrupts unfinished threads. 700 */ 701 @ParameterizedTest 702 @MethodSource("factories") 703 void testTimeoutInterruptsThreads(ThreadFactory factory) throws Exception { 704 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 705 cf -> cf.withThreadFactory(factory) 706 .withTimeout(Duration.ofSeconds(2)))) { 707 708 var started = new AtomicBoolean(); 709 var interrupted = new CountDownLatch(1); 710 Subtask<Void> subtask = scope.fork(() -> { 711 started.set(true); 712 try { 713 Thread.sleep(Duration.ofDays(1)); 714 } catch (InterruptedException e) { 715 interrupted.countDown(); 716 } 717 return null; 718 }); 719 720 // wait for scope to be cancelled by timeout 721 awaitCancelled(scope); 722 723 // if subtask started then it should be interrupted 724 if (started.get()) { 725 interrupted.await(); 726 } 727 728 assertThrows(TimeoutException.class, scope::join); 729 730 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 731 } 732 } 733 734 /** 735 * Test close without join, no subtasks forked. 736 */ 737 @Test 738 void testCloseWithoutJoin1() { 739 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 740 // do nothing 741 } 742 } 743 744 /** 745 * Test close without join, subtasks forked. 746 */ 747 @ParameterizedTest 748 @MethodSource("factories") 749 void testCloseWithoutJoin2(ThreadFactory factory) { 750 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 751 cf -> cf.withThreadFactory(factory))) { 752 Subtask<String> subtask = scope.fork(() -> { 753 Thread.sleep(Duration.ofDays(1)); 754 return null; 755 }); 756 757 // first call to close should throw 758 assertThrows(IllegalStateException.class, scope::close); 759 760 // subsequent calls to close should not throw 761 for (int i = 0; i < 3; i++) { 762 scope.close(); 763 } 764 765 // subtask result/exception not available 766 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 767 assertThrows(IllegalStateException.class, subtask::get); 768 assertThrows(IllegalStateException.class, subtask::exception); 769 } 770 } 771 772 /** 773 * Test close after join throws. Close should not throw as join attempted. 774 */ 775 @ParameterizedTest 776 @MethodSource("factories") 777 void testCloseAfterJoinThrows(ThreadFactory factory) throws Exception { 778 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 779 cf -> cf.withThreadFactory(factory))) { 780 var subtask = scope.fork(() -> { 781 Thread.sleep(Duration.ofDays(1)); 782 return null; 783 }); 784 785 // join throws 786 Thread.currentThread().interrupt(); 787 assertThrows(InterruptedException.class, scope::join); 788 assertThrows(IllegalStateException.class, subtask::get); 789 790 } // close should not throw 791 } 792 793 /** 794 * Test close method is owner confined. 795 */ 796 @ParameterizedTest 797 @MethodSource("factories") 798 void testCloseConfined(ThreadFactory factory) throws Exception { 799 try (var scope = StructuredTaskScope.open(Joiner.<Boolean>awaitAll(), 800 cf -> cf.withThreadFactory(factory))) { 801 802 // random thread cannot close scope 803 try (var pool = Executors.newCachedThreadPool(factory)) { 804 Future<Boolean> future = pool.submit(() -> { 805 assertThrows(WrongThreadException.class, scope::close); 806 return null; 807 }); 808 future.get(); 809 } 810 811 // subtask cannot close 812 Subtask<Boolean> subtask = scope.fork(() -> { 813 assertThrows(WrongThreadException.class, scope::close); 814 return true; 815 }); 816 scope.join(); 817 assertTrue(subtask.get()); 818 } 819 } 820 821 /** 822 * Test close with interrupt status set. 823 */ 824 @ParameterizedTest 825 @MethodSource("factories") 826 void testInterruptClose1(ThreadFactory factory) throws Exception { 827 var testJoiner = new CancelAfterOneJoiner<String>(); 828 try (var scope = StructuredTaskScope.open(testJoiner, 829 cf -> cf.withThreadFactory(factory))) { 830 831 // fork first subtask, a straggler as it continues after being interrupted 832 var started = new CountDownLatch(1); 833 var done = new AtomicBoolean(); 834 scope.fork(() -> { 835 started.countDown(); 836 try { 837 Thread.sleep(Duration.ofDays(1)); 838 } catch (InterruptedException e) { 839 // interrupted by cancel, expected 840 } 841 Thread.sleep(Duration.ofMillis(100)); // force close to wait 842 done.set(true); 843 return null; 844 }); 845 started.await(); 846 847 // fork second subtask, the scope should be cancelled when this subtask completes 848 scope.fork(() -> "bar"); 849 awaitCancelled(scope); 850 851 scope.join(); 852 853 // invoke close with interrupt status set 854 Thread.currentThread().interrupt(); 855 try { 856 scope.close(); 857 } finally { 858 assertTrue(Thread.interrupted()); // clear interrupt status 859 assertTrue(done.get()); 860 } 861 } 862 } 863 864 /** 865 * Test interrupting thread waiting in close. 866 */ 867 @ParameterizedTest 868 @MethodSource("factories") 869 void testInterruptClose2(ThreadFactory factory) throws Exception { 870 var testJoiner = new CancelAfterOneJoiner<String>(); 871 try (var scope = StructuredTaskScope.open(testJoiner, 872 cf -> cf.withThreadFactory(factory))) { 873 874 Thread mainThread = Thread.currentThread(); 875 876 // fork first subtask, a straggler as it continues after being interrupted 877 var started = new CountDownLatch(1); 878 var done = new AtomicBoolean(); 879 scope.fork(() -> { 880 started.countDown(); 881 try { 882 Thread.sleep(Duration.ofDays(1)); 883 } catch (InterruptedException e) { 884 // interrupted by cancel, expected 885 } 886 887 // interrupt main thread when it blocks in close 888 interruptThreadAt(mainThread, "java.util.concurrent.StructuredTaskScopeImpl.close"); 889 890 Thread.sleep(Duration.ofMillis(100)); // force close to wait 891 done.set(true); 892 return null; 893 }); 894 started.await(); 895 896 // fork second subtask, the scope should be cancelled when this subtask completes 897 scope.fork(() -> "bar"); 898 awaitCancelled(scope); 899 900 scope.join(); 901 902 // main thread will be interrupted while blocked in close 903 try { 904 scope.close(); 905 } finally { 906 assertTrue(Thread.interrupted()); // clear interrupt status 907 assertTrue(done.get()); 908 } 909 } 910 } 911 912 /** 913 * Test that closing an enclosing scope closes the thread flock of a nested scope. 914 */ 915 @Test 916 void testCloseThrowsStructureViolation() throws Exception { 917 try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll())) { 918 try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll())) { 919 920 // close enclosing scope 921 try { 922 scope1.close(); 923 fail("close did not throw"); 924 } catch (StructureViolationException expected) { } 925 926 // underlying flock should be closed 927 var executed = new AtomicBoolean(); 928 Subtask<?> subtask = scope2.fork(() -> executed.set(true)); 929 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 930 scope2.join(); 931 assertFalse(executed.get()); 932 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 933 } 934 } 935 } 936 937 /** 938 * Test that isCancelled returns true after close. 939 */ 940 @Test 941 void testIsCancelledAfterClose() throws Exception { 942 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 943 assertFalse(scope.isCancelled()); 944 scope.close(); 945 assertTrue(scope.isCancelled()); 946 } 947 } 948 949 /** 950 * Test Joiner.onFork throwing exception. 951 */ 952 @Test 953 void testOnForkThrows() throws Exception { 954 var joiner = new Joiner<String, Void>() { 955 @Override 956 public boolean onFork(Subtask<? extends String> subtask) { 957 throw new FooException(); 958 } 959 @Override 960 public Void result() { 961 return null; 962 } 963 }; 964 try (var scope = StructuredTaskScope.open(joiner)) { 965 assertThrows(FooException.class, () -> scope.fork(() -> "foo")); 966 } 967 } 968 969 /** 970 * Test Joiner.onFork returning true to cancel execution. 971 */ 972 @Test 973 void testOnForkCancelsExecution() throws Exception { 974 var joiner = new Joiner<String, Void>() { 975 @Override 976 public boolean onFork(Subtask<? extends String> subtask) { 977 return true; 978 } 979 @Override 980 public Void result() { 981 return null; 982 } 983 }; 984 try (var scope = StructuredTaskScope.open(joiner)) { 985 assertFalse(scope.isCancelled()); 986 scope.fork(() -> "foo"); 987 assertTrue(scope.isCancelled()); 988 scope.join(); 989 } 990 } 991 992 /** 993 * Test Joiner.onComplete throwing exception causes UHE to be invoked. 994 */ 995 @Test 996 void testOnCompleteThrows() throws Exception { 997 var joiner = new Joiner<String, Void>() { 998 @Override 999 public boolean onComplete(Subtask<? extends String> subtask) { 1000 throw new FooException(); 1001 } 1002 @Override 1003 public Void result() { 1004 return null; 1005 } 1006 }; 1007 var excRef = new AtomicReference<Throwable>(); 1008 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e); 1009 ThreadFactory factory = Thread.ofVirtual() 1010 .uncaughtExceptionHandler(uhe) 1011 .factory(); 1012 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) { 1013 scope.fork(() -> "foo"); 1014 scope.join(); 1015 assertInstanceOf(FooException.class, excRef.get()); 1016 } 1017 } 1018 1019 /** 1020 * Test Joiner.onComplete returning true to cancel execution. 1021 */ 1022 @Test 1023 void testOnCompleteCancelsExecution() throws Exception { 1024 var joiner = new Joiner<String, Void>() { 1025 @Override 1026 public boolean onComplete(Subtask<? extends String> subtask) { 1027 return true; 1028 } 1029 @Override 1030 public Void result() { 1031 return null; 1032 } 1033 }; 1034 try (var scope = StructuredTaskScope.open(joiner)) { 1035 assertFalse(scope.isCancelled()); 1036 scope.fork(() -> "foo"); 1037 awaitCancelled(scope); 1038 scope.join(); 1039 } 1040 } 1041 1042 /** 1043 * Test Joiner.onTimeout invoked by owner thread when timeout expires. 1044 */ 1045 @Test 1046 void testOnTimeoutInvoked() throws Exception { 1047 var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>(); 1048 Thread owner = Thread.currentThread(); 1049 var invokeCount = new AtomicInteger(); 1050 var joiner = new Joiner<String, Void>() { 1051 @Override 1052 public void onTimeout() { 1053 assertTrue(Thread.currentThread() == owner); 1054 assertTrue(scopeRef.get().isCancelled()); 1055 invokeCount.incrementAndGet(); 1056 } 1057 @Override 1058 public Void result() { 1059 return null; 1060 } 1061 }; 1062 try (var scope = StructuredTaskScope.open(joiner, 1063 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1064 scopeRef.set(scope); 1065 scope.fork(() -> { 1066 Thread.sleep(Duration.ofDays(1)); 1067 return null; 1068 }); 1069 scope.join(); 1070 assertEquals(1, invokeCount.get()); 1071 } 1072 } 1073 1074 /** 1075 * Test Joiner.onTimeout throwing an excepiton. 1076 */ 1077 @Test 1078 void testOnTimeoutThrows() throws Exception { 1079 var joiner = new Joiner<String, Void>() { 1080 @Override 1081 public void onTimeout() { 1082 throw new FooException(); 1083 } 1084 @Override 1085 public Void result() { 1086 return null; 1087 } 1088 }; 1089 try (var scope = StructuredTaskScope.open(joiner, 1090 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1091 // wait for scope to be cancelled by timeout 1092 awaitCancelled(scope); 1093 1094 // join should throw FooException on first usage 1095 assertThrows(FooException.class, scope::join); 1096 1097 // retry after onTimeout fails 1098 assertThrows(IllegalStateException.class, scope::join); 1099 } 1100 } 1101 1102 /** 1103 * Test toString. 1104 */ 1105 @Test 1106 void testToString() throws Exception { 1107 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 1108 cf -> cf.withName("duke"))) { 1109 1110 // open 1111 assertTrue(scope.toString().contains("duke")); 1112 1113 // closed 1114 scope.close(); 1115 assertTrue(scope.toString().contains("duke")); 1116 } 1117 } 1118 1119 /** 1120 * Test Subtask with task that completes successfully. 1121 */ 1122 @ParameterizedTest 1123 @MethodSource("factories") 1124 void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception { 1125 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1126 cf -> cf.withThreadFactory(factory))) { 1127 Subtask<String> subtask = scope.fork(() -> "foo"); 1128 1129 // before join, owner thread 1130 assertThrows(IllegalStateException.class, subtask::get); 1131 assertThrows(IllegalStateException.class, subtask::exception); 1132 1133 // before join, another thread 1134 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); 1135 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); 1136 1137 scope.join(); 1138 1139 assertEquals(Subtask.State.SUCCESS, subtask.state()); 1140 1141 // after join, owner thread 1142 assertEquals("foo", subtask.get()); 1143 assertThrows(IllegalStateException.class, subtask::exception); 1144 1145 // after join, another thread 1146 assertEquals("foo", callInOtherThread(subtask::get)); 1147 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); 1148 } 1149 } 1150 1151 /** 1152 * Test Subtask with task that fails. 1153 */ 1154 @ParameterizedTest 1155 @MethodSource("factories") 1156 void testSubtaskWhenFailed(ThreadFactory factory) throws Exception { 1157 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1158 cf -> cf.withThreadFactory(factory))) { 1159 1160 Subtask<String> subtask = scope.fork(() -> { throw new FooException(); }); 1161 1162 // before join, owner thread 1163 assertThrows(IllegalStateException.class, subtask::get); 1164 assertThrows(IllegalStateException.class, subtask::exception); 1165 1166 // before join, another thread 1167 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); 1168 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); 1169 1170 scope.join(); 1171 1172 assertEquals(Subtask.State.FAILED, subtask.state()); 1173 1174 // after join, owner thread 1175 assertThrows(IllegalStateException.class, subtask::get); 1176 assertTrue(subtask.exception() instanceof FooException); 1177 1178 // after join, another thread 1179 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); 1180 assertTrue(callInOtherThread(subtask::exception) instanceof FooException); 1181 } 1182 } 1183 1184 /** 1185 * Test Subtask with a task that has not completed. 1186 */ 1187 @ParameterizedTest 1188 @MethodSource("factories") 1189 void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception { 1190 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), 1191 cf -> cf.withThreadFactory(factory))) { 1192 Subtask<Void> subtask = scope.fork(() -> { 1193 Thread.sleep(Duration.ofDays(1)); 1194 return null; 1195 }); 1196 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1197 1198 // before join, owner thread 1199 assertThrows(IllegalStateException.class, subtask::get); 1200 assertThrows(IllegalStateException.class, subtask::exception); 1201 1202 // before join, another thread 1203 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); 1204 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); 1205 1206 // attempt join, join throws 1207 Thread.currentThread().interrupt(); 1208 assertThrows(InterruptedException.class, scope::join); 1209 1210 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1211 1212 // after join, owner thread 1213 assertThrows(IllegalStateException.class, subtask::get); 1214 assertThrows(IllegalStateException.class, subtask::exception); 1215 1216 // before join, another thread 1217 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); 1218 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); 1219 } 1220 } 1221 1222 /** 1223 * Test Subtask forked after execution cancelled. 1224 */ 1225 @ParameterizedTest 1226 @MethodSource("factories") 1227 void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception { 1228 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) { 1229 scope.fork(() -> "foo"); 1230 awaitCancelled(scope); 1231 1232 var subtask = scope.fork(() -> "foo"); 1233 1234 // before join, owner thread 1235 assertThrows(IllegalStateException.class, subtask::get); 1236 assertThrows(IllegalStateException.class, subtask::exception); 1237 1238 // before join, another thread 1239 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); 1240 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); 1241 1242 scope.join(); 1243 1244 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1245 1246 // after join, owner thread 1247 assertThrows(IllegalStateException.class, subtask::get); 1248 assertThrows(IllegalStateException.class, subtask::exception); 1249 1250 // before join, another thread 1251 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); 1252 assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); 1253 } 1254 } 1255 1256 /** 1257 * Test Subtask::toString. 1258 */ 1259 @Test 1260 void testSubtaskToString() throws Exception { 1261 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1262 var latch = new CountDownLatch(1); 1263 var subtask1 = scope.fork(() -> { 1264 latch.await(); 1265 return "foo"; 1266 }); 1267 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1268 1269 // subtask1 result is unavailable 1270 assertTrue(subtask1.toString().contains("Unavailable")); 1271 latch.countDown(); 1272 1273 scope.join(); 1274 1275 assertTrue(subtask1.toString().contains("Completed successfully")); 1276 assertTrue(subtask2.toString().contains("Failed")); 1277 } 1278 } 1279 1280 /** 1281 * Test Joiner.allSuccessfulOrThrow() with no subtasks. 1282 */ 1283 @Test 1284 void testAllSuccessfulOrThrow1() throws Throwable { 1285 try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) { 1286 var subtasks = scope.join().toList(); 1287 assertTrue(subtasks.isEmpty()); 1288 } 1289 } 1290 1291 /** 1292 * Test Joiner.allSuccessfulOrThrow() with subtasks that complete successfully. 1293 */ 1294 @ParameterizedTest 1295 @MethodSource("factories") 1296 void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable { 1297 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1298 cf -> cf.withThreadFactory(factory))) { 1299 var subtask1 = scope.fork(() -> "foo"); 1300 var subtask2 = scope.fork(() -> "bar"); 1301 var subtasks = scope.join().toList(); 1302 assertEquals(List.of(subtask1, subtask2), subtasks); 1303 assertEquals("foo", subtask1.get()); 1304 assertEquals("bar", subtask2.get()); 1305 } 1306 } 1307 1308 /** 1309 * Test Joiner.allSuccessfulOrThrow() with a subtask that complete successfully and 1310 * a subtask that fails. 1311 */ 1312 @ParameterizedTest 1313 @MethodSource("factories") 1314 void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { 1315 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1316 cf -> cf.withThreadFactory(factory))) { 1317 scope.fork(() -> "foo"); 1318 scope.fork(() -> { throw new FooException(); }); 1319 try { 1320 scope.join(); 1321 } catch (FailedException e) { 1322 assertTrue(e.getCause() instanceof FooException); 1323 } 1324 } 1325 } 1326 1327 /** 1328 * Test Joiner.allSuccessfulOrThrow() with a timeout. 1329 */ 1330 @Test 1331 void testAllSuccessfulOrThrow4() throws Exception { 1332 try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 1333 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1334 scope.fork(() -> "foo"); 1335 scope.fork(() -> { 1336 Thread.sleep(Duration.ofDays(1)); 1337 return "bar"; 1338 }); 1339 assertThrows(TimeoutException.class, scope::join); 1340 1341 // retry after join throws TimeoutException 1342 assertThrows(IllegalStateException.class, scope::join); 1343 } 1344 } 1345 1346 /** 1347 * Test Joiner.anySuccessfulResultOrThrow() with no subtasks. 1348 */ 1349 @Test 1350 void testAnySuccessfulResultOrThrow1() throws Exception { 1351 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { 1352 try { 1353 scope.join(); 1354 } catch (FailedException e) { 1355 assertTrue(e.getCause() instanceof NoSuchElementException); 1356 } 1357 } 1358 } 1359 1360 /** 1361 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully. 1362 */ 1363 @ParameterizedTest 1364 @MethodSource("factories") 1365 void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception { 1366 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1367 cf -> cf.withThreadFactory(factory))) { 1368 scope.fork(() -> "foo"); 1369 String result = scope.join(); 1370 assertEquals("foo", result); 1371 } 1372 } 1373 1374 /** 1375 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully 1376 * with a null result. 1377 */ 1378 @ParameterizedTest 1379 @MethodSource("factories") 1380 void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception { 1381 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1382 cf -> cf.withThreadFactory(factory))) { 1383 scope.fork(() -> null); 1384 String result = scope.join(); 1385 assertNull(result); 1386 } 1387 } 1388 1389 /** 1390 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully 1391 * and a subtask that fails. 1392 */ 1393 @ParameterizedTest 1394 @MethodSource("factories") 1395 void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception { 1396 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1397 cf -> cf.withThreadFactory(factory))) { 1398 scope.fork(() -> "foo"); 1399 scope.fork(() -> { throw new FooException(); }); 1400 String first = scope.join(); 1401 assertEquals("foo", first); 1402 } 1403 } 1404 1405 /** 1406 * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails. 1407 */ 1408 @ParameterizedTest 1409 @MethodSource("factories") 1410 void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception { 1411 try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), 1412 cf -> cf.withThreadFactory(factory))) { 1413 scope.fork(() -> { throw new FooException(); }); 1414 Throwable ex = assertThrows(FailedException.class, scope::join); 1415 assertTrue(ex.getCause() instanceof FooException); 1416 } 1417 } 1418 1419 /** 1420 * Test Joiner.allSuccessfulOrThrow() with a timeout. 1421 */ 1422 @Test 1423 void anySuccessfulResultOrThrow6() throws Exception { 1424 try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(), 1425 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1426 scope.fork(() -> { throw new FooException(); }); 1427 scope.fork(() -> { 1428 Thread.sleep(Duration.ofDays(1)); 1429 return "bar"; 1430 }); 1431 assertThrows(TimeoutException.class, scope::join); 1432 1433 // retry after join throws TimeoutException 1434 assertThrows(IllegalStateException.class, scope::join); 1435 } 1436 } 1437 1438 /** 1439 * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks. 1440 */ 1441 @Test 1442 void testAwaitSuccessfulOrThrow1() throws Throwable { 1443 try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) { 1444 var result = scope.join(); 1445 assertNull(result); 1446 } 1447 } 1448 1449 /** 1450 * Test Joiner.awaitAllSuccessfulOrThrow() with subtasks that complete successfully. 1451 */ 1452 @ParameterizedTest 1453 @MethodSource("factories") 1454 void testAwaitSuccessfulOrThrow2(ThreadFactory factory) throws Throwable { 1455 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1456 cf -> cf.withThreadFactory(factory))) { 1457 var subtask1 = scope.fork(() -> "foo"); 1458 var subtask2 = scope.fork(() -> "bar"); 1459 var result = scope.join(); 1460 assertNull(result); 1461 assertEquals("foo", subtask1.get()); 1462 assertEquals("bar", subtask2.get()); 1463 } 1464 } 1465 1466 /** 1467 * Test Joiner.awaitAllSuccessfulOrThrow() with a subtask that complete successfully and 1468 * a subtask that fails. 1469 */ 1470 @ParameterizedTest 1471 @MethodSource("factories") 1472 void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { 1473 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1474 cf -> cf.withThreadFactory(factory))) { 1475 scope.fork(() -> "foo"); 1476 scope.fork(() -> { throw new FooException(); }); 1477 try { 1478 scope.join(); 1479 } catch (FailedException e) { 1480 assertTrue(e.getCause() instanceof FooException); 1481 } 1482 } 1483 } 1484 1485 /** 1486 * Test Joiner.awaitAllSuccessfulOrThrow() with a timeout. 1487 */ 1488 @Test 1489 void testAwaitSuccessfulOrThrow4() throws Exception { 1490 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(), 1491 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1492 scope.fork(() -> "foo"); 1493 scope.fork(() -> { 1494 Thread.sleep(Duration.ofDays(1)); 1495 return "bar"; 1496 }); 1497 assertThrows(TimeoutException.class, scope::join); 1498 1499 // retry after join throws TimeoutException 1500 assertThrows(IllegalStateException.class, scope::join); 1501 } 1502 } 1503 1504 /** 1505 * Test Joiner.awaitAll() with no subtasks. 1506 */ 1507 @Test 1508 void testAwaitAll1() throws Throwable { 1509 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1510 var result = scope.join(); 1511 assertNull(result); 1512 } 1513 } 1514 1515 /** 1516 * Test Joiner.awaitAll() with subtasks that complete successfully. 1517 */ 1518 @ParameterizedTest 1519 @MethodSource("factories") 1520 void testAwaitAll2(ThreadFactory factory) throws Throwable { 1521 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1522 cf -> cf.withThreadFactory(factory))) { 1523 var subtask1 = scope.fork(() -> "foo"); 1524 var subtask2 = scope.fork(() -> "bar"); 1525 var result = scope.join(); 1526 assertNull(result); 1527 assertEquals("foo", subtask1.get()); 1528 assertEquals("bar", subtask2.get()); 1529 } 1530 } 1531 1532 /** 1533 * Test Joiner.awaitAll() with a subtask that complete successfully and a subtask 1534 * that fails. 1535 */ 1536 @ParameterizedTest 1537 @MethodSource("factories") 1538 void testAwaitAll3(ThreadFactory factory) throws Throwable { 1539 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1540 cf -> cf.withThreadFactory(factory))) { 1541 var subtask1 = scope.fork(() -> "foo"); 1542 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1543 var result = scope.join(); 1544 assertNull(result); 1545 assertEquals("foo", subtask1.get()); 1546 assertTrue(subtask2.exception() instanceof FooException); 1547 } 1548 } 1549 1550 /** 1551 * Test Joiner.awaitAll() with a timeout. 1552 */ 1553 @Test 1554 void testAwaitAll4() throws Exception { 1555 try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(), 1556 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1557 scope.fork(() -> "foo"); 1558 scope.fork(() -> { 1559 Thread.sleep(Duration.ofDays(1)); 1560 return "bar"; 1561 }); 1562 assertThrows(TimeoutException.class, scope::join); 1563 1564 // retry after join throws TimeoutException 1565 assertThrows(IllegalStateException.class, scope::join); 1566 } 1567 } 1568 1569 /** 1570 * Test Joiner.allUntil(Predicate) with no subtasks. 1571 */ 1572 @Test 1573 void testAllUntil1() throws Throwable { 1574 try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) { 1575 var subtasks = scope.join(); 1576 assertEquals(0, subtasks.count()); 1577 } 1578 } 1579 1580 /** 1581 * Test Joiner.allUntil(Predicate) with no cancellation. 1582 */ 1583 @ParameterizedTest 1584 @MethodSource("factories") 1585 void testAllUntil2(ThreadFactory factory) throws Exception { 1586 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false), 1587 cf -> cf.withThreadFactory(factory))) { 1588 1589 var subtask1 = scope.fork(() -> "foo"); 1590 var subtask2 = scope.fork(() -> { throw new FooException(); }); 1591 1592 var subtasks = scope.join().toList(); 1593 assertEquals(List.of(subtask1, subtask2), subtasks); 1594 1595 assertEquals("foo", subtask1.get()); 1596 assertTrue(subtask2.exception() instanceof FooException); 1597 } 1598 } 1599 1600 /** 1601 * Test Joiner.allUntil(Predicate) with cancellation after one subtask completes. 1602 */ 1603 @ParameterizedTest 1604 @MethodSource("factories") 1605 void testAllUntil3(ThreadFactory factory) throws Exception { 1606 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> true), 1607 cf -> cf.withThreadFactory(factory))) { 1608 1609 var subtask1 = scope.fork(() -> "foo"); 1610 var subtask2 = scope.fork(() -> { 1611 Thread.sleep(Duration.ofDays(1)); 1612 return "bar"; 1613 }); 1614 1615 var subtasks = scope.join().toList(); 1616 assertEquals(List.of(subtask1, subtask2), subtasks); 1617 1618 assertEquals("foo", subtask1.get()); 1619 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1620 } 1621 } 1622 1623 /** 1624 * Test Joiner.allUntil(Predicate) with cancellation after serveral subtasks complete. 1625 */ 1626 @ParameterizedTest 1627 @MethodSource("factories") 1628 void testAllUntil4(ThreadFactory factory) throws Exception { 1629 1630 // cancel execution after two or more failures 1631 class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> { 1632 final AtomicInteger failedCount = new AtomicInteger(); 1633 @Override 1634 public boolean test(Subtask<? extends T> subtask) { 1635 return subtask.state() == Subtask.State.FAILED 1636 && failedCount.incrementAndGet() >= 2; 1637 } 1638 } 1639 var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>()); 1640 1641 try (var scope = StructuredTaskScope.open(joiner)) { 1642 int forkCount = 0; 1643 1644 // fork subtasks until execution cancelled 1645 while (!scope.isCancelled()) { 1646 scope.fork(() -> "foo"); 1647 scope.fork(() -> { throw new FooException(); }); 1648 forkCount += 2; 1649 Thread.sleep(Duration.ofMillis(20)); 1650 } 1651 1652 var subtasks = scope.join().toList(); 1653 assertEquals(forkCount, subtasks.size()); 1654 1655 long failedCount = subtasks.stream() 1656 .filter(s -> s.state() == Subtask.State.FAILED) 1657 .count(); 1658 assertTrue(failedCount >= 2); 1659 } 1660 } 1661 1662 /** 1663 * Test Test Joiner.allUntil(Predicate) where the Predicate's test method throws. 1664 */ 1665 @Test 1666 void testAllUntil5() throws Exception { 1667 var joiner = Joiner.allUntil(_ -> { throw new FooException(); }); 1668 var excRef = new AtomicReference<Throwable>(); 1669 Thread.UncaughtExceptionHandler uhe = (t, e) -> excRef.set(e); 1670 ThreadFactory factory = Thread.ofVirtual() 1671 .uncaughtExceptionHandler(uhe) 1672 .factory(); 1673 try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) { 1674 scope.fork(() -> "foo"); 1675 scope.join(); 1676 assertInstanceOf(FooException.class, excRef.get()); 1677 } 1678 } 1679 1680 /** 1681 * Test Joiner.allUntil(Predicate) with a timeout. 1682 */ 1683 @Test 1684 void testAllUntil6() throws Exception { 1685 try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false), 1686 cf -> cf.withTimeout(Duration.ofMillis(100)))) { 1687 var subtask1 = scope.fork(() -> "foo"); 1688 var subtask2 = scope.fork(() -> { 1689 Thread.sleep(Duration.ofDays(1)); 1690 return "bar"; 1691 }); 1692 1693 // TimeoutException should not be thrown 1694 var subtasks = scope.join().toList(); 1695 1696 // stream should have two elements, subtask1 may or may not have completed 1697 assertEquals(List.of(subtask1, subtask2), subtasks); 1698 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1699 1700 // retry after join throws TimeoutException 1701 assertThrows(IllegalStateException.class, scope::join); 1702 } 1703 } 1704 1705 /** 1706 * Test Joiner default methods. 1707 */ 1708 @Test 1709 void testJoinerDefaultMethods() throws Exception { 1710 try (var scope = StructuredTaskScope.open(new CancelAfterOneJoiner<String>())) { 1711 1712 // need subtasks to test default methods 1713 var subtask1 = scope.fork(() -> "foo"); 1714 awaitCancelled(scope); 1715 var subtask2 = scope.fork(() -> "bar"); 1716 scope.join(); 1717 1718 assertEquals(Subtask.State.SUCCESS, subtask1.state()); 1719 assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); 1720 1721 // Joiner that does not override default methods 1722 Joiner<Object, Void> joiner = () -> null; 1723 assertThrows(NullPointerException.class, () -> joiner.onFork(null)); 1724 assertThrows(NullPointerException.class, () -> joiner.onComplete(null)); 1725 assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1)); 1726 assertFalse(joiner.onFork(subtask2)); 1727 assertFalse(joiner.onComplete(subtask1)); 1728 assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2)); 1729 assertThrows(TimeoutException.class, joiner::onTimeout); 1730 } 1731 } 1732 1733 /** 1734 * Test Joiners onFork/onComplete methods with a subtask in an unexpected state. 1735 */ 1736 @Test 1737 void testJoinersWithUnavailableResult() throws Exception { 1738 try (var scope = StructuredTaskScope.open()) { 1739 var done = new CountDownLatch(1); 1740 var subtask = scope.fork(() -> { 1741 done.await(); 1742 return null; 1743 }); 1744 1745 // onComplete with uncompleted task should throw IAE 1746 assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); 1747 assertThrows(IllegalArgumentException.class, 1748 () -> Joiner.allSuccessfulOrThrow().onComplete(subtask)); 1749 assertThrows(IllegalArgumentException.class, 1750 () -> Joiner.anySuccessfulResultOrThrow().onComplete(subtask)); 1751 assertThrows(IllegalArgumentException.class, 1752 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask)); 1753 assertThrows(IllegalArgumentException.class, 1754 () -> Joiner.awaitAll().onComplete(subtask)); 1755 assertThrows(IllegalArgumentException.class, 1756 () -> Joiner.allUntil(_ -> false).onComplete(subtask)); 1757 1758 done.countDown(); 1759 scope.join(); 1760 1761 // onFork with completed task should throw IAE 1762 assertEquals(Subtask.State.SUCCESS, subtask.state()); 1763 assertThrows(IllegalArgumentException.class, 1764 () -> Joiner.allSuccessfulOrThrow().onFork(subtask)); 1765 assertThrows(IllegalArgumentException.class, 1766 () -> Joiner.anySuccessfulResultOrThrow().onFork(subtask)); 1767 assertThrows(IllegalArgumentException.class, 1768 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask)); 1769 assertThrows(IllegalArgumentException.class, 1770 () -> Joiner.awaitAll().onFork(subtask)); 1771 assertThrows(IllegalArgumentException.class, 1772 () -> Joiner.allUntil(_ -> false).onFork(subtask)); 1773 } 1774 1775 } 1776 1777 /** 1778 * Test the Configuration function apply method throwing an exception. 1779 */ 1780 @Test 1781 void testConfigFunctionThrows() throws Exception { 1782 assertThrows(FooException.class, 1783 () -> StructuredTaskScope.open(Joiner.awaitAll(), 1784 cf -> { throw new FooException(); })); 1785 } 1786 1787 /** 1788 * Test Configuration equals/hashCode/toString 1789 */ 1790 @Test 1791 void testConfigMethods() throws Exception { 1792 UnaryOperator<Configuration> configOperator = cf -> { 1793 var name = "duke"; 1794 var threadFactory = Thread.ofPlatform().factory(); 1795 var timeout = Duration.ofSeconds(10); 1796 1797 assertEquals(cf, cf); 1798 assertEquals(cf.withName(name), cf.withName(name)); 1799 assertEquals(cf.withThreadFactory(threadFactory), cf.withThreadFactory(threadFactory)); 1800 assertEquals(cf.withTimeout(timeout), cf.withTimeout(timeout)); 1801 1802 assertNotEquals(cf, cf.withName(name)); 1803 assertNotEquals(cf, cf.withThreadFactory(threadFactory)); 1804 assertNotEquals(cf, cf.withTimeout(timeout)); 1805 1806 assertEquals(cf.withName(name).hashCode(), cf.withName(name).hashCode()); 1807 assertEquals(cf.withThreadFactory(threadFactory).hashCode(), 1808 cf.withThreadFactory(threadFactory).hashCode()); 1809 assertEquals(cf.withTimeout(timeout).hashCode(), cf.withTimeout(timeout).hashCode()); 1810 1811 assertTrue(cf.withName(name).toString().contains(name)); 1812 assertTrue(cf.withThreadFactory(threadFactory).toString().contains(threadFactory.toString())); 1813 assertTrue(cf.withTimeout(timeout).toString().contains(timeout.toString())); 1814 1815 return cf; 1816 }; 1817 try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) { 1818 // do nothing 1819 } 1820 } 1821 1822 /** 1823 * Test for NullPointerException. 1824 */ 1825 @Test 1826 void testNulls() throws Exception { 1827 assertThrows(NullPointerException.class, 1828 () -> StructuredTaskScope.open(null)); 1829 assertThrows(NullPointerException.class, 1830 () -> StructuredTaskScope.open(null, cf -> cf)); 1831 assertThrows(NullPointerException.class, 1832 () -> StructuredTaskScope.open(Joiner.awaitAll(), null)); 1833 1834 assertThrows(NullPointerException.class, () -> Joiner.allUntil(null)); 1835 1836 // fork 1837 try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1838 assertThrows(NullPointerException.class, () -> scope.fork((Callable<Object>) null)); 1839 assertThrows(NullPointerException.class, () -> scope.fork((Runnable) null)); 1840 } 1841 1842 // Configuration and withXXX methods 1843 assertThrows(NullPointerException.class, 1844 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> null)); 1845 assertThrows(NullPointerException.class, 1846 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName(null))); 1847 assertThrows(NullPointerException.class, 1848 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(null))); 1849 assertThrows(NullPointerException.class, 1850 () -> StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withTimeout(null))); 1851 1852 // Joiner.onFork/onComplete 1853 assertThrows(NullPointerException.class, 1854 () -> Joiner.awaitAllSuccessfulOrThrow().onFork(null)); 1855 assertThrows(NullPointerException.class, 1856 () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(null)); 1857 assertThrows(NullPointerException.class, 1858 () -> Joiner.awaitAll().onFork(null)); 1859 assertThrows(NullPointerException.class, 1860 () -> Joiner.awaitAll().onComplete(null)); 1861 assertThrows(NullPointerException.class, 1862 () -> Joiner.allSuccessfulOrThrow().onFork(null)); 1863 assertThrows(NullPointerException.class, 1864 () -> Joiner.allSuccessfulOrThrow().onComplete(null)); 1865 assertThrows(NullPointerException.class, 1866 () -> Joiner.anySuccessfulResultOrThrow().onFork(null)); 1867 assertThrows(NullPointerException.class, 1868 () -> Joiner.anySuccessfulResultOrThrow().onComplete(null)); 1869 } 1870 1871 /** 1872 * ThreadFactory that counts usage. 1873 */ 1874 private static class CountingThreadFactory implements ThreadFactory { 1875 final ThreadFactory delegate; 1876 final AtomicInteger threadCount = new AtomicInteger(); 1877 CountingThreadFactory(ThreadFactory delegate) { 1878 this.delegate = delegate; 1879 } 1880 @Override 1881 public Thread newThread(Runnable task) { 1882 threadCount.incrementAndGet(); 1883 return delegate.newThread(task); 1884 } 1885 int threadCount() { 1886 return threadCount.get(); 1887 } 1888 } 1889 1890 /** 1891 * A joiner that counts that counts the number of subtasks that are forked and the 1892 * number of subtasks that complete. 1893 */ 1894 private static class CountingJoiner<T> implements Joiner<T, Void> { 1895 final AtomicInteger onForkCount = new AtomicInteger(); 1896 final AtomicInteger onCompleteCount = new AtomicInteger(); 1897 @Override 1898 public boolean onFork(Subtask<? extends T> subtask) { 1899 onForkCount.incrementAndGet(); 1900 return false; 1901 } 1902 @Override 1903 public boolean onComplete(Subtask<? extends T> subtask) { 1904 onCompleteCount.incrementAndGet(); 1905 return false; 1906 } 1907 @Override 1908 public Void result() { 1909 return null; 1910 } 1911 int onForkCount() { 1912 return onForkCount.get(); 1913 } 1914 int onCompleteCount() { 1915 return onCompleteCount.get(); 1916 } 1917 } 1918 1919 /** 1920 * A joiner that cancels execution when a subtask completes. It also keeps a count 1921 * of the number of subtasks that are forked and the number of subtasks that complete. 1922 */ 1923 private static class CancelAfterOneJoiner<T> implements Joiner<T, Void> { 1924 final AtomicInteger onForkCount = new AtomicInteger(); 1925 final AtomicInteger onCompleteCount = new AtomicInteger(); 1926 @Override 1927 public boolean onFork(Subtask<? extends T> subtask) { 1928 onForkCount.incrementAndGet(); 1929 return false; 1930 } 1931 @Override 1932 public boolean onComplete(Subtask<? extends T> subtask) { 1933 onCompleteCount.incrementAndGet(); 1934 return true; 1935 } 1936 @Override 1937 public Void result() { 1938 return null; 1939 } 1940 int onForkCount() { 1941 return onForkCount.get(); 1942 } 1943 int onCompleteCount() { 1944 return onCompleteCount.get(); 1945 } 1946 } 1947 1948 /** 1949 * A runtime exception for tests. 1950 */ 1951 private static class FooException extends RuntimeException { 1952 FooException() { } 1953 FooException(Throwable cause) { super(cause); } 1954 } 1955 1956 /** 1957 * Returns the current time in milliseconds. 1958 */ 1959 private long millisTime() { 1960 long now = System.nanoTime(); 1961 return TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS); 1962 } 1963 1964 /** 1965 * Check the duration of a task 1966 * @param start start time, in milliseconds 1967 * @param min minimum expected duration, in milliseconds 1968 * @param max maximum expected duration, in milliseconds 1969 * @return the duration (now - start), in milliseconds 1970 */ 1971 private long expectDuration(long start, long min, long max) { 1972 long duration = millisTime() - start; 1973 assertTrue(duration >= min, 1974 "Duration " + duration + "ms, expected >= " + min + "ms"); 1975 assertTrue(duration <= max, 1976 "Duration " + duration + "ms, expected <= " + max + "ms"); 1977 return duration; 1978 } 1979 1980 /** 1981 * Wait for the given scope to be cancelled. 1982 */ 1983 private static void awaitCancelled(StructuredTaskScope<?, ?> scope) throws InterruptedException { 1984 while (!scope.isCancelled()) { 1985 Thread.sleep(Duration.ofMillis(20)); 1986 } 1987 } 1988 1989 /** 1990 * Interrupts a thread when it waits (timed or untimed) at location "{@code c.m}". 1991 * {@code c} is the fully qualified class name and {@code m} is the method name. 1992 */ 1993 private void interruptThreadAt(Thread target, String location) throws InterruptedException { 1994 int index = location.lastIndexOf('.'); 1995 String className = location.substring(0, index); 1996 String methodName = location.substring(index + 1); 1997 1998 boolean found = false; 1999 while (!found) { 2000 Thread.State state = target.getState(); 2001 assertTrue(state != TERMINATED); 2002 if ((state == WAITING || state == TIMED_WAITING) 2003 && contains(target.getStackTrace(), className, methodName)) { 2004 found = true; 2005 } else { 2006 Thread.sleep(20); 2007 } 2008 } 2009 target.interrupt(); 2010 } 2011 2012 /** 2013 * Schedules the current thread to be interrupted when it waits (timed or untimed) 2014 * at the given location. 2015 */ 2016 private void scheduleInterruptAt(String location) { 2017 Thread target = Thread.currentThread(); 2018 scheduler.submit(() -> { 2019 interruptThreadAt(target, location); 2020 return null; 2021 }); 2022 } 2023 2024 /** 2025 * Calls a result returning task from another thread. 2026 */ 2027 private <V> V callInOtherThread(Callable<V> task) throws Exception { 2028 var result = new AtomicReference<V>(); 2029 var exc = new AtomicReference<Exception>(); 2030 Thread thread = Thread.ofVirtual().start(() -> { 2031 try { 2032 result.set(task.call()); 2033 } catch (Exception e) { 2034 exc.set(e); 2035 } 2036 }); 2037 boolean interrupted = false; 2038 boolean terminated = false; 2039 while (!terminated) { 2040 try { 2041 thread.join(); 2042 terminated = true; 2043 } catch (InterruptedException e) { 2044 interrupted = true; 2045 } 2046 } 2047 if (interrupted) { 2048 Thread.currentThread().interrupt(); 2049 } 2050 Exception e = exc.get(); 2051 if (e != null) { 2052 throw e; 2053 } else { 2054 return result.get(); 2055 } 2056 } 2057 2058 /** 2059 * Returns true if the given stack trace contains an element for the given class 2060 * and method name. 2061 */ 2062 private boolean contains(StackTraceElement[] stack, String className, String methodName) { 2063 return Arrays.stream(stack) 2064 .anyMatch(e -> className.equals(e.getClassName()) 2065 && methodName.equals(e.getMethodName())); 2066 } 2067 }