1 /*
2 * Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.time.Duration;
30 import java.util.Objects;
31 import java.util.function.Function;
32 import jdk.internal.misc.ThreadFlock;
33 import jdk.internal.invoke.MhUtil;
34
35 /**
36 * StructuredTaskScope implementation.
37 */
38 final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
39 private static final VarHandle CANCELLED =
40 MhUtil.findVarHandle(MethodHandles.lookup(), "cancelled", boolean.class);
41
42 private final Joiner<? super T, ? extends R> joiner;
43 private final ThreadFactory threadFactory;
44 private final ThreadFlock flock;
45
46 // state, only accessed by owner thread
47 private static final int ST_NEW = 0,
48 ST_FORKED = 1, // subtasks forked, need to join
49 ST_JOIN_STARTED = 2, // join started, can no longer fork
50 ST_JOIN_COMPLETED = 3, // join completed
51 ST_CLOSED = 4; // closed
52 private int state;
53
54 // timer task, only accessed by owner thread
55 private Future<?> timerTask;
56
57 // set or read by any thread
58 private volatile boolean cancelled;
59
60 // set by the timer thread, read by the owner thread
61 private volatile boolean timeoutExpired;
62
63 @SuppressWarnings("this-escape")
64 private StructuredTaskScopeImpl(Joiner<? super T, ? extends R> joiner,
65 ThreadFactory threadFactory,
66 String name) {
67 this.joiner = joiner;
68 this.threadFactory = threadFactory;
69 this.flock = ThreadFlock.open((name != null) ? name : Objects.toIdentityString(this));
70 this.state = ST_NEW;
71 }
72
73 /**
74 * Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object
75 * and with configuration that is the result of applying the given function to the
76 * default configuration.
77 */
78 static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
79 Function<Configuration, Configuration> configFunction) {
80 Objects.requireNonNull(joiner);
81
82 var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
83 var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
84
85 // schedule timeout
86 Duration timeout = config.timeout();
87 if (timeout != null) {
88 boolean scheduled = false;
89 try {
90 scope.scheduleTimeout(timeout);
91 scheduled = true;
92 } finally {
93 if (!scheduled) {
94 scope.close(); // pop if scheduling timeout failed
95 }
96 }
97 }
98
99 return scope;
100 }
101
102 /**
103 * Throws WrongThreadException if the current thread is not the owner thread.
104 */
105 private void ensureOwner() {
106 if (Thread.currentThread() != flock.owner()) {
107 throw new WrongThreadException("Current thread not owner");
108 }
109 }
110
111 /**
112 * Throws IllegalStateException if already joined or scope is closed.
113 */
114 private void ensureNotJoined() {
115 assert Thread.currentThread() == flock.owner();
116 if (state > ST_FORKED) {
117 throw new IllegalStateException("Already joined or scope is closed");
118 }
119 }
120
121 /**
122 * Throws IllegalStateException if invoked by the owner thread and the owner thread
123 * has not joined.
124 */
125 private void ensureJoinedIfOwner() {
126 if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) {
127 throw new IllegalStateException("join not called");
128 }
129 }
130
131 /**
132 * Interrupts all threads in this scope, except the current thread.
133 */
134 private void interruptAll() {
135 flock.threads()
136 .filter(t -> t != Thread.currentThread())
137 .forEach(t -> {
138 try {
139 t.interrupt();
140 } catch (Throwable ignore) { }
141 });
142 }
143
144 /**
145 * Cancel the scope if not already cancelled.
146 */
147 private void cancel() {
148 if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {
149 // prevent new threads from starting
150 flock.shutdown();
151
152 // interrupt all unfinished threads
153 interruptAll();
154
155 // wakeup join
156 flock.wakeup();
157 }
158 }
159
160 /**
161 * Schedules a task to cancel the scope on timeout.
162 */
163 private void scheduleTimeout(Duration timeout) {
164 assert Thread.currentThread() == flock.owner() && timerTask == null;
165 long nanos = TimeUnit.NANOSECONDS.convert(timeout);
166 timerTask = ForkJoinPool.commonPool().schedule(() -> {
167 if (!cancelled) {
168 timeoutExpired = true;
169 cancel();
170 }
171 }, nanos, TimeUnit.NANOSECONDS);
172 }
173
174 /**
175 * Cancels the timer task if set.
176 */
177 private void cancelTimeout() {
178 assert Thread.currentThread() == flock.owner();
179 if (timerTask != null) {
180 timerTask.cancel(false);
181 }
182 }
183
184 /**
185 * Invoked by the thread for a subtask when the subtask completes before scope is cancelled.
186 */
187 private void onComplete(SubtaskImpl<? extends T> subtask) {
188 assert subtask.state() != Subtask.State.UNAVAILABLE;
189 if (joiner.onComplete(subtask)) {
190 cancel();
191 }
192 }
193
194 @Override
195 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
196 Objects.requireNonNull(task);
197 ensureOwner();
198 ensureNotJoined();
199
200 var subtask = new SubtaskImpl<U>(this, task);
201
202 // notify joiner, even if cancelled
203 if (joiner.onFork(subtask)) {
204 cancel();
205 }
206
207 if (!cancelled) {
208 // create thread to run task
209 Thread thread = threadFactory.newThread(subtask);
210 if (thread == null) {
211 throw new RejectedExecutionException("Rejected by thread factory");
212 }
213
214 // attempt to start the thread
215 try {
216 flock.start(thread);
217 } catch (IllegalStateException e) {
218 // shutdown by another thread, or underlying flock is shutdown due
219 // to unstructured use
220 }
221 }
222
223 // force owner to join
224 state = ST_FORKED;
225 return subtask;
226 }
227
228 @Override
229 public <U extends T> Subtask<U> fork(Runnable task) {
230 Objects.requireNonNull(task);
231 return fork(() -> { task.run(); return null; });
232 }
233
234 @Override
235 public R join() throws InterruptedException {
236 ensureOwner();
237 ensureNotJoined();
238
239 // join started
240 state = ST_JOIN_STARTED;
241
242 // wait for all subtasks, the scope to be cancelled, or interrupt
243 flock.awaitAll();
244
245 // throw if timeout expired
246 if (timeoutExpired) {
247 throw new TimeoutException();
248 }
249 cancelTimeout();
250
251 // all subtasks completed or cancelled
252 state = ST_JOIN_COMPLETED;
253
254 // invoke joiner to get result
255 try {
256 return joiner.result();
257 } catch (Throwable e) {
258 throw new FailedException(e);
259 }
260 }
261
262 @Override
263 public boolean isCancelled() {
264 return cancelled;
265 }
266
267 @Override
268 public void close() {
269 ensureOwner();
270 int s = state;
271 if (s == ST_CLOSED) {
272 return;
273 }
274
275 // cancel the scope if join did not complete
276 if (s < ST_JOIN_COMPLETED) {
277 cancel();
278 cancelTimeout();
279 }
280
281 // wait for stragglers
282 try {
283 flock.close();
284 } finally {
285 state = ST_CLOSED;
286 }
287
288 // throw ISE if the owner didn't join after forking
289 if (s == ST_FORKED) {
290 throw new IllegalStateException("Owner did not join after forking");
291 }
292 }
293
294 @Override
295 public String toString() {
296 return flock.name();
297 }
298
299 /**
300 * Subtask implementation, runs the task specified to the fork method.
301 */
302 static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
303 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
304
305 private record AltResult(Subtask.State state, Throwable exception) {
306 AltResult(Subtask.State state) {
307 this(state, null);
308 }
309 }
310
311 private final StructuredTaskScopeImpl<? super T, ?> scope;
312 private final Callable<? extends T> task;
313 private volatile Object result;
314
315 SubtaskImpl(StructuredTaskScopeImpl<? super T, ?> scope, Callable<? extends T> task) {
316 this.scope = scope;
317 this.task = task;
318 }
319
320 @Override
321 public void run() {
322 T result = null;
323 Throwable ex = null;
324 try {
325 result = task.call();
326 } catch (Throwable e) {
327 ex = e;
328 }
329
330 // nothing to do if scope is cancelled
331 if (scope.isCancelled())
332 return;
333
334 // set result/exception and invoke onComplete
335 if (ex == null) {
336 this.result = (result != null) ? result : RESULT_NULL;
337 } else {
338 this.result = new AltResult(State.FAILED, ex);
339 }
340 scope.onComplete(this);
341 }
342
343 @Override
344 public Subtask.State state() {
345 Object result = this.result;
346 if (result == null) {
347 return State.UNAVAILABLE;
348 } else if (result instanceof AltResult alt) {
349 // null or failed
350 return alt.state();
351 } else {
352 return State.SUCCESS;
353 }
354 }
355
356 @Override
357 public T get() {
358 scope.ensureJoinedIfOwner();
359 Object result = this.result;
360 if (result instanceof AltResult) {
361 if (result == RESULT_NULL) return null;
362 } else if (result != null) {
363 @SuppressWarnings("unchecked")
364 T r = (T) result;
365 return r;
366 }
367 throw new IllegalStateException(
368 "Result is unavailable or subtask did not complete successfully");
369 }
370
371 @Override
372 public Throwable exception() {
373 scope.ensureJoinedIfOwner();
374 Object result = this.result;
375 if (result instanceof AltResult alt && alt.state() == State.FAILED) {
376 return alt.exception();
377 }
378 throw new IllegalStateException(
379 "Exception is unavailable or subtask did not complete with exception");
380 }
381
382 @Override
383 public String toString() {
384 String stateAsString = switch (state()) {
385 case UNAVAILABLE -> "[Unavailable]";
386 case SUCCESS -> "[Completed successfully]";
387 case FAILED -> "[Failed: " + ((AltResult) result).exception() + "]";
388 };
389 return Objects.toIdentityString(this) + stateAsString;
390 }
391 }
392
393 /**
394 * Configuration implementation.
395 */
396 record ConfigImpl(ThreadFactory threadFactory,
397 String name,
398 Duration timeout) implements Configuration {
399 static Configuration defaultConfig() {
400 return new ConfigImpl(Thread.ofVirtual().factory(), null, null);
401 }
402
403 @Override
404 public Configuration withThreadFactory(ThreadFactory threadFactory) {
405 return new ConfigImpl(Objects.requireNonNull(threadFactory), name, timeout);
406 }
407
408 @Override
409 public Configuration withName(String name) {
410 return new ConfigImpl(threadFactory, Objects.requireNonNull(name), timeout);
411 }
412
413 @Override
414 public Configuration withTimeout(Duration timeout) {
415 return new ConfigImpl(threadFactory, name, Objects.requireNonNull(timeout));
416 }
417 }
418 }