1 /* 2 * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.concurrent; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.util.ArrayList; 30 import java.util.Comparator; 31 import java.util.List; 32 import java.util.NoSuchElementException; 33 import java.util.Objects; 34 import java.util.concurrent.StructuredTaskScope.Joiner; 35 import java.util.concurrent.StructuredTaskScope.Subtask; 36 import java.util.function.Predicate; 37 import java.util.stream.Stream; 38 import jdk.internal.invoke.MhUtil; 39 40 /** 41 * Built-in StructuredTaskScope.Joiner implementations. 42 */ 43 class Joiners { 44 private Joiners() { } 45 46 /** 47 * A joiner that returns a stream of all subtasks when all subtasks complete 48 * successfully. Cancels the scope if any subtask fails. 49 */ 50 static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> { 51 private static final VarHandle FIRST_EXCEPTION = 52 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class); 53 54 // list of forked subtasks, only accessed by owner thread 55 private final List<Subtask<T>> subtasks = new ArrayList<>(); 56 57 private volatile Throwable firstException; 58 59 @Override 60 public boolean onFork(Subtask<? extends T> subtask) { 61 if (subtask.state() != Subtask.State.UNAVAILABLE) { 62 throw new IllegalArgumentException(); 63 } 64 @SuppressWarnings("unchecked") 65 var s = (Subtask<T>) subtask; 66 subtasks.add(s); 67 return false; 68 } 69 70 @Override 71 public boolean onComplete(Subtask<? extends T> subtask) { 72 Subtask.State state = subtask.state(); 73 if (state == Subtask.State.UNAVAILABLE) { 74 throw new IllegalArgumentException(); 75 } 76 return (state == Subtask.State.FAILED) 77 && (firstException == null) 78 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 79 } 80 81 @Override 82 public Stream<Subtask<T>> result() throws Throwable { 83 Throwable ex = firstException; 84 if (ex != null) { 85 throw ex; 86 } else { 87 return subtasks.stream(); 88 } 89 } 90 } 91 92 /** 93 * A joiner that returns the result of the first subtask to complete successfully. 94 * Cancels the scope if any subtasks succeeds. 95 */ 96 static final class AnySuccessful<T> implements Joiner<T, T> { 97 private static final VarHandle SUBTASK = 98 MhUtil.findVarHandle(MethodHandles.lookup(), "subtask", Subtask.class); 99 100 // UNAVAILABLE < FAILED < SUCCESS 101 private static final Comparator<Subtask.State> SUBTASK_STATE_COMPARATOR = 102 Comparator.comparingInt(AnySuccessful::stateToInt); 103 104 private volatile Subtask<T> subtask; 105 106 /** 107 * Maps a Subtask.State to an int that can be compared. 108 */ 109 private static int stateToInt(Subtask.State s) { 110 return switch (s) { 111 case UNAVAILABLE -> 0; 112 case FAILED -> 1; 113 case SUCCESS -> 2; 114 }; 115 } 116 117 @Override 118 public boolean onComplete(Subtask<? extends T> subtask) { 119 Subtask.State state = subtask.state(); 120 if (state == Subtask.State.UNAVAILABLE) { 121 throw new IllegalArgumentException(); 122 } 123 Subtask<T> s; 124 while (((s = this.subtask) == null) 125 || SUBTASK_STATE_COMPARATOR.compare(s.state(), state) < 0) { 126 if (SUBTASK.compareAndSet(this, s, subtask)) { 127 return (state == Subtask.State.SUCCESS); 128 } 129 } 130 return false; 131 } 132 133 @Override 134 public T result() throws Throwable { 135 Subtask<T> subtask = this.subtask; 136 if (subtask == null) { 137 throw new NoSuchElementException("No subtasks completed"); 138 } 139 return switch (subtask.state()) { 140 case SUCCESS -> subtask.get(); 141 case FAILED -> throw subtask.exception(); 142 default -> throw new InternalError(); 143 }; 144 } 145 } 146 147 /** 148 * A joiner that that waits for all successful subtasks. Cancels the scope if any 149 * subtask fails. 150 */ 151 static final class AwaitSuccessful<T> implements Joiner<T, Void> { 152 private static final VarHandle FIRST_EXCEPTION = 153 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class); 154 private volatile Throwable firstException; 155 156 @Override 157 public boolean onComplete(Subtask<? extends T> subtask) { 158 return (subtask.state() == Subtask.State.FAILED) 159 && (firstException == null) 160 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 161 } 162 163 @Override 164 public Void result() throws Throwable { 165 Throwable ex = firstException; 166 if (ex != null) { 167 throw ex; 168 } else { 169 return null; 170 } 171 } 172 } 173 174 /** 175 * A joiner that returns a stream of all subtasks. 176 */ 177 static class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> { 178 private final Predicate<Subtask<? extends T>> isDone; 179 180 // list of forked subtasks, only accessed by owner thread 181 private final List<Subtask<T>> subtasks = new ArrayList<>(); 182 183 AllSubtasks(Predicate<Subtask<? extends T>> isDone) { 184 this.isDone = Objects.requireNonNull(isDone); 185 } 186 187 @Override 188 public boolean onFork(Subtask<? extends T> subtask) { 189 if (subtask.state() != Subtask.State.UNAVAILABLE) { 190 throw new IllegalArgumentException(); 191 } 192 @SuppressWarnings("unchecked") 193 var s = (Subtask<T>) subtask; 194 subtasks.add(s); 195 return false; 196 } 197 198 @Override 199 public boolean onComplete(Subtask<? extends T> subtask) { 200 if (subtask.state() == Subtask.State.UNAVAILABLE) { 201 throw new IllegalArgumentException(); 202 } 203 return isDone.test(subtask); 204 } 205 206 @Override 207 public Stream<Subtask<T>> result() { 208 return subtasks.stream(); 209 } 210 } 211 }