< prev index next >

src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java

Print this page

 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package java.util.concurrent;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.time.Duration;
 30 import java.util.Objects;
 31 import java.util.function.Function;
 32 import jdk.internal.misc.ThreadFlock;
 33 import jdk.internal.invoke.MhUtil;

 34 
 35 /**
 36  * StructuredTaskScope implementation.
 37  */
 38 final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
 39     private static final VarHandle CANCELLED =
 40             MhUtil.findVarHandle(MethodHandles.lookup(), "cancelled", boolean.class);
 41 
 42     private final Joiner<? super T, ? extends R> joiner;
 43     private final ThreadFactory threadFactory;
 44     private final ThreadFlock flock;
 45 
 46     // state, only accessed by owner thread
 47     private static final int ST_NEW            = 0,
 48                              ST_FORKED         = 1,   // subtasks forked, need to join
 49                              ST_JOIN_STARTED   = 2,   // join started, can no longer fork
 50                              ST_JOIN_COMPLETED = 3,   // join completed
 51                              ST_CLOSED         = 4;   // closed
 52     private int state;
 53 
 54     // timer task, only accessed by owner thread
 55     private Future<?> timerTask;
 56 
 57     // set or read by any thread
 58     private volatile boolean cancelled;
 59 



 60     // set by the timer thread, read by the owner thread
 61     private volatile boolean timeoutExpired;
 62 
 63     @SuppressWarnings("this-escape")
 64     private StructuredTaskScopeImpl(Joiner<? super T, ? extends R> joiner,
 65                                     ThreadFactory threadFactory,
 66                                     String name) {
 67         this.joiner = joiner;
 68         this.threadFactory = threadFactory;
 69         this.flock = ThreadFlock.open((name != null) ? name : Objects.toIdentityString(this));
 70         this.state = ST_NEW;
 71     }
 72 
 73     /**
 74      * Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object
 75      * and with configuration that is the result of applying the given function to the
 76      * default configuration.
 77      */
 78     static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
 79                                                  Function<Configuration, Configuration> configFunction) {
 80         Objects.requireNonNull(joiner);
 81 
 82         var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
 83         var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
 84 
 85         // schedule timeout
 86         Duration timeout = config.timeout();
 87         if (timeout != null) {
 88             boolean scheduled = false;
 89             try {
 90                 scope.scheduleTimeout(timeout);
 91                 scheduled = true;
 92             } finally {
 93                 if (!scheduled) {
 94                     scope.close();  // pop if scheduling timeout failed
 95                 }
 96             }
 97         }
 98 
 99         return scope;
100     }
101 
102     /**
103      * Throws WrongThreadException if the current thread is not the owner thread.
104      */
105     private void ensureOwner() {
106         if (Thread.currentThread() != flock.owner()) {
107             throw new WrongThreadException("Current thread not owner");
108         }
109     }
110 
111     /**
112      * Throws IllegalStateException if already joined or scope is closed.
113      */
114     private void ensureNotJoined() {
115         assert Thread.currentThread() == flock.owner();
116         if (state > ST_FORKED) {
117             throw new IllegalStateException("Already joined or scope is closed");
118         }
119     }
120 
121     /**
122      * Throws IllegalStateException if invoked by the owner thread and the owner thread
123      * has not joined.
124      */
125     private void ensureJoinedIfOwner() {
126         if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) {
127             throw new IllegalStateException("join not called");
128         }
129     }
130 
131     /**
132      * Interrupts all threads in this scope, except the current thread.
133      */
134     private void interruptAll() {
135         flock.threads()
136                 .filter(t -> t != Thread.currentThread())
137                 .forEach(t -> {
138                     try {
139                         t.interrupt();
140                     } catch (Throwable ignore) { }
141                 });
142     }
143 
144     /**
145      * Cancel the scope if not already cancelled.
146      */
147     private void cancel() {
148         if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {

167             if (!cancelled) {
168                 timeoutExpired = true;
169                 cancel();
170             }
171         }, nanos, TimeUnit.NANOSECONDS);
172     }
173 
174     /**
175      * Cancels the timer task if set.
176      */
177     private void cancelTimeout() {
178         assert Thread.currentThread() == flock.owner();
179         if (timerTask != null) {
180             timerTask.cancel(false);
181         }
182     }
183 
184     /**
185      * Invoked by the thread for a subtask when the subtask completes before scope is cancelled.
186      */
187     private void onComplete(SubtaskImpl<? extends T> subtask) {
188         assert subtask.state() != Subtask.State.UNAVAILABLE;
189         if (joiner.onComplete(subtask)) {


190             cancel();
191         }
192     }
193 
194     @Override
195     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
196         Objects.requireNonNull(task);
197         ensureOwner();
198         ensureNotJoined();



