1 /* 2 * Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.concurrent; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.time.Duration; 30 import java.util.Objects; 31 import java.util.function.Function; 32 import jdk.internal.misc.ThreadFlock; 33 import jdk.internal.invoke.MhUtil; 34 35 /** 36 * StructuredTaskScope implementation. 37 */ 38 final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> { 39 private static final VarHandle CANCELLED = 40 MhUtil.findVarHandle(MethodHandles.lookup(), "cancelled", boolean.class); 41 42 private final Joiner<? super T, ? extends R> joiner; 43 private final ThreadFactory threadFactory; 44 private final ThreadFlock flock; 45 46 // state, only accessed by owner thread 47 private static final int ST_NEW = 0, 48 ST_FORKED = 1, // subtasks forked, need to join 49 ST_JOIN_STARTED = 2, // join started, can no longer fork 50 ST_JOIN_COMPLETED = 3, // join completed 51 ST_CLOSED = 4; // closed 52 private int state; 53 54 // timer task, only accessed by owner thread 55 private Future<?> timerTask; 56 57 // set or read by any thread 58 private volatile boolean cancelled; 59 60 // set by the timer thread, read by the owner thread 61 private volatile boolean timeoutExpired; 62 63 @SuppressWarnings("this-escape") 64 private StructuredTaskScopeImpl(Joiner<? super T, ? extends R> joiner, 65 ThreadFactory threadFactory, 66 String name) { 67 this.joiner = joiner; 68 this.threadFactory = threadFactory; 69 this.flock = ThreadFlock.open((name != null) ? name : Objects.toIdentityString(this)); 70 this.state = ST_NEW; 71 } 72 73 /** 74 * Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object 75 * and with configuration that is the result of applying the given function to the 76 * default configuration. 77 */ 78 static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner, 79 Function<Configuration, Configuration> configFunction) { 80 Objects.requireNonNull(joiner); 81 82 var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig()); 83 var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name()); 84 85 // schedule timeout 86 Duration timeout = config.timeout(); 87 if (timeout != null) { 88 boolean scheduled = false; 89 try { 90 scope.scheduleTimeout(timeout); 91 scheduled = true; 92 } finally { 93 if (!scheduled) { 94 scope.close(); // pop if scheduling timeout failed 95 } 96 } 97 } 98 99 return scope; 100 } 101 102 /** 103 * Throws WrongThreadException if the current thread is not the owner thread. 104 */ 105 private void ensureOwner() { 106 if (Thread.currentThread() != flock.owner()) { 107 throw new WrongThreadException("Current thread not owner"); 108 } 109 } 110 111 /** 112 * Throws IllegalStateException if already joined or scope is closed. 113 */ 114 private void ensureNotJoined() { 115 assert Thread.currentThread() == flock.owner(); 116 if (state > ST_FORKED) { 117 throw new IllegalStateException("Already joined or scope is closed"); 118 } 119 } 120 121 /** 122 * Throws IllegalStateException if invoked by the owner thread and the owner thread 123 * has not joined. 124 */ 125 private void ensureJoinedIfOwner() { 126 if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) { 127 throw new IllegalStateException("join not called"); 128 } 129 } 130 131 /** 132 * Interrupts all threads in this scope, except the current thread. 133 */ 134 private void interruptAll() { 135 flock.threads() 136 .filter(t -> t != Thread.currentThread()) 137 .forEach(t -> { 138 try { 139 t.interrupt(); 140 } catch (Throwable ignore) { } 141 }); 142 } 143 144 /** 145 * Cancel the scope if not already cancelled. 146 */ 147 private void cancel() { 148 if (!cancelled && CANCELLED.compareAndSet(this, false, true)) { 149 // prevent new threads from starting 150 flock.shutdown(); 151 152 // interrupt all unfinished threads 153 interruptAll(); 154 155 // wakeup join 156 flock.wakeup(); 157 } 158 } 159 160 /** 161 * Schedules a task to cancel the scope on timeout. 162 */ 163 private void scheduleTimeout(Duration timeout) { 164 assert Thread.currentThread() == flock.owner() && timerTask == null; 165 long nanos = TimeUnit.NANOSECONDS.convert(timeout); 166 timerTask = ForkJoinPool.commonPool().schedule(() -> { 167 if (!cancelled) { 168 timeoutExpired = true; 169 cancel(); 170 } 171 }, nanos, TimeUnit.NANOSECONDS); 172 } 173 174 /** 175 * Cancels the timer task if set. 176 */ 177 private void cancelTimeout() { 178 assert Thread.currentThread() == flock.owner(); 179 if (timerTask != null) { 180 timerTask.cancel(false); 181 } 182 } 183 184 /** 185 * Invoked by the thread for a subtask when the subtask completes before scope is cancelled. 186 */ 187 private void onComplete(SubtaskImpl<? extends T> subtask) { 188 assert subtask.state() != Subtask.State.UNAVAILABLE; 189 if (joiner.onComplete(subtask)) { 190 cancel(); 191 } 192 } 193 194 @Override 195 public <U extends T> Subtask<U> fork(Callable<? extends U> task) { 196 Objects.requireNonNull(task); 197 ensureOwner(); 198 ensureNotJoined(); 199 200 var subtask = new SubtaskImpl<U>(this, task); 201 202 // notify joiner, even if cancelled 203 if (joiner.onFork(subtask)) { 204 cancel(); 205 } 206 207 if (!cancelled) { 208 // create thread to run task 209 Thread thread = threadFactory.newThread(subtask); 210 if (thread == null) { 211 throw new RejectedExecutionException("Rejected by thread factory"); 212 } 213 214 // attempt to start the thread 215 try { 216 flock.start(thread); 217 } catch (IllegalStateException e) { 218 // shutdown by another thread, or underlying flock is shutdown due 219 // to unstructured use 220 } 221 } 222 223 // force owner to join 224 state = ST_FORKED; 225 return subtask; 226 } 227 228 @Override 229 public <U extends T> Subtask<U> fork(Runnable task) { 230 Objects.requireNonNull(task); 231 return fork(() -> { task.run(); return null; }); 232 } 233 234 @Override 235 public R join() throws InterruptedException { 236 ensureOwner(); 237 ensureNotJoined(); 238 239 // join started 240 state = ST_JOIN_STARTED; 241 242 // wait for all subtasks, the scope to be cancelled, or interrupt 243 flock.awaitAll(); 244 245 // throw if timeout expired 246 if (timeoutExpired) { 247 throw new TimeoutException(); 248 } 249 cancelTimeout(); 250 251 // all subtasks completed or cancelled 252 state = ST_JOIN_COMPLETED; 253 254 // invoke joiner to get result 255 try { 256 return joiner.result(); 257 } catch (Throwable e) { 258 throw new FailedException(e); 259 } 260 } 261 262 @Override 263 public boolean isCancelled() { 264 return cancelled; 265 } 266 267 @Override 268 public void close() { 269 ensureOwner(); 270 int s = state; 271 if (s == ST_CLOSED) { 272 return; 273 } 274 275 // cancel the scope if join did not complete 276 if (s < ST_JOIN_COMPLETED) { 277 cancel(); 278 cancelTimeout(); 279 } 280 281 // wait for stragglers 282 try { 283 flock.close(); 284 } finally { 285 state = ST_CLOSED; 286 } 287 288 // throw ISE if the owner didn't join after forking 289 if (s == ST_FORKED) { 290 throw new IllegalStateException("Owner did not join after forking"); 291 } 292 } 293 294 @Override 295 public String toString() { 296 return flock.name(); 297 } 298 299 /** 300 * Subtask implementation, runs the task specified to the fork method. 301 */ 302 static final class SubtaskImpl<T> implements Subtask<T>, Runnable { 303 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS); 304 305 private record AltResult(Subtask.State state, Throwable exception) { 306 AltResult(Subtask.State state) { 307 this(state, null); 308 } 309 } 310 311 private final StructuredTaskScopeImpl<? super T, ?> scope; 312 private final Callable<? extends T> task; 313 private volatile Object result; 314 315 SubtaskImpl(StructuredTaskScopeImpl<? super T, ?> scope, Callable<? extends T> task) { 316 this.scope = scope; 317 this.task = task; 318 } 319 320 @Override 321 public void run() { 322 T result = null; 323 Throwable ex = null; 324 try { 325 result = task.call(); 326 } catch (Throwable e) { 327 ex = e; 328 } 329 330 // nothing to do if scope is cancelled 331 if (scope.isCancelled()) 332 return; 333 334 // set result/exception and invoke onComplete 335 if (ex == null) { 336 this.result = (result != null) ? result : RESULT_NULL; 337 } else { 338 this.result = new AltResult(State.FAILED, ex); 339 } 340 scope.onComplete(this); 341 } 342 343 @Override 344 public Subtask.State state() { 345 Object result = this.result; 346 if (result == null) { 347 return State.UNAVAILABLE; 348 } else if (result instanceof AltResult alt) { 349 // null or failed 350 return alt.state(); 351 } else { 352 return State.SUCCESS; 353 } 354 } 355 356 @Override 357 public T get() { 358 scope.ensureJoinedIfOwner(); 359 Object result = this.result; 360 if (result instanceof AltResult) { 361 if (result == RESULT_NULL) return null; 362 } else if (result != null) { 363 @SuppressWarnings("unchecked") 364 T r = (T) result; 365 return r; 366 } 367 throw new IllegalStateException( 368 "Result is unavailable or subtask did not complete successfully"); 369 } 370 371 @Override 372 public Throwable exception() { 373 scope.ensureJoinedIfOwner(); 374 Object result = this.result; 375 if (result instanceof AltResult alt && alt.state() == State.FAILED) { 376 return alt.exception(); 377 } 378 throw new IllegalStateException( 379 "Exception is unavailable or subtask did not complete with exception"); 380 } 381 382 @Override 383 public String toString() { 384 String stateAsString = switch (state()) { 385 case UNAVAILABLE -> "[Unavailable]"; 386 case SUCCESS -> "[Completed successfully]"; 387 case FAILED -> "[Failed: " + ((AltResult) result).exception() + "]"; 388 }; 389 return Objects.toIdentityString(this) + stateAsString; 390 } 391 } 392 393 /** 394 * Configuration implementation. 395 */ 396 record ConfigImpl(ThreadFactory threadFactory, 397 String name, 398 Duration timeout) implements Configuration { 399 static Configuration defaultConfig() { 400 return new ConfigImpl(Thread.ofVirtual().factory(), null, null); 401 } 402 403 @Override 404 public Configuration withThreadFactory(ThreadFactory threadFactory) { 405 return new ConfigImpl(Objects.requireNonNull(threadFactory), name, timeout); 406 } 407 408 @Override 409 public Configuration withName(String name) { 410 return new ConfigImpl(threadFactory, Objects.requireNonNull(name), timeout); 411 } 412 413 @Override 414 public Configuration withTimeout(Duration timeout) { 415 return new ConfigImpl(threadFactory, name, Objects.requireNonNull(timeout)); 416 } 417 } 418 }