11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.time.Duration;
30 import java.util.Objects;
31 import java.util.function.Function;
32 import jdk.internal.misc.ThreadFlock;
33 import jdk.internal.invoke.MhUtil;
34
35 /**
36 * StructuredTaskScope implementation.
37 */
38 final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
39 private static final VarHandle CANCELLED =
40 MhUtil.findVarHandle(MethodHandles.lookup(), "cancelled", boolean.class);
41
42 private final Joiner<? super T, ? extends R> joiner;
43 private final ThreadFactory threadFactory;
44 private final ThreadFlock flock;
45
46 // state, only accessed by owner thread
47 private static final int ST_NEW = 0,
48 ST_FORKED = 1, // subtasks forked, need to join
49 ST_JOIN_STARTED = 2, // join started, can no longer fork
50 ST_JOIN_COMPLETED = 3, // join completed
51 ST_CLOSED = 4; // closed
59
60 // set by the timer thread, read by the owner thread
61 private volatile boolean timeoutExpired;
62
63 @SuppressWarnings("this-escape")
64 private StructuredTaskScopeImpl(Joiner<? super T, ? extends R> joiner,
65 ThreadFactory threadFactory,
66 String name) {
67 this.joiner = joiner;
68 this.threadFactory = threadFactory;
69 this.flock = ThreadFlock.open((name != null) ? name : Objects.toIdentityString(this));
70 this.state = ST_NEW;
71 }
72
73 /**
74 * Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object
75 * and with configuration that is the result of applying the given function to the
76 * default configuration.
77 */
78 static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
79 Function<Configuration, Configuration> configFunction) {
80 Objects.requireNonNull(joiner);
81
82 var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
83 var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
84
85 // schedule timeout
86 Duration timeout = config.timeout();
87 if (timeout != null) {
88 boolean scheduled = false;
89 try {
90 scope.scheduleTimeout(timeout);
91 scheduled = true;
92 } finally {
93 if (!scheduled) {
94 scope.close(); // pop if scheduling timeout failed
95 }
96 }
97 }
98
99 return scope;
100 }
101
102 /**
103 * Throws WrongThreadException if the current thread is not the owner thread.
104 */
105 private void ensureOwner() {
106 if (Thread.currentThread() != flock.owner()) {
107 throw new WrongThreadException("Current thread not owner");
108 }
109 }
110
111 /**
112 * Throws IllegalStateException if already joined or scope is closed.
113 */
114 private void ensureNotJoined() {
115 assert Thread.currentThread() == flock.owner();
116 if (state > ST_FORKED) {
117 throw new IllegalStateException("Already joined or scope is closed");
118 }
119 }
120
121 /**
122 * Throws IllegalStateException if invoked by the owner thread and the owner thread
123 * has not joined.
124 */
125 private void ensureJoinedIfOwner() {
126 if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) {
127 throw new IllegalStateException("join not called");
128 }
129 }
130
131 /**
132 * Interrupts all threads in this scope, except the current thread.
133 */
134 private void interruptAll() {
135 flock.threads()
136 .filter(t -> t != Thread.currentThread())
137 .forEach(t -> {
138 try {
139 t.interrupt();
140 } catch (Throwable ignore) { }
141 });
142 }
143
144 /**
145 * Cancel the scope if not already cancelled.
146 */
178 assert Thread.currentThread() == flock.owner();
179 if (timerTask != null) {
180 timerTask.cancel(false);
181 }
182 }
183
184 /**
185 * Invoked by the thread for a subtask when the subtask completes before scope is cancelled.
186 */
187 private void onComplete(SubtaskImpl<? extends T> subtask) {
188 assert subtask.state() != Subtask.State.UNAVAILABLE;
189 if (joiner.onComplete(subtask)) {
190 cancel();
191 }
192 }
193
194 @Override
195 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
196 Objects.requireNonNull(task);
197 ensureOwner();
198 ensureNotJoined();
199
200 var subtask = new SubtaskImpl<U>(this, task);
201
202 // notify joiner, even if cancelled
203 if (joiner.onFork(subtask)) {
204 cancel();
205 }
206
207 if (!cancelled) {
208 // create thread to run task
209 Thread thread = threadFactory.newThread(subtask);
210 if (thread == null) {
211 throw new RejectedExecutionException("Rejected by thread factory");
212 }
213
214 // attempt to start the thread
215 try {
216 flock.start(thread);
217 } catch (IllegalStateException e) {
218 // shutdown by another thread, or underlying flock is shutdown due
219 // to unstructured use
220 }
221 }
222
223 // force owner to join
224 state = ST_FORKED;
225 return subtask;
226 }
227
228 @Override
229 public <U extends T> Subtask<U> fork(Runnable task) {
230 Objects.requireNonNull(task);
231 return fork(() -> { task.run(); return null; });
232 }
233
234 @Override
235 public R join() throws InterruptedException {
236 ensureOwner();
237 ensureNotJoined();
238
239 // join started
240 state = ST_JOIN_STARTED;
241
242 // wait for all subtasks, the scope to be cancelled, or interrupt
243 flock.awaitAll();
244
245 // throw if timeout expired
246 if (timeoutExpired) {
247 throw new TimeoutException();
248 }
249 cancelTimeout();
250
251 // all subtasks completed or cancelled
252 state = ST_JOIN_COMPLETED;
253
254 // invoke joiner to get result
255 try {
256 return joiner.result();
257 } catch (Throwable e) {
258 throw new FailedException(e);
259 }
260 }
261
262 @Override
263 public boolean isCancelled() {
264 return cancelled;
265 }
266
267 @Override
268 public void close() {
269 ensureOwner();
270 int s = state;
271 if (s == ST_CLOSED) {
272 return;
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.time.Duration;
30 import java.util.Objects;
31 import java.util.function.UnaryOperator;
32 import jdk.internal.misc.ThreadFlock;
33 import jdk.internal.invoke.MhUtil;
34
35 /**
36 * StructuredTaskScope implementation.
37 */
38 final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
39 private static final VarHandle CANCELLED =
40 MhUtil.findVarHandle(MethodHandles.lookup(), "cancelled", boolean.class);
41
42 private final Joiner<? super T, ? extends R> joiner;
43 private final ThreadFactory threadFactory;
44 private final ThreadFlock flock;
45
46 // state, only accessed by owner thread
47 private static final int ST_NEW = 0,
48 ST_FORKED = 1, // subtasks forked, need to join
49 ST_JOIN_STARTED = 2, // join started, can no longer fork
50 ST_JOIN_COMPLETED = 3, // join completed
51 ST_CLOSED = 4; // closed
59
60 // set by the timer thread, read by the owner thread
61 private volatile boolean timeoutExpired;
62
63 @SuppressWarnings("this-escape")
64 private StructuredTaskScopeImpl(Joiner<? super T, ? extends R> joiner,
65 ThreadFactory threadFactory,
66 String name) {
67 this.joiner = joiner;
68 this.threadFactory = threadFactory;
69 this.flock = ThreadFlock.open((name != null) ? name : Objects.toIdentityString(this));
70 this.state = ST_NEW;
71 }
72
73 /**
74 * Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object
75 * and with configuration that is the result of applying the given function to the
76 * default configuration.
77 */
78 static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
79 UnaryOperator<Configuration> configOperator) {
80 Objects.requireNonNull(joiner);
81
82 var config = (ConfigImpl) configOperator.apply(ConfigImpl.defaultConfig());
83 var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
84
85 // schedule timeout
86 Duration timeout = config.timeout();
87 if (timeout != null) {
88 boolean scheduled = false;
89 try {
90 scope.scheduleTimeout(timeout);
91 scheduled = true;
92 } finally {
93 if (!scheduled) {
94 scope.close(); // pop if scheduling timeout failed
95 }
96 }
97 }
98
99 return scope;
100 }
101
102 /**
103 * Throws WrongThreadException if the current thread is not the owner thread.
104 */
105 private void ensureOwner() {
106 if (Thread.currentThread() != flock.owner()) {
107 throw new WrongThreadException("Current thread not owner");
108 }
109 }
110
111 /**
112 * Throws IllegalStateException if invoked by the owner thread and the owner thread
113 * has not joined.
114 */
115 private void ensureJoinedIfOwner() {
116 if (Thread.currentThread() == flock.owner() && state < ST_JOIN_STARTED) {
117 throw new IllegalStateException("join not called");
118 }
119 }
120
121 /**
122 * Interrupts all threads in this scope, except the current thread.
123 */
124 private void interruptAll() {
125 flock.threads()
126 .filter(t -> t != Thread.currentThread())
127 .forEach(t -> {
128 try {
129 t.interrupt();
130 } catch (Throwable ignore) { }
131 });
132 }
133
134 /**
135 * Cancel the scope if not already cancelled.
136 */
168 assert Thread.currentThread() == flock.owner();
169 if (timerTask != null) {
170 timerTask.cancel(false);
171 }
172 }
173
174 /**
175 * Invoked by the thread for a subtask when the subtask completes before scope is cancelled.
176 */
177 private void onComplete(SubtaskImpl<? extends T> subtask) {
178 assert subtask.state() != Subtask.State.UNAVAILABLE;
179 if (joiner.onComplete(subtask)) {
180 cancel();
181 }
182 }
183
184 @Override
185 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
186 Objects.requireNonNull(task);
187 ensureOwner();
188 if (state > ST_FORKED) {
189 throw new IllegalStateException("join already called or scope is closed");
190 }
191
192 var subtask = new SubtaskImpl<U>(this, task);
193
194 // notify joiner, even if cancelled
195 if (joiner.onFork(subtask)) {
196 cancel();
197 }
198
199 if (!cancelled) {
200 // create thread to run task
201 Thread thread = threadFactory.newThread(subtask);
202 if (thread == null) {
203 throw new RejectedExecutionException("Rejected by thread factory");
204 }
205
206 // attempt to start the thread
207 try {
208 flock.start(thread);
209 } catch (IllegalStateException e) {
210 // shutdown by another thread, or underlying flock is shutdown due
211 // to unstructured use
212 }
213 }
214
215 // force owner to join
216 state = ST_FORKED;
217 return subtask;
218 }
219
220 @Override
221 public <U extends T> Subtask<U> fork(Runnable task) {
222 Objects.requireNonNull(task);
223 return fork(() -> { task.run(); return null; });
224 }
225
226 @Override
227 public R join() throws InterruptedException {
228 ensureOwner();
229 if (state >= ST_JOIN_COMPLETED) {
230 throw new IllegalStateException("Already joined or scope is closed");
231 }
232
233 // join started
234 state = ST_JOIN_STARTED;
235
236 // wait for all subtasks, the scope to be cancelled, or interrupt
237 flock.awaitAll();
238
239 // all subtasks completed or scope cancelled
240 state = ST_JOIN_COMPLETED;
241
242 // invoke joiner onTimeout if timeout expired
243 if (timeoutExpired) {
244 cancel(); // ensure cancelled before calling onTimeout
245 joiner.onTimeout();
246 } else {
247 cancelTimeout();
248 }
249
250 // invoke joiner to get result
251 try {
252 return joiner.result();
253 } catch (Throwable e) {
254 throw new FailedException(e);
255 }
256 }
257
258 @Override
259 public boolean isCancelled() {
260 return cancelled;
261 }
262
263 @Override
264 public void close() {
265 ensureOwner();
266 int s = state;
267 if (s == ST_CLOSED) {
268 return;
|