11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.time.Duration;
30 import java.util.Objects;
31 import java.util.function.Function;
32 import jdk.internal.misc.ThreadFlock;
33 import jdk.internal.invoke.MhUtil;
34
35 /**
36 * StructuredTaskScope implementation.
37 */
38 final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
39 private static final VarHandle CANCELLED =
40 MhUtil.findVarHandle(MethodHandles.lookup(), "cancelled", boolean.class);
41
42 private final Joiner<? super T, ? extends R> joiner;
43 private final ThreadFactory threadFactory;
44 private final ThreadFlock flock;
45
46 // state, only accessed by owner thread
47 private static final int ST_NEW = 0,
48 ST_FORKED = 1, // subtasks forked, need to join
49 ST_JOIN_STARTED = 2, // join started, can no longer fork
50 ST_JOIN_COMPLETED = 3, // join completed
51 ST_CLOSED = 4; // closed
52 private int state;
53
54 // timer task, only accessed by owner thread
55 private Future<?> timerTask;
56
57 // set or read by any thread
58 private volatile boolean cancelled;
59
60 // set by the timer thread, read by the owner thread
61 private volatile boolean timeoutExpired;
62
63 @SuppressWarnings("this-escape")
64 private StructuredTaskScopeImpl(Joiner<? super T, ? extends R> joiner,
65 ThreadFactory threadFactory,
66 String name) {
67 this.joiner = joiner;
68 this.threadFactory = threadFactory;
69 this.flock = ThreadFlock.open((name != null) ? name : Objects.toIdentityString(this));
70 this.state = ST_NEW;
71 }
72
73 /**
74 * Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object
75 * and with configuration that is the result of applying the given function to the
76 * default configuration.
77 */
78 static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
79 Function<Configuration, Configuration> configFunction) {
80 Objects.requireNonNull(joiner);
81
82 var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
83 var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
84
85 // schedule timeout
86 Duration timeout = config.timeout();
87 if (timeout != null) {
88 boolean scheduled = false;
89 try {
90 scope.scheduleTimeout(timeout);
91 scheduled = true;
92 } finally {
93 if (!scheduled) {
94 scope.close(); // pop if scheduling timeout failed
95 }
96 }
97 }
98
99 return scope;
100 }
101
102 /**
103 * Throws WrongThreadException if the current thread is not the owner thread.
104 */
105 private void ensureOwner() {
106 if (Thread.currentThread() != flock.owner()) {
107 throw new WrongThreadException("Current thread not owner");
108 }
109 }
110
111 /**
112 * Throws IllegalStateException if already joined or scope is closed.
113 */
114 private void ensureNotJoined() {
115 assert Thread.currentThread() == flock.owner();
116 if (state > ST_FORKED) {
117 throw new IllegalStateException("Already joined or scope is closed");
118 }
119 }
120
121 /**
122 * Throws IllegalStateException if invoked by the owner thread and the owner thread
123 * has not joined.
124 */
125 private void ensureJoinedIfOwner() {
126 if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) {
127 throw new IllegalStateException("join not called");
128 }
129 }
130
131 /**
132 * Interrupts all threads in this scope, except the current thread.
133 */
134 private void interruptAll() {
135 flock.threads()
136 .filter(t -> t != Thread.currentThread())
137 .forEach(t -> {
138 try {
139 t.interrupt();
140 } catch (Throwable ignore) { }
141 });
142 }
143
144 /**
145 * Cancel the scope if not already cancelled.
146 */
147 private void cancel() {
148 if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {
167 if (!cancelled) {
168 timeoutExpired = true;
169 cancel();
170 }
171 }, nanos, TimeUnit.NANOSECONDS);
172 }
173
174 /**
175 * Cancels the timer task if set.
176 */
177 private void cancelTimeout() {
178 assert Thread.currentThread() == flock.owner();
179 if (timerTask != null) {
180 timerTask.cancel(false);
181 }
182 }
183
184 /**
185 * Invoked by the thread for a subtask when the subtask completes before scope is cancelled.
186 */
187 private void onComplete(SubtaskImpl<? extends T> subtask) {
188 assert subtask.state() != Subtask.State.UNAVAILABLE;
189 if (joiner.onComplete(subtask)) {
190 cancel();
191 }
192 }
193
194 @Override
195 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
196 Objects.requireNonNull(task);
197 ensureOwner();
198 ensureNotJoined();
199
200 var subtask = new SubtaskImpl<U>(this, task);
201
202 // notify joiner, even if cancelled
203 if (joiner.onFork(subtask)) {
204 cancel();
205 }
206
207 if (!cancelled) {
208 // create thread to run task
209 Thread thread = threadFactory.newThread(subtask);
210 if (thread == null) {
211 throw new RejectedExecutionException("Rejected by thread factory");
212 }
213
214 // attempt to start the thread
215 try {
216 flock.start(thread);
217 } catch (IllegalStateException e) {
218 // shutdown by another thread, or underlying flock is shutdown due
219 // to unstructured use
220 }
221 }
222
223 // force owner to join
224 state = ST_FORKED;
225 return subtask;
226 }
227
228 @Override
229 public <U extends T> Subtask<U> fork(Runnable task) {
230 Objects.requireNonNull(task);
231 return fork(() -> { task.run(); return null; });
232 }
233
234 @Override
235 public R join() throws InterruptedException {
236 ensureOwner();
237 ensureNotJoined();
238
239 // join started
240 state = ST_JOIN_STARTED;
241
242 // wait for all subtasks, the scope to be cancelled, or interrupt
243 flock.awaitAll();
244
245 // throw if timeout expired
246 if (timeoutExpired) {
247 throw new TimeoutException();
248 }
249 cancelTimeout();
250
251 // all subtasks completed or cancelled
252 state = ST_JOIN_COMPLETED;
253
254 // invoke joiner to get result
255 try {
256 return joiner.result();
257 } catch (Throwable e) {
258 throw new FailedException(e);
259 }
260 }
261
262 @Override
263 public boolean isCancelled() {
264 return cancelled;
265 }
266
267 @Override
268 public void close() {
269 ensureOwner();
270 int s = state;
271 if (s == ST_CLOSED) {
272 return;
273 }
294 @Override
295 public String toString() {
296 return flock.name();
297 }
298
299 /**
300 * Subtask implementation, runs the task specified to the fork method.
301 */
302 static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
303 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
304
305 private record AltResult(Subtask.State state, Throwable exception) {
306 AltResult(Subtask.State state) {
307 this(state, null);
308 }
309 }
310
311 private final StructuredTaskScopeImpl<? super T, ?> scope;
312 private final Callable<? extends T> task;
313 private volatile Object result;
314
315 SubtaskImpl(StructuredTaskScopeImpl<? super T, ?> scope, Callable<? extends T> task) {
316 this.scope = scope;
317 this.task = task;
318 }
319
320 @Override
321 public void run() {
322 T result = null;
323 Throwable ex = null;
324 try {
325 result = task.call();
326 } catch (Throwable e) {
327 ex = e;
328 }
329
330 // nothing to do if scope is cancelled
331 if (scope.isCancelled())
332 return;
333
334 // set result/exception and invoke onComplete
335 if (ex == null) {
336 this.result = (result != null) ? result : RESULT_NULL;
337 } else {
338 this.result = new AltResult(State.FAILED, ex);
339 }
340 scope.onComplete(this);
341 }
342
343 @Override
344 public Subtask.State state() {
345 Object result = this.result;
346 if (result == null) {
347 return State.UNAVAILABLE;
348 } else if (result instanceof AltResult alt) {
349 // null or failed
350 return alt.state();
351 } else {
352 return State.SUCCESS;
353 }
354 }
355
356 @Override
357 public T get() {
358 scope.ensureJoinedIfOwner();
359 Object result = this.result;
360 if (result instanceof AltResult) {
361 if (result == RESULT_NULL) return null;
362 } else if (result != null) {
363 @SuppressWarnings("unchecked")
364 T r = (T) result;
365 return r;
366 }
367 throw new IllegalStateException(
368 "Result is unavailable or subtask did not complete successfully");
369 }
370
371 @Override
372 public Throwable exception() {
373 scope.ensureJoinedIfOwner();
374 Object result = this.result;
375 if (result instanceof AltResult alt && alt.state() == State.FAILED) {
376 return alt.exception();
377 }
378 throw new IllegalStateException(
379 "Exception is unavailable or subtask did not complete with exception");
380 }
381
382 @Override
383 public String toString() {
384 String stateAsString = switch (state()) {
385 case UNAVAILABLE -> "[Unavailable]";
386 case SUCCESS -> "[Completed successfully]";
387 case FAILED -> "[Failed: " + ((AltResult) result).exception() + "]";
388 };
389 return Objects.toIdentityString(this) + stateAsString;
390 }
391 }
392
393 /**
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.time.Duration;
30 import java.util.Objects;
31 import java.util.function.UnaryOperator;
32 import jdk.internal.misc.ThreadFlock;
33 import jdk.internal.invoke.MhUtil;
34 import jdk.internal.vm.annotation.Stable;
35
36 /**
37 * StructuredTaskScope implementation.
38 */
39 final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
40 private static final VarHandle CANCELLED =
41 MhUtil.findVarHandle(MethodHandles.lookup(), "cancelled", boolean.class);
42
43 private final Joiner<? super T, ? extends R> joiner;
44 private final ThreadFactory threadFactory;
45 private final ThreadFlock flock;
46
47 // scope state, set by owner thread, read by any thread
48 private static final int ST_FORKED = 1, // subtasks forked, need to join
49 ST_JOIN_STARTED = 2, // join started, can no longer fork
50 ST_JOIN_COMPLETED = 3, // join completed
51 ST_CLOSED = 4; // closed
52 private volatile int state;
53
54 // set or read by any thread
55 private volatile boolean cancelled;
56
57 // timer task, only accessed by owner thread
58 private Future<?> timerTask;
59
60 // set by the timer thread, read by the owner thread
61 private volatile boolean timeoutExpired;
62
63 @SuppressWarnings("this-escape")
64 private StructuredTaskScopeImpl(Joiner<? super T, ? extends R> joiner,
65 ThreadFactory threadFactory,
66 String name) {
67 this.joiner = joiner;
68 this.threadFactory = threadFactory;
69 this.flock = ThreadFlock.open((name != null) ? name : Objects.toIdentityString(this));
70 }
71
72 /**
73 * Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object
74 * and with configuration that is the result of applying the given function to the
75 * default configuration.
76 */
77 static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
78 UnaryOperator<Configuration> configOperator) {
79 Objects.requireNonNull(joiner);
80
81 var config = (ConfigImpl) configOperator.apply(ConfigImpl.defaultConfig());
82 var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
83
84 // schedule timeout
85 Duration timeout = config.timeout();
86 if (timeout != null) {
87 boolean scheduled = false;
88 try {
89 scope.scheduleTimeout(timeout);
90 scheduled = true;
91 } finally {
92 if (!scheduled) {
93 scope.close(); // pop if scheduling timeout failed
94 }
95 }
96 }
97
98 return scope;
99 }
100
101 /**
102 * Throws WrongThreadException if the current thread is not the owner thread.
103 */
104 private void ensureOwner() {
105 if (Thread.currentThread() != flock.owner()) {
106 throw new WrongThreadException("Current thread not owner");
107 }
108 }
109
110 /**
111 * Returns true if join has been invoked and there is an outcome.
112 */
113 private boolean isJoinCompleted() {
114 return state >= ST_JOIN_COMPLETED;
115 }
116
117 /**
118 * Interrupts all threads in this scope, except the current thread.
119 */
120 private void interruptAll() {
121 flock.threads()
122 .filter(t -> t != Thread.currentThread())
123 .forEach(t -> {
124 try {
125 t.interrupt();
126 } catch (Throwable ignore) { }
127 });
128 }
129
130 /**
131 * Cancel the scope if not already cancelled.
132 */
133 private void cancel() {
134 if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {
153 if (!cancelled) {
154 timeoutExpired = true;
155 cancel();
156 }
157 }, nanos, TimeUnit.NANOSECONDS);
158 }
159
160 /**
161 * Cancels the timer task if set.
162 */
163 private void cancelTimeout() {
164 assert Thread.currentThread() == flock.owner();
165 if (timerTask != null) {
166 timerTask.cancel(false);
167 }
168 }
169
170 /**
171 * Invoked by the thread for a subtask when the subtask completes before scope is cancelled.
172 */
173 private <U extends T> void onComplete(SubtaskImpl<U> subtask) {
174 assert subtask.state() != Subtask.State.UNAVAILABLE;
175 @SuppressWarnings("unchecked")
176 var j = (Joiner<U, ? extends R>) joiner;
177 if (j.onComplete(subtask)) {
178 cancel();
179 }
180 }
181
182 @Override
183 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
184 Objects.requireNonNull(task);
185 ensureOwner();
186 int s = state;
187 if (s > ST_FORKED) {
188 throw new IllegalStateException("join already called or scope is closed");
189 }
190
191 var subtask = new SubtaskImpl<U>(this, task);
192
193 // notify joiner, even if cancelled
194 @SuppressWarnings("unchecked")
195 var j = (Joiner<U, ? extends R>) joiner;
196 if (j.onFork(subtask)) {
197 cancel();
198 }
199
200 if (!cancelled) {
201 // create thread to run task
202 Thread thread = threadFactory.newThread(subtask);
203 if (thread == null) {
204 throw new RejectedExecutionException("Rejected by thread factory");
205 }
206
207 // attempt to start the thread
208 subtask.setThread(thread);
209 try {
210 flock.start(thread);
211 } catch (IllegalStateException e) {
212 // shutdown by another thread, or underlying flock is shutdown due
213 // to unstructured use
214 }
215 }
216
217 // force owner to join
218 if (s < ST_FORKED) {
219 state = ST_FORKED;
220 }
221 return subtask;
222 }
223
224 @Override
225 public <U extends T> Subtask<U> fork(Runnable task) {
226 Objects.requireNonNull(task);
227 return fork(() -> { task.run(); return null; });
228 }
229
230 @Override
231 public R join() throws InterruptedException {
232 ensureOwner();
233 if (state >= ST_JOIN_COMPLETED) {
234 throw new IllegalStateException("Already joined or scope is closed");
235 }
236
237 // wait for all subtasks, the scope to be cancelled, or interrupt
238 try {
239 flock.awaitAll();
240 } catch (InterruptedException e) {
241 state = ST_JOIN_STARTED; // joining not completed, prevent new forks
242 throw e;
243 }
244
245 // all subtasks completed or scope cancelled
246 state = ST_JOIN_COMPLETED;
247
248 // invoke joiner onTimeout if timeout expired
249 if (timeoutExpired) {
250 cancel(); // ensure cancelled before calling onTimeout
251 joiner.onTimeout();
252 } else {
253 cancelTimeout();
254 }
255
256 // invoke joiner to get result
257 try {
258 return joiner.result();
259 } catch (Throwable e) {
260 throw new FailedException(e);
261 }
262 }
263
264 @Override
265 public boolean isCancelled() {
266 return cancelled;
267 }
268
269 @Override
270 public void close() {
271 ensureOwner();
272 int s = state;
273 if (s == ST_CLOSED) {
274 return;
275 }
296 @Override
297 public String toString() {
298 return flock.name();
299 }
300
301 /**
302 * Subtask implementation, runs the task specified to the fork method.
303 */
304 static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
305 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
306
307 private record AltResult(Subtask.State state, Throwable exception) {
308 AltResult(Subtask.State state) {
309 this(state, null);
310 }
311 }
312
313 private final StructuredTaskScopeImpl<? super T, ?> scope;
314 private final Callable<? extends T> task;
315 private volatile Object result;
316 @Stable private Thread thread;
317
318 SubtaskImpl(StructuredTaskScopeImpl<? super T, ?> scope, Callable<? extends T> task) {
319 this.scope = scope;
320 this.task = task;
321 }
322
323 /**
324 * Sets the thread for this subtask.
325 */
326 void setThread(Thread thread) {
327 assert thread.getState() == Thread.State.NEW;
328 this.thread = thread;
329 }
330
331 /**
332 * Throws IllegalStateException if the caller thread is not the subtask and
333 * the scope owner has not joined.
334 */
335 private void ensureJoinedIfNotSubtask() {
336 if (Thread.currentThread() != thread && !scope.isJoinCompleted()) {
337 throw new IllegalStateException();
338 }
339 }
340
341 @Override
342 public void run() {
343 if (Thread.currentThread() != thread) {
344 throw new WrongThreadException();
345 }
346
347 T result = null;
348 Throwable ex = null;
349 try {
350 result = task.call();
351 } catch (Throwable e) {
352 ex = e;
353 }
354
355 // nothing to do if scope is cancelled
356 if (scope.isCancelled())
357 return;
358
359 // set result/exception and invoke onComplete
360 if (ex == null) {
361 this.result = (result != null) ? result : RESULT_NULL;
362 } else {
363 this.result = new AltResult(State.FAILED, ex);
364 }
365 scope.onComplete(this);
366 }
367
368 @Override
369 public Subtask.State state() {
370 Object result = this.result;
371 if (result == null) {
372 return State.UNAVAILABLE;
373 } else if (result instanceof AltResult alt) {
374 // null or failed
375 return alt.state();
376 } else {
377 return State.SUCCESS;
378 }
379 }
380
381 @Override
382 public T get() {
383 ensureJoinedIfNotSubtask();
384 Object result = this.result;
385 if (result instanceof AltResult) {
386 if (result == RESULT_NULL) return null;
387 } else if (result != null) {
388 @SuppressWarnings("unchecked")
389 T r = (T) result;
390 return r;
391 }
392 throw new IllegalStateException(
393 "Result is unavailable or subtask did not complete successfully");
394 }
395
396 @Override
397 public Throwable exception() {
398 ensureJoinedIfNotSubtask();
399 Object result = this.result;
400 if (result instanceof AltResult alt && alt.state() == State.FAILED) {
401 return alt.exception();
402 }
403 throw new IllegalStateException(
404 "Exception is unavailable or subtask did not complete with exception");
405 }
406
407 @Override
408 public String toString() {
409 String stateAsString = switch (state()) {
410 case UNAVAILABLE -> "[Unavailable]";
411 case SUCCESS -> "[Completed successfully]";
412 case FAILED -> "[Failed: " + ((AltResult) result).exception() + "]";
413 };
414 return Objects.toIdentityString(this) + stateAsString;
415 }
416 }
417
418 /**
|