1 /* 2 * Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.concurrent; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.util.ArrayList; 30 import java.util.Comparator; 31 import java.util.List; 32 import java.util.NoSuchElementException; 33 import java.util.Objects; 34 import java.util.concurrent.StructuredTaskScope.Joiner; 35 import java.util.concurrent.StructuredTaskScope.Subtask; 36 import java.util.function.Predicate; 37 import java.util.stream.Stream; 38 import jdk.internal.invoke.MhUtil; 39 40 /** 41 * Built-in StructuredTaskScope.Joiner implementations. 42 */ 43 class Joiners { 44 private Joiners() { } 45 46 /** 47 * Throws IllegalArgumentException if the subtask is not in the UNAVAILABLE state. 48 */ 49 private static void ensureUnavailable(Subtask<?> subtask) { 50 if (subtask.state() != Subtask.State.UNAVAILABLE) { 51 throw new IllegalArgumentException("Subtask not in UNAVAILABLE state"); 52 } 53 } 54 55 /** 56 * Throws IllegalArgumentException if the subtask has not completed. 57 */ 58 private static Subtask.State ensureCompleted(Subtask<?> subtask) { 59 Subtask.State state = subtask.state(); 60 if (state == Subtask.State.UNAVAILABLE) { 61 throw new IllegalArgumentException("Subtask has not completed"); 62 } 63 return state; 64 } 65 66 /** 67 * A joiner that returns a stream of all subtasks when all subtasks complete 68 * successfully. Cancels the scope if any subtask fails. 69 */ 70 static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> { 71 private static final VarHandle FIRST_EXCEPTION = 72 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class); 73 74 // list of forked subtasks, only accessed by owner thread 75 private final List<Subtask<T>> subtasks = new ArrayList<>(); 76 77 private volatile Throwable firstException; 78 79 @Override 80 public boolean onFork(Subtask<? extends T> subtask) { 81 ensureUnavailable(subtask); 82 @SuppressWarnings("unchecked") 83 var s = (Subtask<T>) subtask; 84 subtasks.add(s); 85 return false; 86 } 87 88 @Override 89 public boolean onComplete(Subtask<? extends T> subtask) { 90 Subtask.State state = ensureCompleted(subtask); 91 return (state == Subtask.State.FAILED) 92 && (firstException == null) 93 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 94 } 95 96 @Override 97 public Stream<Subtask<T>> result() throws Throwable { 98 Throwable ex = firstException; 99 if (ex != null) { 100 throw ex; 101 } else { 102 return subtasks.stream(); 103 } 104 } 105 } 106 107 /** 108 * A joiner that returns the result of the first subtask to complete successfully. 109 * Cancels the scope if any subtasks succeeds. 110 */ 111 static final class AnySuccessful<T> implements Joiner<T, T> { 112 private static final VarHandle SUBTASK = 113 MhUtil.findVarHandle(MethodHandles.lookup(), "subtask", Subtask.class); 114 115 // UNAVAILABLE < FAILED < SUCCESS 116 private static final Comparator<Subtask.State> SUBTASK_STATE_COMPARATOR = 117 Comparator.comparingInt(AnySuccessful::stateToInt); 118 119 private volatile Subtask<T> subtask; 120 121 /** 122 * Maps a Subtask.State to an int that can be compared. 123 */ 124 private static int stateToInt(Subtask.State s) { 125 return switch (s) { 126 case UNAVAILABLE -> 0; 127 case FAILED -> 1; 128 case SUCCESS -> 2; 129 }; 130 } 131 132 @Override 133 public boolean onComplete(Subtask<? extends T> subtask) { 134 Subtask.State state = ensureCompleted(subtask); 135 Subtask<T> s; 136 while (((s = this.subtask) == null) 137 || SUBTASK_STATE_COMPARATOR.compare(s.state(), state) < 0) { 138 if (SUBTASK.compareAndSet(this, s, subtask)) { 139 return (state == Subtask.State.SUCCESS); 140 } 141 } 142 return false; 143 } 144 145 @Override 146 public T result() throws Throwable { 147 Subtask<T> subtask = this.subtask; 148 if (subtask == null) { 149 throw new NoSuchElementException("No subtasks completed"); 150 } 151 return switch (subtask.state()) { 152 case SUCCESS -> subtask.get(); 153 case FAILED -> throw subtask.exception(); 154 default -> throw new InternalError(); 155 }; 156 } 157 } 158 159 /** 160 * A joiner that that waits for all successful subtasks. Cancels the scope if any 161 * subtask fails. 162 */ 163 static final class AwaitSuccessful<T> implements Joiner<T, Void> { 164 private static final VarHandle FIRST_EXCEPTION = 165 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class); 166 private volatile Throwable firstException; 167 168 @Override 169 public boolean onComplete(Subtask<? extends T> subtask) { 170 Subtask.State state = ensureCompleted(subtask); 171 return (state == Subtask.State.FAILED) 172 && (firstException == null) 173 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 174 } 175 176 @Override 177 public Void result() throws Throwable { 178 Throwable ex = firstException; 179 if (ex != null) { 180 throw ex; 181 } else { 182 return null; 183 } 184 } 185 } 186 187 /** 188 * A joiner that returns a stream of all subtasks. 189 */ 190 static class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> { 191 private final Predicate<Subtask<? extends T>> isDone; 192 193 // list of forked subtasks, only accessed by owner thread 194 private final List<Subtask<T>> subtasks = new ArrayList<>(); 195 196 AllSubtasks(Predicate<Subtask<? extends T>> isDone) { 197 this.isDone = Objects.requireNonNull(isDone); 198 } 199 200 @Override 201 public boolean onFork(Subtask<? extends T> subtask) { 202 ensureUnavailable(subtask); 203 @SuppressWarnings("unchecked") 204 var s = (Subtask<T>) subtask; 205 subtasks.add(s); 206 return false; 207 } 208 209 @Override 210 public boolean onComplete(Subtask<? extends T> subtask) { 211 ensureCompleted(subtask); 212 return isDone.test(subtask); 213 } 214 215 @Override 216 public Stream<Subtask<T>> result() { 217 return subtasks.stream(); 218 } 219 } 220 }