1 /* 2 * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.concurrent; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.util.ArrayList; 30 import java.util.Comparator; 31 import java.util.List; 32 import java.util.NoSuchElementException; 33 import java.util.Objects; 34 import java.util.concurrent.StructuredTaskScope.Joiner; 35 import java.util.concurrent.StructuredTaskScope.Subtask; 36 import java.util.function.Predicate; 37 import java.util.stream.Stream; 38 import jdk.internal.invoke.MhUtil; 39 40 /** 41 * Built-in StructuredTaskScope.Joiner implementations. 42 */ 43 class Joiners { 44 private Joiners() { } 45 46 /** 47 * A joiner that returns a stream of all subtasks when all subtasks complete 48 * successfully. Cancels the scope if any subtask fails. 49 */ 50 static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> { 51 private static final VarHandle FIRST_EXCEPTION = 52 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class); 53 54 // list of forked subtasks, only accessed by owner thread 55 private final List<Subtask<T>> subtasks = new ArrayList<>(); 56 57 private volatile Throwable firstException; 58 59 @Override 60 public boolean onFork(Subtask<? extends T> subtask) { 61 if (subtask.state() != Subtask.State.UNAVAILABLE) { 62 throw new IllegalArgumentException(); 63 } 64 @SuppressWarnings("unchecked") 65 var s = (Subtask<T>) subtask; 66 subtasks.add(s); 67 return false; 68 } 69 70 @Override 71 public boolean onComplete(Subtask<? extends T> subtask) { 72 Subtask.State state = subtask.state(); 73 if (state == Subtask.State.UNAVAILABLE) { 74 throw new IllegalArgumentException(); 75 } 76 return (state == Subtask.State.FAILED) 77 && (firstException == null) 78 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 79 } 80 81 @Override 82 public Stream<Subtask<T>> result() throws Throwable { 83 Throwable ex = firstException; 84 if (ex != null) { 85 throw ex; 86 } else { 87 return subtasks.stream(); 88 } 89 } 90 } 91 92 /** 93 * A joiner that returns the result of the first subtask to complete successfully. 94 * Cancels the scope if any subtasks succeeds. 95 */ 96 static final class AnySuccessful<T> implements Joiner<T, T> { 97 private static final VarHandle SUBTASK = 98 MhUtil.findVarHandle(MethodHandles.lookup(), "subtask", Subtask.class); 99 100 // UNAVAILABLE < FAILED < SUCCESS 101 private static final Comparator<Subtask.State> SUBTASK_STATE_COMPARATOR = 102 Comparator.comparingInt(AnySuccessful::stateToInt); 103 104 private volatile Subtask<T> subtask; 105 106 /** 107 * Maps a Subtask.State to an int that can be compared. 108 */ 109 private static int stateToInt(Subtask.State s) { 110 return switch (s) { 111 case UNAVAILABLE -> 0; 112 case FAILED -> 1; 113 case SUCCESS -> 2; 114 }; 115 } 116 117 @Override 118 public boolean onComplete(Subtask<? extends T> subtask) { 119 Subtask.State state = subtask.state(); 120 if (state == Subtask.State.UNAVAILABLE) { 121 throw new IllegalArgumentException(); 122 } 123 Subtask<T> s; 124 while (((s = this.subtask) == null) 125 || SUBTASK_STATE_COMPARATOR.compare(s.state(), state) < 0) { 126 if (SUBTASK.compareAndSet(this, s, subtask)) { 127 return (state == Subtask.State.SUCCESS); 128 } 129 } 130 return false; 131 } 132 133 @Override 134 public T result() throws Throwable { 135 Subtask<T> subtask = this.subtask; 136 if (subtask == null) { 137 throw new NoSuchElementException("No subtasks completed"); 138 } 139 return switch (subtask.state()) { 140 case SUCCESS -> subtask.get(); 141 case FAILED -> throw subtask.exception(); 142 default -> throw new InternalError(); 143 }; 144 } 145 } 146 147 /** 148 * A joiner that that waits for all successful subtasks. Cancels the scope if any 149 * subtask fails. 150 */ 151 static final class AwaitSuccessful<T> implements Joiner<T, Void> { 152 private static final VarHandle FIRST_EXCEPTION = 153 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class); 154 private volatile Throwable firstException; 155 156 @Override 157 public boolean onComplete(Subtask<? extends T> subtask) { 158 Subtask.State state = subtask.state(); 159 if (state == Subtask.State.UNAVAILABLE) { 160 throw new IllegalArgumentException(); 161 } 162 return (state == Subtask.State.FAILED) 163 && (firstException == null) 164 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 165 } 166 167 @Override 168 public Void result() throws Throwable { 169 Throwable ex = firstException; 170 if (ex != null) { 171 throw ex; 172 } else { 173 return null; 174 } 175 } 176 } 177 178 /** 179 * A joiner that returns a stream of all subtasks. 180 */ 181 static class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> { 182 private final Predicate<Subtask<? extends T>> isDone; 183 184 // list of forked subtasks, only accessed by owner thread 185 private final List<Subtask<T>> subtasks = new ArrayList<>(); 186 187 AllSubtasks(Predicate<Subtask<? extends T>> isDone) { 188 this.isDone = Objects.requireNonNull(isDone); 189 } 190 191 @Override 192 public boolean onFork(Subtask<? extends T> subtask) { 193 if (subtask.state() != Subtask.State.UNAVAILABLE) { 194 throw new IllegalArgumentException(); 195 } 196 @SuppressWarnings("unchecked") 197 var s = (Subtask<T>) subtask; 198 subtasks.add(s); 199 return false; 200 } 201 202 @Override 203 public boolean onComplete(Subtask<? extends T> subtask) { 204 if (subtask.state() == Subtask.State.UNAVAILABLE) { 205 throw new IllegalArgumentException(); 206 } 207 return isDone.test(subtask); 208 } 209 210 @Override 211 public Stream<Subtask<T>> result() { 212 return subtasks.stream(); 213 } 214 } 215 }