199 
200         var subtask = new SubtaskImpl<U>(this, task);
201 
202         // notify joiner, even if cancelled
203         if (joiner.onFork(subtask)) {


204             cancel();
205         }
206 
207         if (!cancelled) {
208             // create thread to run task
209             Thread thread = threadFactory.newThread(subtask);
210             if (thread == null) {
211                 throw new RejectedExecutionException("Rejected by thread factory");
212             }
213 
214             // attempt to start the thread

215             try {
216                 flock.start(thread);
217             } catch (IllegalStateException e) {
218                 // shutdown by another thread, or underlying flock is shutdown due
219                 // to unstructured use
220             }
221         }
222 
223         // force owner to join
224         state = ST_FORKED;


225         return subtask;
226     }
227 
228     @Override
229     public <U extends T> Subtask<U> fork(Runnable task) {
230         Objects.requireNonNull(task);
231         return fork(() -> { task.run(); return null; });
232     }
233 
234     @Override
235     public R join() throws InterruptedException {
236         ensureOwner();
237         ensureNotJoined();
238 
239         // join started
240         state = ST_JOIN_STARTED;
241 
242         // wait for all subtasks, the scope to be cancelled, or interrupt
243         flock.awaitAll();
244 
245         // throw if timeout expired
246         if (timeoutExpired) {
247             throw new TimeoutException();
248         }
249         cancelTimeout();
250 
251         // all subtasks completed or cancelled
252         state = ST_JOIN_COMPLETED;
253 








254         // invoke joiner to get result
255         try {
256             return joiner.result();
257         } catch (Throwable e) {
258             throw new FailedException(e);
259         }
260     }
261 
262     @Override
263     public boolean isCancelled() {
264         return cancelled;
265     }
266 
267     @Override
268     public void close() {
269         ensureOwner();
270         int s = state;
271         if (s == ST_CLOSED) {
272             return;
273         }

294     @Override
295     public String toString() {
296         return flock.name();
297     }
298 
299     /**
300      * Subtask implementation, runs the task specified to the fork method.
301      */
302     static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
303         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
304 
305         private record AltResult(Subtask.State state, Throwable exception) {
306             AltResult(Subtask.State state) {
307                 this(state, null);
308             }
309         }
310 
311         private final StructuredTaskScopeImpl<? super T, ?> scope;
312         private final Callable<? extends T> task;
313         private volatile Object result;

314 
315         SubtaskImpl(StructuredTaskScopeImpl<? super T, ?> scope, Callable<? extends T> task) {
316             this.scope = scope;
317             this.task = task;
318         }
319 


















320         @Override
321         public void run() {




322             T result = null;
323             Throwable ex = null;
324             try {
325                 result = task.call();
326             } catch (Throwable e) {
327                 ex = e;
328             }
329 
330             // nothing to do if scope is cancelled
331             if (scope.isCancelled())
332                 return;
333 
334             // set result/exception and invoke onComplete
335             if (ex == null) {
336                 this.result = (result != null) ? result : RESULT_NULL;
337             } else {
338                 this.result = new AltResult(State.FAILED, ex);
339             }
340             scope.onComplete(this);
341         }
342 
343         @Override
344         public Subtask.State state() {
345             Object result = this.result;
346             if (result == null) {
347                 return State.UNAVAILABLE;
348             } else if (result instanceof AltResult alt) {
349                 // null or failed
350                 return alt.state();
351             } else {
352                 return State.SUCCESS;
353             }
354         }
355 
356         @Override
357         public T get() {
358             scope.ensureJoinedIfOwner();
359             Object result = this.result;
360             if (result instanceof AltResult) {
361                 if (result == RESULT_NULL) return null;
362             } else if (result != null) {
363                 @SuppressWarnings("unchecked")
364                 T r = (T) result;
365                 return r;
366             }
367             throw new IllegalStateException(
368                     "Result is unavailable or subtask did not complete successfully");
369         }
370 
371         @Override
372         public Throwable exception() {
373             scope.ensureJoinedIfOwner();
374             Object result = this.result;
375             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
376                 return alt.exception();
377             }
378             throw new IllegalStateException(
379                     "Exception is unavailable or subtask did not complete with exception");
380         }
381 
382         @Override
383         public String toString() {
384             String stateAsString = switch (state()) {
385                 case UNAVAILABLE -> "[Unavailable]";
386                 case SUCCESS     -> "[Completed successfully]";
387                 case FAILED      -> "[Failed: " + ((AltResult) result).exception() + "]";
388             };
389             return Objects.toIdentityString(this) + stateAsString;
390         }
391     }
392 
393     /**

 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package java.util.concurrent;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.time.Duration;
 30 import java.util.Objects;
 31 import java.util.function.UnaryOperator;
 32 import jdk.internal.misc.ThreadFlock;
 33 import jdk.internal.invoke.MhUtil;
 34 import jdk.internal.vm.annotation.Stable;
 35 
 36 /**
 37  * StructuredTaskScope implementation.
 38  */
 39 final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
 40     private static final VarHandle CANCELLED =
 41             MhUtil.findVarHandle(MethodHandles.lookup(), "cancelled", boolean.class);
 42 
 43     private final Joiner<? super T, ? extends R> joiner;
 44     private final ThreadFactory threadFactory;
 45     private final ThreadFlock flock;
 46 
 47     // scope state, set by owner thread, read by any thread
 48     private static final int ST_FORKED         = 1,   // subtasks forked, need to join

 49                              ST_JOIN_STARTED   = 2,   // join started, can no longer fork
 50                              ST_JOIN_COMPLETED = 3,   // join completed
 51                              ST_CLOSED         = 4;   // closed
 52     private volatile int state;



 53 
 54     // set or read by any thread
 55     private volatile boolean cancelled;
 56 
 57     // timer task, only accessed by owner thread
 58     private Future<?> timerTask;
 59 
 60     // set by the timer thread, read by the owner thread
 61     private volatile boolean timeoutExpired;
 62 
 63     @SuppressWarnings("this-escape")
 64     private StructuredTaskScopeImpl(Joiner<? super T, ? extends R> joiner,
 65                                     ThreadFactory threadFactory,
 66                                     String name) {
 67         this.joiner = joiner;
 68         this.threadFactory = threadFactory;
 69         this.flock = ThreadFlock.open((name != null) ? name : Objects.toIdentityString(this));

 70     }
 71 
 72     /**
 73      * Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object
 74      * and with configuration that is the result of applying the given function to the
 75      * default configuration.
 76      */
 77     static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
 78                                                  UnaryOperator<Configuration> configOperator) {
 79         Objects.requireNonNull(joiner);
 80 
 81         var config = (ConfigImpl) configOperator.apply(ConfigImpl.defaultConfig());
 82         var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
 83 
 84         // schedule timeout
 85         Duration timeout = config.timeout();
 86         if (timeout != null) {
 87             boolean scheduled = false;
 88             try {
 89                 scope.scheduleTimeout(timeout);
 90                 scheduled = true;
 91             } finally {
 92                 if (!scheduled) {
 93                     scope.close();  // pop if scheduling timeout failed
 94                 }
 95             }
 96         }
 97 
 98         return scope;
 99     }
