1 /*
2 * Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.util.ArrayList;
30 import java.util.Comparator;
31 import java.util.List;
32 import java.util.NoSuchElementException;
33 import java.util.Objects;
34 import java.util.concurrent.StructuredTaskScope.Joiner;
35 import java.util.concurrent.StructuredTaskScope.Subtask;
36 import java.util.function.Predicate;
37 import java.util.stream.Stream;
38 import jdk.internal.invoke.MhUtil;
39
40 /**
41 * Built-in StructuredTaskScope.Joiner implementations.
42 */
43 class Joiners {
44 private Joiners() { }
45
46 /**
47 * Throws IllegalArgumentException if the subtask is not in the UNAVAILABLE state.
48 */
49 private static void ensureUnavailable(Subtask<?> subtask) {
50 if (subtask.state() != Subtask.State.UNAVAILABLE) {
51 throw new IllegalArgumentException("Subtask not in UNAVAILABLE state");
52 }
53 }
54
55 /**
56 * Throws IllegalArgumentException if the subtask has not completed.
57 */
58 private static Subtask.State ensureCompleted(Subtask<?> subtask) {
59 Subtask.State state = subtask.state();
60 if (state == Subtask.State.UNAVAILABLE) {
61 throw new IllegalArgumentException("Subtask has not completed");
62 }
63 return state;
64 }
65
66 /**
67 * A joiner that returns a stream of all subtasks when all subtasks complete
68 * successfully. Cancels the scope if any subtask fails.
69 */
70 static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {
71 private static final VarHandle FIRST_EXCEPTION =
72 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
73
74 // list of forked subtasks, only accessed by owner thread
75 private final List<Subtask<T>> subtasks = new ArrayList<>();
76
77 private volatile Throwable firstException;
78
79 @Override
80 public boolean onFork(Subtask<? extends T> subtask) {
81 ensureUnavailable(subtask);
82 @SuppressWarnings("unchecked")
83 var s = (Subtask<T>) subtask;
84 subtasks.add(s);
85 return false;
86 }
87
88 @Override
89 public boolean onComplete(Subtask<? extends T> subtask) {
90 Subtask.State state = ensureCompleted(subtask);
91 return (state == Subtask.State.FAILED)
92 && (firstException == null)
93 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
94 }
95
96 @Override
97 public Stream<Subtask<T>> result() throws Throwable {
98 Throwable ex = firstException;
99 if (ex != null) {
100 throw ex;
101 } else {
102 return subtasks.stream();
103 }
104 }
105 }
106
107 /**
108 * A joiner that returns the result of the first subtask to complete successfully.
109 * Cancels the scope if any subtasks succeeds.
110 */
111 static final class AnySuccessful<T> implements Joiner<T, T> {
112 private static final VarHandle SUBTASK =
113 MhUtil.findVarHandle(MethodHandles.lookup(), "subtask", Subtask.class);
114
115 // UNAVAILABLE < FAILED < SUCCESS
116 private static final Comparator<Subtask.State> SUBTASK_STATE_COMPARATOR =
117 Comparator.comparingInt(AnySuccessful::stateToInt);
118
119 private volatile Subtask<T> subtask;
120
121 /**
122 * Maps a Subtask.State to an int that can be compared.
123 */
124 private static int stateToInt(Subtask.State s) {
125 return switch (s) {
126 case UNAVAILABLE -> 0;
127 case FAILED -> 1;
128 case SUCCESS -> 2;
129 };
130 }
131
132 @Override
133 public boolean onComplete(Subtask<? extends T> subtask) {
134 Subtask.State state = ensureCompleted(subtask);
135 Subtask<T> s;
136 while (((s = this.subtask) == null)
137 || SUBTASK_STATE_COMPARATOR.compare(s.state(), state) < 0) {
138 if (SUBTASK.compareAndSet(this, s, subtask)) {
139 return (state == Subtask.State.SUCCESS);
140 }
141 }
142 return false;
143 }
144
145 @Override
146 public T result() throws Throwable {
147 Subtask<T> subtask = this.subtask;
148 if (subtask == null) {
149 throw new NoSuchElementException("No subtasks completed");
150 }
151 return switch (subtask.state()) {
152 case SUCCESS -> subtask.get();
153 case FAILED -> throw subtask.exception();
154 default -> throw new InternalError();
155 };
156 }
157 }
158
159 /**
160 * A joiner that that waits for all successful subtasks. Cancels the scope if any
161 * subtask fails.
162 */
163 static final class AwaitSuccessful<T> implements Joiner<T, Void> {
164 private static final VarHandle FIRST_EXCEPTION =
165 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
166 private volatile Throwable firstException;
167
168 @Override
169 public boolean onComplete(Subtask<? extends T> subtask) {
170 Subtask.State state = ensureCompleted(subtask);
171 return (state == Subtask.State.FAILED)
172 && (firstException == null)
173 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
174 }
175
176 @Override
177 public Void result() throws Throwable {
178 Throwable ex = firstException;
179 if (ex != null) {
180 throw ex;
181 } else {
182 return null;
183 }
184 }
185 }
186
187 /**
188 * A joiner that returns a stream of all subtasks.
189 */
190 static final class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
191 private final Predicate<Subtask<? extends T>> isDone;
192
193 // list of forked subtasks, only accessed by owner thread
194 private final List<Subtask<T>> subtasks = new ArrayList<>();
195
196 AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
197 this.isDone = Objects.requireNonNull(isDone);
198 }
199
200 @Override
201 public boolean onFork(Subtask<? extends T> subtask) {
202 ensureUnavailable(subtask);
203 @SuppressWarnings("unchecked")
204 var s = (Subtask<T>) subtask;
205 subtasks.add(s);
206 return false;
207 }
208
209 @Override
210 public boolean onComplete(Subtask<? extends T> subtask) {
211 ensureCompleted(subtask);
212 return isDone.test(subtask);
213 }
214
215 @Override
216 public Stream<Subtask<T>> result() {
217 return subtasks.stream();
218 }
219 }
220 }