100 
101     /**
102      * Throws WrongThreadException if the current thread is not the owner thread.
103      */
104     private void ensureOwner() {
105         if (Thread.currentThread() != flock.owner()) {
106             throw new WrongThreadException("Current thread not owner");
107         }
108     }
109 
110     /**
111      * Returns true if join has been invoked and there is an outcome.
112      */
113     private boolean isJoinCompleted() {
114         return state >= ST_JOIN_COMPLETED;













115     }
116 
117     /**
118      * Interrupts all threads in this scope, except the current thread.
119      */
120     private void interruptAll() {
121         flock.threads()
122                 .filter(t -> t != Thread.currentThread())
123                 .forEach(t -> {
124                     try {
125                         t.interrupt();
126                     } catch (Throwable ignore) { }
127                 });
128     }
129 
130     /**
131      * Cancel the scope if not already cancelled.
132      */
133     private void cancel() {
134         if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {

153             if (!cancelled) {
154                 timeoutExpired = true;
155                 cancel();
156             }
157         }, nanos, TimeUnit.NANOSECONDS);
158     }
159 
160     /**
161      * Cancels the timer task if set.
162      */
163     private void cancelTimeout() {
164         assert Thread.currentThread() == flock.owner();
165         if (timerTask != null) {
166             timerTask.cancel(false);
167         }
168     }
169 
170     /**
171      * Invoked by the thread for a subtask when the subtask completes before scope is cancelled.
172      */
173     private <U extends T> void onComplete(SubtaskImpl<U> subtask) {
174         assert subtask.state() != Subtask.State.UNAVAILABLE;
175         @SuppressWarnings("unchecked")
176         var j = (Joiner<U, ? extends R>) joiner;
177         if (j.onComplete(subtask)) {
178             cancel();
179         }
180     }
181 
182     @Override
183     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
184         Objects.requireNonNull(task);
185         ensureOwner();
186         int s = state;
187         if (s > ST_FORKED) {
188             throw new IllegalStateException("join already called or scope is closed");
189         }
190 
191         var subtask = new SubtaskImpl<U>(this, task);
192 
193         // notify joiner, even if cancelled
194         @SuppressWarnings("unchecked")
195         var j = (Joiner<U, ? extends R>) joiner;
196         if (j.onFork(subtask)) {
197             cancel();
198         }
199 
200         if (!cancelled) {
201             // create thread to run task
202             Thread thread = threadFactory.newThread(subtask);
203             if (thread == null) {
204                 throw new RejectedExecutionException("Rejected by thread factory");
205             }
206 
207             // attempt to start the thread
208             subtask.setThread(thread);
209             try {
210                 flock.start(thread);
211             } catch (IllegalStateException e) {
212                 // shutdown by another thread, or underlying flock is shutdown due
213                 // to unstructured use
214             }
215         }
216 
217         // force owner to join
218         if (s < ST_FORKED) {
219             state = ST_FORKED;
220         }
221         return subtask;
222     }
223 
224     @Override
225     public <U extends T> Subtask<U> fork(Runnable task) {
226         Objects.requireNonNull(task);
227         return fork(() -> { task.run(); return null; });
228     }
229 
230     @Override
231     public R join() throws InterruptedException {
232         ensureOwner();
233         if (state >= ST_JOIN_COMPLETED) {
234             throw new IllegalStateException("Already joined or scope is closed");
235         }

236 
237         // wait for all subtasks, the scope to be cancelled, or interrupt
238         try {
239             flock.awaitAll();
240         } catch (InterruptedException e) {
241             state = ST_JOIN_STARTED;  // joining not completed, prevent new forks
242             throw e;
243         }

244 
245         // all subtasks completed or scope cancelled
246         state = ST_JOIN_COMPLETED;
247 
248         // invoke joiner onTimeout if timeout expired
249         if (timeoutExpired) {
250             cancel();  // ensure cancelled before calling onTimeout
251             joiner.onTimeout();
252         } else {
253             cancelTimeout();
254         }
255 
256         // invoke joiner to get result
257         try {
258             return joiner.result();
259         } catch (Throwable e) {
260             throw new FailedException(e);
261         }
262     }
263 
264     @Override
265     public boolean isCancelled() {
266         return cancelled;
267     }
268 
269     @Override
270     public void close() {
271         ensureOwner();
272         int s = state;
273         if (s == ST_CLOSED) {
274             return;
275         }

296     @Override
297     public String toString() {
298         return flock.name();
299     }
300 
301     /**
302      * Subtask implementation, runs the task specified to the fork method.
303      */
304     static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
305         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
306 
307         private record AltResult(Subtask.State state, Throwable exception) {
308             AltResult(Subtask.State state) {
309                 this(state, null);
310             }
311         }
312 
313         private final StructuredTaskScopeImpl<? super T, ?> scope;
314         private final Callable<? extends T> task;
315         private volatile Object result;
316         @Stable private Thread thread;
317 
318         SubtaskImpl(StructuredTaskScopeImpl<? super T, ?> scope, Callable<? extends T> task) {
319             this.scope = scope;
320             this.task = task;
321         }
322 
323         /**
324          * Sets the thread for this subtask.
325          */
326         void setThread(Thread thread) {
327             assert thread.getState() == Thread.State.NEW;
328             this.thread = thread;
329         }
330 
331         /**
332          * Throws IllegalStateException if the caller thread is not the subtask and
333          * the scope owner has not joined.
334          */
335         private void ensureJoinedIfNotSubtask() {
336             if (Thread.currentThread() != thread && !scope.isJoinCompleted()) {
337                 throw new IllegalStateException();
338             }
339         }
340 
341         @Override
342         public void run() {
343             if (Thread.currentThread() != thread) {
344                 throw new WrongThreadException();
345             }
346 
347             T result = null;
348             Throwable ex = null;
349             try {
350                 result = task.call();
351             } catch (Throwable e) {
352                 ex = e;
353             }
354 
355             // nothing to do if scope is cancelled
356             if (scope.isCancelled())
357                 return;
358 
359             // set result/exception and invoke onComplete
360             if (ex == null) {
361                 this.result = (result != null) ? result : RESULT_NULL;
362             } else {
363                 this.result = new AltResult(State.FAILED, ex);
364             }
365             scope.onComplete(this);
366         }
367 
368         @Override
369         public Subtask.State state() {
370             Object result = this.result;
371             if (result == null) {
372                 return State.UNAVAILABLE;
373             } else if (result instanceof AltResult alt) {
374                 // null or failed
375                 return alt.state();
376             } else {
377                 return State.SUCCESS;
378             }
379         }
380 
381         @Override
382         public T get() {
383             ensureJoinedIfNotSubtask();
384             Object result = this.result;
385             if (result instanceof AltResult) {
386                 if (result == RESULT_NULL) return null;
387             } else if (result != null) {
388                 @SuppressWarnings("unchecked")
389                 T r = (T) result;
390                 return r;
391             }
392             throw new IllegalStateException(
393                     "Result is unavailable or subtask did not complete successfully");
394         }
395 
396         @Override
397         public Throwable exception() {
398             ensureJoinedIfNotSubtask();
399             Object result = this.result;
400             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
401                 return alt.exception();
402             }
403             throw new IllegalStateException(
404                     "Exception is unavailable or subtask did not complete with exception");
405         }
406 
407         @Override
408         public String toString() {
409             String stateAsString = switch (state()) {
410                 case UNAVAILABLE -> "[Unavailable]";
411                 case SUCCESS     -> "[Completed successfully]";
412                 case FAILED      -> "[Failed: " + ((AltResult) result).exception() + "]";
413             };
414             return Objects.toIdentityString(this) + stateAsString;
415         }
416     }
417 
418     /**
< prev index next >