1 /*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.lang.Thread.UncaughtExceptionHandler;
39 import java.lang.reflect.Field;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.Collections;
43 import java.util.List;
44 import java.util.Objects;
45 import java.util.function.Consumer;
46 import java.util.function.Predicate;
47 import java.util.concurrent.CountDownLatch;
48 import java.util.concurrent.locks.LockSupport;
49 import jdk.internal.access.JavaLangAccess;
50 import jdk.internal.access.JavaUtilConcurrentFJPAccess;
51 import jdk.internal.access.SharedSecrets;
52 import jdk.internal.misc.Unsafe;
53 import jdk.internal.vm.SharedThreadContainer;
54 import static java.util.concurrent.DelayScheduler.ScheduledForkJoinTask;
55
56 /**
57 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
58 * A {@code ForkJoinPool} provides the entry point for submissions
59 * from non-{@code ForkJoinTask} clients, as well as management and
60 * monitoring operations.
61 *
62 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
63 * ExecutorService} mainly by virtue of employing
64 * <em>work-stealing</em>: all threads in the pool attempt to find and
65 * execute tasks submitted to the pool and/or created by other active
66 * tasks (eventually blocking waiting for work if none exist). This
67 * enables efficient processing when most tasks spawn other subtasks
68 * (as do most {@code ForkJoinTask}s), as well as when many small
69 * tasks are submitted to the pool from external clients. Especially
70 * when setting <em>asyncMode</em> to true in constructors, {@code
71 * ForkJoinPool}s may also be appropriate for use with event-style
72 * tasks that are never joined. All worker threads are initialized
73 * with {@link Thread#isDaemon} set {@code true}.
74 *
75 * <p>A static {@link #commonPool()} is available and appropriate for
76 * most applications. The common pool is used by any ForkJoinTask that
77 * is not explicitly submitted to a specified pool. Using the common
78 * pool normally reduces resource usage (its threads are slowly
79 * reclaimed during periods of non-use, and reinstated upon subsequent
80 * use).
81 *
82 * <p>For applications that require separate or custom pools, a {@code
83 * ForkJoinPool} may be constructed with a given target parallelism
84 * level; by default, equal to the number of available processors.
85 * The pool attempts to maintain enough active (or available) threads
86 * by dynamically adding, suspending, or resuming internal worker
87 * threads, even if some tasks are stalled waiting to join others.
88 * However, no such adjustments are guaranteed in the face of blocked
89 * I/O or other unmanaged synchronization. The nested {@link
90 * ManagedBlocker} interface enables extension of the kinds of
91 * synchronization accommodated. The default policies may be
92 * overridden using a constructor with parameters corresponding to
93 * those documented in class {@link ThreadPoolExecutor}.
94 *
95 * <p>In addition to execution and lifecycle control methods, this
96 * class provides status check methods (for example
97 * {@link #getStealCount}) that are intended to aid in developing,
98 * tuning, and monitoring fork/join applications. Also, method
99 * {@link #toString} returns indications of pool state in a
100 * convenient form for informal monitoring.
101 *
102 * <p>As is the case with other ExecutorServices, there are three
103 * main task execution methods summarized in the following table.
104 * These are designed to be used primarily by clients not already
105 * engaged in fork/join computations in the current pool. The main
106 * forms of these methods accept instances of {@code ForkJoinTask},
107 * but overloaded forms also allow mixed execution of plain {@code
108 * Runnable}- or {@code Callable}- based activities as well. However,
109 * tasks that are already executing in a pool should normally instead
110 * use the within-computation forms listed in the table unless using
111 * async event-style tasks that are not usually joined, in which case
112 * there is little difference among choice of methods.
113 *
114 * <table class="plain">
115 * <caption>Summary of task execution methods</caption>
116 * <tr>
117 * <td></td>
118 * <th scope="col"> Call from non-fork/join clients</th>
119 * <th scope="col"> Call from within fork/join computations</th>
120 * </tr>
121 * <tr>
122 * <th scope="row" style="text-align:left"> Arrange async execution</th>
123 * <td> {@link #execute(ForkJoinTask)}</td>
124 * <td> {@link ForkJoinTask#fork}</td>
125 * </tr>
126 * <tr>
127 * <th scope="row" style="text-align:left"> Await and obtain result</th>
128 * <td> {@link #invoke(ForkJoinTask)}</td>
129 * <td> {@link ForkJoinTask#invoke}</td>
130 * </tr>
131 * <tr>
132 * <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th>
133 * <td> {@link #submit(ForkJoinTask)}</td>
134 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
135 * </tr>
136 * </table>
137 *
138 * <p>Additionally, this class supports {@link
139 * ScheduledExecutorService} methods to delay or periodically execute
140 * tasks, as well as method {@link #submitWithTimeout} to cancel tasks
141 * that take too long. The scheduled functions or actions may create
142 * and invoke other {@linkplain ForkJoinTask ForkJoinTasks}. Delayed
143 * actions become enabled for execution and behave as ordinary submitted
144 * tasks when their delays elapse. Scheduling methods return
145 * {@linkplain ForkJoinTask ForkJoinTasks} that implement the {@link
146 * ScheduledFuture} interface. Resource exhaustion encountered after
147 * initial submission results in task cancellation. When time-based
148 * methods are used, shutdown policies match the default policies of
149 * class {@link ScheduledThreadPoolExecutor}: upon {@link #shutdown},
150 * existing periodic tasks will not re-execute, and the pool
151 * terminates when quiescent and existing delayed tasks
152 * complete. Method {@link #cancelDelayedTasksOnShutdown} may be used
153 * to disable all delayed tasks upon shutdown, and method {@link
154 * #shutdownNow} may be used to instead unconditionally initiate pool
155 * termination. Monitoring methods such as {@link #getQueuedTaskCount}
156 * do not include scheduled tasks that are not yet enabled for execution,
157 * which are reported separately by method {@link
158 * #getDelayedTaskCount}.
159 *
160 * <p>The parameters used to construct the common pool may be controlled by
161 * setting the following {@linkplain System#getProperty system properties}:
162 * <ul>
163 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism}
164 * - the parallelism level, a non-negative integer. Usage is discouraged.
165 * Use {@link #setParallelism} instead.
166 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.threadFactory}
167 * - the class name of a {@link ForkJoinWorkerThreadFactory}.
168 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
169 * is used to load this class.
170 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.exceptionHandler}
171 * - the class name of a {@link UncaughtExceptionHandler}.
172 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
173 * is used to load this class.
174 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.maximumSpares}
175 * - the maximum number of allowed extra threads to maintain target
176 * parallelism (default 256).
177 * </ul>
178 * If no thread factory is supplied via a system property, then the
179 * common pool uses a factory that uses the system class loader as the
180 * {@linkplain Thread#getContextClassLoader() thread context class loader}.
181 *
182 * Upon any error in establishing these settings, default parameters
183 * are used. It is possible to disable use of threads by using a
184 * factory that may return {@code null}, in which case some tasks may
185 * never execute. While possible, it is strongly discouraged to set
186 * the parallelism property to zero, which may be internally
187 * overridden in the presence of intrinsically async tasks.
188 *
189 * @implNote This implementation restricts the maximum number of
190 * running threads to 32767. Attempts to create pools with greater
191 * than the maximum number result in {@code
192 * IllegalArgumentException}. Also, this implementation rejects
193 * submitted tasks (that is, by throwing {@link
194 * RejectedExecutionException}) only when the pool is shut down or
195 * internal resources have been exhausted.
196 *
197 * @since 1.7
198 * @author Doug Lea
199 */
200 public class ForkJoinPool extends AbstractExecutorService
201 implements ScheduledExecutorService {
202
203 /*
204 * Implementation Overview
205 *
206 * This class and its nested classes provide the main
207 * functionality and control for a set of worker threads. Because
208 * most internal methods and nested classes are interrelated,
209 * their main rationale and descriptions are presented here;
210 * individual methods and nested classes contain only brief
211 * comments about details. Broadly: submissions from non-FJ
212 * threads enter into submission queues. Workers take these tasks
213 * and typically split them into subtasks that may be stolen by
214 * other workers. Work-stealing based on randomized scans
215 * generally leads to better throughput than "work dealing" in
216 * which producers assign tasks to idle threads, in part because
217 * threads that have finished other tasks before the signalled
218 * thread wakes up (which can be a long time) can take the task
219 * instead. Preference rules give first priority to processing
220 * tasks from their own queues (LIFO or FIFO, depending on mode),
221 * then to randomized FIFO steals of tasks in other queues.
222 *
223 * This framework began as vehicle for supporting structured
224 * parallelism using work-stealing, designed to work best when
225 * tasks are dag-structured (wrt completion dependencies), nested
226 * (generated using recursion or completions), of reasonable
227 * granularity, independent (wrt memory and resources) and where
228 * callers participate in task execution. These are properties
229 * that anyone aiming for efficient parallel multicore execution
230 * should design for. Over time, the scalability advantages of
231 * this framework led to extensions to better support more diverse
232 * usage contexts, amounting to weakenings or violations of each
233 * of these properties. Accommodating them may compromise
234 * performance, but mechanics discussed below include tradeoffs
235 * attempting to arrange that no single performance issue dominates.
236 *
237 * Here's a brief history of major revisions, each also with other
238 * minor features and changes.
239 *
240 * 1. Only handle recursively structured computational tasks
241 * 2. Async (FIFO) mode and striped submission queues
242 * 3. Completion-based tasks (mainly CountedCompleters)
243 * 4. CommonPool and parallelStream support
244 * 5. InterruptibleTasks for externally submitted tasks
245 * 6. Support ScheduledExecutorService methods
246 *
247 * Most changes involve adaptions of base algorithms using
248 * combinations of static and dynamic bitwise mode settings (both
249 * here and in ForkJoinTask), and subclassing of ForkJoinTask.
250 * There are a fair number of odd code constructions and design
251 * decisions for components that reside at the edge of Java vs JVM
252 * functionality.
253 *
254 * WorkQueues
255 * ==========
256 *
257 * Most operations occur within work-stealing queues (in nested
258 * class WorkQueue). These are special forms of Deques that
259 * support only three of the four possible end-operations -- push,
260 * pop, and poll (aka steal), under the further constraints that
261 * push and pop are called only from the owning thread (or, as
262 * extended here, under a lock), while poll may be called from
263 * other threads. (If you are unfamiliar with them, you probably
264 * want to read Herlihy and Shavit's book "The Art of
265 * Multiprocessor programming", chapter 16 describing these in
266 * more detail before proceeding.) The main work-stealing queue
267 * design is roughly similar to those in the papers "Dynamic
268 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
269 * (http://research.sun.com/scalable/pubs/index.html) and
270 * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
271 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
272 * The main differences ultimately stem from GC requirements that
273 * we null out taken slots as soon as we can, to maintain as small
274 * a footprint as possible even in programs generating huge
275 * numbers of tasks. To accomplish this, we shift the CAS
276 * arbitrating pop vs poll (steal) from being on the indices
277 * ("base" and "top") to the slots themselves. These provide the
278 * primary required memory ordering -- see "Correct and Efficient
279 * Work-Stealing for Weak Memory Models" by Le, Pop, Cohen, and
280 * Nardelli, PPoPP 2013
281 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
282 * analysis of memory ordering requirements in work-stealing
283 * algorithms similar to the one used here. We use per-operation
284 * ordered writes of various kinds for accesses when required.
285 *
286 * We also support a user mode in which local task processing is
287 * in FIFO, not LIFO order, simply by using a local version of
288 * poll rather than pop. This can be useful in message-passing
289 * frameworks in which tasks are never joined, although with
290 * increased contention among task producers and consumers. Also,
291 * the same data structure (and class) is used for "submission
292 * queues" (described below) holding externally submitted tasks,
293 * that differ only in that a lock (using field "phase"; see below) is
294 * required by external callers to push and pop tasks.
295 *
296 * Adding tasks then takes the form of a classic array push(task)
297 * in a circular buffer:
298 * q.array[q.top++ % length] = task;
299 *
300 * The actual code needs to null-check and size-check the array,
301 * uses masking, not mod, for indexing a power-of-two-sized array,
302 * enforces memory ordering, supports resizing, and possibly
303 * signals waiting workers to start scanning (described below),
304 * which requires stronger forms of order accesses.
305 *
306 * The pop operation (always performed by owner) is of the form:
307 * if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null)
308 * decrement top and return task;
309 * If this fails, the queue is empty. This operation is one part
310 * of the nextLocalTask method, that instead does a local-poll
311 * in FIFO mode.
312 *
313 * The poll operation is, basically:
314 * if (CAS nonnull task t = q.array[k = q.base % length] to null)
315 * increment base and return task;
316 *
317 * However, there are several more cases that must be dealt with.
318 * Some of them are just due to asynchrony; others reflect
319 * contention and stealing policies. Stepping through them
320 * illustrates some of the implementation decisions in this class.
321 *
322 * * Slot k must be read with an acquiring read, which it must
323 * anyway to dereference and run the task if the (acquiring)
324 * CAS succeeds.
325 *
326 * * q.base may change between reading and using its value to
327 * index the slot. To avoid trying to use the wrong t, the
328 * index and slot must be reread (not necessarily immediately)
329 * until consistent, unless this is a local poll by owner, in
330 * which case this form of inconsistency can only appear as t
331 * being null, below.
332 *
333 * * Similarly, q.array may change (due to a resize), unless this
334 * is a local poll by owner. Otherwise, when t is present, this
335 * only needs consideration on CAS failure (since a CAS
336 * confirms the non-resized case.)
337 *
338 * * t may appear null because a previous poll operation has not
339 * yet incremented q.base, so the read is from an already-taken
340 * index. This form of stall reflects the non-lock-freedom of
341 * the poll operation. Stalls can be detected by observing that
342 * q.base doesn't change on repeated reads of null t and when
343 * no other alternatives apply, spin-wait for it to settle. To
344 * reduce producing these kinds of stalls by other stealers, we
345 * encourage timely writes to indices using otherwise
346 * unnecessarily strong writes.
347 *
348 * * The CAS may fail, in which case we may want to retry unless
349 * there is too much contention. One goal is to balance and
350 * spread out the many forms of contention that may be
351 * encountered across polling and other operations to avoid
352 * sustained performance degradations. Across all cases where
353 * alternatives exist, a bounded number of CAS misses or stalls
354 * are tolerated (for slots, ctl, and elsewhere described
355 * below) before taking alternative action. These may move
356 * contention or retries elsewhere, which is still preferable
357 * to single-point bottlenecks.
358 *
359 * * Even though the check "top == base" is quiescently accurate
360 * to determine whether a queue is empty, it is not of much use
361 * when deciding whether to try to poll or repoll after a
362 * failure. Both top and base may move independently, and both
363 * lag updates to the underlying array. To reduce memory
364 * contention, non-owners avoid reading the "top" when
365 * possible, by using one-ahead reads to check whether to
366 * repoll, relying on the fact that a non-empty queue does not
367 * have two null slots in a row, except in cases (resizes and
368 * shifts) that can be detected with a secondary recheck that
369 * is less likely to conflict with owner writes.
370 *
371 * The poll operations in q.poll(), runWorker(), helpJoin(), and
372 * elsewhere differ with respect to whether other queues are
373 * available to try, and the presence or nature of screening steps
374 * when only some kinds of tasks can be taken. When alternatives
375 * (or failing) is an option, they uniformly give up after
376 * bounded numbers of stalls and/or CAS failures, which reduces
377 * contention when too many workers are polling too few tasks.
378 * Overall, in the aggregate, we ensure probabilistic
379 * non-blockingness of work-stealing at least until checking
380 * quiescence (which is intrinsically blocking): If an attempted
381 * steal fails in these ways, a scanning thief chooses a different
382 * target to try next. In contexts where alternatives aren't
383 * available, and when progress conditions can be isolated to
384 * values of a single variable, simple spinloops (using
385 * Thread.onSpinWait) are used to reduce memory traffic.
386 *
387 * WorkQueues are also used in a similar way for tasks submitted
388 * to the pool. We cannot mix these tasks in the same queues used
389 * by workers. Instead, we randomly associate submission queues
390 * with submitting threads (or carriers when using VirtualThreads)
391 * using a form of hashing. The ThreadLocalRandom probe value
392 * serves as a hash code for choosing existing queues, and may be
393 * randomly repositioned upon contention with other submitters.
394 * In essence, submitters act like workers except that they are
395 * restricted to executing local tasks that they submitted (or
396 * when known, subtasks thereof). Insertion of tasks in shared
397 * mode requires a lock. We use only a simple spinlock (as one
398 * role of field "phase") because submitters encountering a busy
399 * queue move to a different position to use or create other
400 * queues. They (spin) block when registering new queues, or
401 * indirectly elsewhere, by revisiting later.
402 *
403 * Management
404 * ==========
405 *
406 * The main throughput advantages of work-stealing stem from
407 * decentralized control -- workers mostly take tasks from
408 * themselves or each other, at rates that can exceed a billion
409 * per second. Most non-atomic control is performed by some form
410 * of scanning across or within queues. The pool itself creates,
411 * activates (enables scanning for and running tasks),
412 * deactivates, blocks, and terminates threads, all with minimal
413 * central information. There are only a few properties that we
414 * can globally track or maintain, so we pack them into a small
415 * number of variables, often maintaining atomicity without
416 * blocking or locking. Nearly all essentially atomic control
417 * state is held in a few variables that are by far most often
418 * read (not written) as status and consistency checks. We pack as
419 * much information into them as we can.
420 *
421 * Field "ctl" contains 64 bits holding information needed to
422 * atomically decide to add, enqueue (on an event queue), and
423 * dequeue and release workers. To enable this packing, we
424 * restrict maximum parallelism to (1<<15)-1 (which is far in
425 * excess of normal operating range) to allow ids, counts, and
426 * their negations (used for thresholding) to fit into 16bit
427 * subfields.
428 *
429 * Field "runState" and per-WorkQueue field "phase" play similar
430 * roles, as lockable, versioned counters. Field runState also
431 * includes monotonic event bits:
432 * * SHUTDOWN: no more external tasks accepted; STOP when quiescent
433 * * STOP: no more tasks run, and deregister all workers
434 * * CLEANED: all unexecuted tasks have been cancelled
435 * * TERMINATED: all workers deregistered and all queues cleaned
436 * The version tags enable detection of state changes (by
437 * comparing two reads) modulo bit wraparound. The bit range in
438 * each case suffices for purposes of determining quiescence,
439 * termination, avoiding ABA-like errors, and signal control, most
440 * of which are ultimately based on at most 15bit ranges (due to
441 * 32767 max total workers). RunState updates do not need to be
442 * atomic with respect to ctl updates, but because they are not,
443 * some care is required to avoid stalls. The seqLock properties
444 * detect changes and conditionally upgrade to coordinate with
445 * updates. It is typically held for less than a dozen
446 * instructions unless the queue array is being resized, during
447 * which contention is rare. To be conservative, lockRunState is
448 * implemented as a spin/sleep loop. Here and elsewhere spin
449 * constants are short enough to apply even on systems with few
450 * available processors. In addition to checking pool status,
451 * reads of runState sometimes serve as acquire fences before
452 * reading other fields.
453 *
454 * Field "parallelism" holds the target parallelism (normally
455 * corresponding to pool size). Users can dynamically reset target
456 * parallelism, but is only accessed when signalling or awaiting
457 * work, so only slowly has an effect in creating threads or
458 * letting them time out and terminate when idle.
459 *
460 * Array "queues" holds references to WorkQueues. It is updated
461 * (only during worker creation and termination) under the
462 * runState lock. It is otherwise concurrently readable but reads
463 * for use in scans (see below) are always prefaced by a volatile
464 * read of runState (or equivalent constructions), ensuring that
465 * its state is current at the point it is used (which is all we
466 * require). To simplify index-based operations, the array size is
467 * always a power of two, and all readers must tolerate null
468 * slots. Worker queues are at odd indices. Worker phase ids
469 * masked with SMASK match their index. Shared (submission) queues
470 * are at even indices. Grouping them together in this way aids in
471 * task scanning: At top-level, both kinds of queues should be
472 * sampled with approximately the same probability, which is
473 * simpler if they are all in the same array. But we also need to
474 * identify what kind they are without looking at them, leading to
475 * this odd/even scheme. One disadvantage is that there are
476 * usually many fewer submission queues, so there can be many
477 * wasted probes (null slots). But this is still cheaper than
478 * alternatives. Other loops over the queues array vary in origin
479 * and stride depending on whether they cover only submission
480 * (even) or worker (odd) queues or both, and whether they require
481 * randomness (in which case cyclically exhaustive strides may be
482 * used).
483 *
484 * All worker thread creation is on-demand, triggered by task
485 * submissions, replacement of terminated workers, and/or
486 * compensation for blocked workers. However, all other support
487 * code is set up to work with other policies. To ensure that we
488 * do not hold on to worker or task references that would prevent
489 * GC, all accesses to workQueues in waiting, signalling, and
490 * control methods are via indices into the queues array (which is
491 * one source of some of the messy code constructions here). In
492 * essence, the queues array serves as a weak reference
493 * mechanism. In particular, the stack top subfield of ctl stores
494 * indices, not references. Operations on queues obtained from
495 * these indices remain valid (with at most some unnecessary extra
496 * work) even if an underlying worker failed and was replaced by
497 * another at the same index. During termination, worker queue
498 * array updates are disabled.
499 *
500 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
501 * cannot let workers spin indefinitely scanning for tasks when
502 * none can be found immediately, and we cannot start/resume
503 * workers unless there appear to be tasks available. On the
504 * other hand, we must quickly prod them into action when new
505 * tasks are submitted or generated. These latencies are mainly a
506 * function of JVM park/unpark (and underlying OS) performance,
507 * which can be slow and variable (even though usages are
508 * streamlined as much as possible). In many usages, ramp-up time
509 * is the main limiting factor in overall performance, which is
510 * compounded at program start-up by JIT compilation and
511 * allocation. On the other hand, throughput degrades when too
512 * many threads poll for too few tasks. (See below.)
513 *
514 * The "ctl" field atomically maintains total and "released"
515 * worker counts, plus the head of the available worker queue
516 * (actually stack, represented by the lower 32bit subfield of
517 * ctl). Released workers are those known to be scanning for
518 * and/or running tasks (we cannot accurately determine
519 * which). Unreleased ("available") workers are recorded in the
520 * ctl stack. These workers are made eligible for signalling by
521 * enqueuing in ctl (see method deactivate). This "queue" is a
522 * form of Treiber stack. This is ideal for activating threads in
523 * most-recently used order, and improves performance and
524 * locality, outweighing the disadvantages of being prone to
525 * contention and inability to release a worker unless it is
526 * topmost on stack. The top stack state holds the value of the
527 * "phase" field of the worker: its index and status, plus a
528 * version counter that, in addition to the count subfields (also
529 * serving as version stamps) provide protection against Treiber
530 * stack ABA effects.
531 *
532 * Creating workers. To create a worker, we pre-increment counts
533 * (serving as a reservation), and attempt to construct a
534 * ForkJoinWorkerThread via its factory. On starting, the new
535 * thread first invokes registerWorker, where it is assigned an
536 * index in the queues array (expanding the array if necessary).
537 * Upon any exception across these steps, or null return from
538 * factory, deregisterWorker adjusts counts and records
539 * accordingly. If a null return, the pool continues running with
540 * fewer than the target number workers. If exceptional, the
541 * exception is propagated, generally to some external caller.
542 *
543 * WorkQueue field "phase" encodes the queue array id in lower
544 * bits, and otherwise acts similarly to the pool runState field:
545 * The "IDLE" bit is clear while active (either a released worker
546 * or a locked external queue), with other bits serving as a
547 * version counter to distinguish changes across multiple reads.
548 * Note that phase field updates lag queue CAS releases; seeing a
549 * non-idle phase does not guarantee that the worker is available
550 * (and so is never checked in this way).
551 *
552 * The ctl field also serves as the basis for memory
553 * synchronization surrounding activation. This uses a more
554 * efficient version of a Dekker-like rule that task producers and
555 * consumers sync with each other by both writing/CASing ctl (even
556 * if to its current value). However, rather than CASing ctl to
557 * its current value in the common case where no action is
558 * required, we reduce write contention by ensuring that
559 * signalWork invocations are prefaced with a fully fenced memory
560 * access (which is usually needed anyway).
561 *
562 * Signalling. Signals (in signalWork) cause new or reactivated
563 * workers to scan for tasks. Method signalWork and its callers
564 * try to approximate the unattainable goal of having the right
565 * number of workers activated for the tasks at hand, but must err
566 * on the side of too many workers vs too few to avoid stalls:
567 *
568 * * If computations are purely tree structured, it suffices for
569 * every worker to activate another when it pushes a task into
570 * an empty queue, resulting in O(log(#threads)) steps to full
571 * activation. Emptiness must be conservatively approximated,
572 * which may result in unnecessary signals. Also, to reduce
573 * resource usages in some cases, at the expense of slower
574 * startup in others, activation of an idle thread is preferred
575 * over creating a new one, here and elsewhere.
576 *
577 * * At the other extreme, if "flat" tasks (those that do not in
578 * turn generate others) come in serially from only a single
579 * producer, each worker taking a task from a queue should
580 * propagate a signal if there are more tasks in that
581 * queue. This is equivalent to, but generally faster than,
582 * arranging the stealer take multiple tasks, re-pushing one or
583 * more on its own queue, and signalling (because its queue is
584 * empty), also resulting in logarithmic full activation
585 * time. If tasks do not not engage in unbounded loops based on
586 * the actions of other workers with unknown dependencies loop,
587 * this form of proagation can be limited to one signal per
588 * activation (phase change). We distinguish the cases by
589 * further signalling only if the task is an InterruptibleTask
590 * (see below), which are the only supported forms of task that
591 * may do so.
592 *
593 * * Because we don't know about usage patterns (or most commonly,
594 * mixtures), we use both approaches, which present even more
595 * opportunities to over-signal. (Failure to distinguish these
596 * cases in terms of submission methods was arguably an early
597 * design mistake.) Note that in either of these contexts,
598 * signals may be (and often are) unnecessary because active
599 * workers continue scanning after running tasks without the
600 * need to be signalled (which is one reason work stealing is
601 * often faster than alternatives), so additional workers
602 * aren't needed.
603 *
604 * * For rapidly branching tasks that require full pool resources,
605 * oversignalling is OK, because signalWork will soon have no
606 * more workers to create or reactivate. But for others (mainly
607 * externally submitted tasks), overprovisioning may cause very
608 * noticeable slowdowns due to contention and resource
609 * wastage. We reduce impact by deactivating workers when
610 * queues don't have accessible tasks, but reactivating and
611 * rescanning if other tasks remain.
612 *
613 * * Despite these, signal contention and overhead effects still
614 * occur during ramp-up and ramp-down of small computations.
615 *
616 * Scanning. Method runWorker performs top-level scanning for (and
617 * execution of) tasks by polling a pseudo-random permutation of
618 * the array (by starting at a given index, and using a constant
619 * cyclically exhaustive stride.) It uses the same basic polling
620 * method as WorkQueue.poll(), but restarts with a different
621 * permutation on each invocation. The pseudorandom generator
622 * need not have high-quality statistical properties in the long
623 * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
624 * from ThreadLocalRandom probes, which are cheap and
625 * suffice. Each queue's polling attempts to avoid becoming stuck
626 * when other scanners/pollers stall. Scans do not otherwise
627 * explicitly take into account core affinities, loads, cache
628 * localities, etc, However, they do exploit temporal locality
629 * (which usually approximates these) by preferring to re-poll
630 * from the same queue after a successful poll before trying
631 * others, which also reduces bookkeeping, cache traffic, and
632 * scanning overhead. But it also reduces fairness, which is
633 * partially counteracted by giving up on detected interference
634 * (which also reduces contention when too many workers try to
635 * take small tasks from the same queue).
636 *
637 * Deactivation. When no tasks are found by a worker in runWorker,
638 * it tries to deactivate()), giving up (and rescanning) on "ctl"
639 * contention. To avoid missed signals during deactivation, the
640 * method rescans and reactivates if there may have been a missed
641 * signal during deactivation. To reduce false-alarm reactivations
642 * while doing so, we scan multiple times (analogously to method
643 * quiescent()) before trying to reactivate. Because idle workers
644 * are often not yet blocked (parked), we use a WorkQueue field to
645 * advertise that a waiter actually needs unparking upon signal.
646 *
647 * Quiescence. Workers scan looking for work, giving up when they
648 * don't find any, without being sure that none are available.
649 * However, some required functionality relies on consensus about
650 * quiescence (also termination, discussed below). The count
651 * fields in ctl allow accurate discovery of states in which all
652 * workers are idle. However, because external (asynchronous)
653 * submitters are not part of this vote, these mechanisms
654 * themselves do not guarantee that the pool is in a quiescent
655 * state with respect to methods isQuiescent, shutdown (which
656 * begins termination when quiescent), helpQuiesce, and indirectly
657 * others including tryCompensate. Method quiescent() is used in
658 * all of these contexts. It provides checks that all workers are
659 * idle and there are no submissions that they could poll if they
660 * were not idle, retrying on inconsistent reads of queues and
661 * using the runState seqLock to retry on queue array updates.
662 * (It also reports quiescence if the pool is terminating.) A true
663 * report means only that there was a moment at which quiescence
664 * held. False negatives are inevitable (for example when queues
665 * indices lag updates, as described above), which is accommodated
666 * when (tentatively) idle by scanning for work etc, and then
667 * re-invoking. This includes cases in which the final unparked
668 * thread (in deactivate()) uses quiescent() to check for tasks
669 * that could have been added during a race window that would not
670 * be accompanied by a signal, in which case re-activating itself
671 * (or any other worker) to rescan. Method helpQuiesce acts
672 * similarly but cannot rely on ctl counts to determine that all
673 * workers are inactive because the caller and any others
674 * executing helpQuiesce are not included in counts.
675 *
676 * Termination. Termination is initiated by setting STOP in one of
677 * three ways (via methods tryTerminate and quiescent):
678 * * A call to shutdownNow, in which case all workers are
679 * interrupted, first ensuring that the queues array is stable,
680 * to avoid missing any workers.
681 * * A call to shutdown when quiescent, in which case method
682 * releaseWaiters is used to dequeue them, at which point they notice
683 * STOP state and return from runWorker to deregister();
684 * * The pool becomes quiescent() sometime after shutdown has
685 * been called, in which case releaseWaiters is also used to
686 * propagate as they deregister.
687 * Upon STOP, each worker, as well as external callers to
688 * tryTerminate (via close() etc) race to set CLEANED, indicating
689 * that all tasks have been cancelled. The implementation (method
690 * cleanQueues) balances cases in which there may be many tasks to
691 * cancel (benefitting from parallelism) versus contention and
692 * interference when many threads try to poll remaining queues,
693 * while also avoiding unnecessary rechecks, by using
694 * pseudorandom scans and giving up upon interference. This may be
695 * retried by the same caller only when there are no more
696 * registered workers, using the same criteria as method
697 * quiescent. When CLEANED and all workers have deregistered,
698 * TERMINATED is set, also signalling any caller of
699 * awaitTermination or close. Because shutdownNow-based
700 * termination relies on interrupts, there is no guarantee that
701 * workers will stop if their tasks ignore interrupts. Class
702 * InterruptibleTask (see below) further arranges runState checks
703 * before executing task bodies, and ensures interrupts while
704 * terminating. Even so, there are no guarantees because tasks may
705 * internally enter unbounded loops.
706 *
707 * Trimming workers. To release resources after periods of lack of
708 * use, a worker starting to wait when the pool is quiescent will
709 * time out and terminate if the pool has remained quiescent for
710 * period given by field keepAlive (default 60sec), which applies
711 * to the first timeout of a quiescent pool. Subsequent cases use
712 * minimal delays such that, if still quiescent, all will be
713 * released soon thereafter. This is checked by setting the
714 * "source" field of signallee to an invalid value, that will
715 * remain invalid only if it did not process any tasks.
716 *
717 * Joining Tasks
718 * =============
719 *
720 * The "Join" part of ForkJoinPools consists of a set of
721 * mechanisms that sometimes or always (depending on the kind of
722 * task) avoid context switching or adding worker threads when one
723 * task would otherwise be blocked waiting for completion of
724 * another, basically, just by running that task or one of its
725 * subtasks if not already taken. These mechanics are disabled for
726 * InterruptibleTasks, that guarantee that callers do not execute
727 * submitted tasks.
728 *
729 * The basic structure of joining is an extended spin/block scheme
730 * in which workers check for task completions status between
731 * steps to find other work, until relevant pool state stabilizes
732 * enough to believe that no such tasks are available, at which
733 * point blocking. This is usually a good choice of when to block
734 * that would otherwise be harder to approximate.
735 *
736 * These forms of helping may increase stack space usage, but that
737 * space is bounded in tree/dag structured procedurally parallel
738 * designs to be no more than that if a task were executed only by
739 * the joining thread. This is arranged by associated task
740 * subclasses that also help detect and control the ways in which
741 * this may occur.
742 *
743 * Normally, the first option when joining a task that is not done
744 * is to try to take it from the local queue and run it. Method
745 * tryRemoveAndExec tries to do so. For tasks with any form of
746 * subtasks that must be completed first, we try to locate these
747 * subtasks and run them as well. This is easy when local, but
748 * when stolen, steal-backs are restricted to the same rules as
749 * stealing (polling), which requires additional bookkeeping and
750 * scanning. This cost is still very much worthwhile because of
751 * its impact on task scheduling and resource control.
752 *
753 * The two methods for finding and executing subtasks vary in
754 * details. The algorithm in helpJoin entails a form of "linear
755 * helping". Each worker records (in field "source") the index of
756 * the internal queue from which it last stole a task. (Note:
757 * because chains cannot include even-numbered external queues,
758 * they are ignored, and 0 is an OK default. However, the source
759 * field is set anyway, or eventually to DROPPED, to ensure
760 * volatile memory synchronization effects.) The scan in method
761 * helpJoin uses these markers to try to find a worker to help
762 * (i.e., steal back a task from and execute it) that could make
763 * progress toward completion of the actively joined task. Thus,
764 * the joiner executes a task that would be on its own local deque
765 * if the to-be-joined task had not been stolen. This is a
766 * conservative variant of the approach described in Wagner &
767 * Calder "Leapfrogging: a portable technique for implementing
768 * efficient futures" SIGPLAN Notices, 1993
769 * (http://portal.acm.org/citation.cfm?id=155354). It differs
770 * mainly in that we only record queues, not full dependency
771 * links. This requires a linear scan of the queues to locate
772 * stealers, but isolates cost to when it is needed, rather than
773 * adding to per-task overhead. For CountedCompleters, the
774 * analogous method helpComplete doesn't need stealer-tracking,
775 * but requires a similar (but simpler) check of completion
776 * chains.
777 *
778 * In either case, searches can fail to locate stealers when
779 * stalls delay recording sources or issuing subtasks. We avoid
780 * some of these cases by using snapshotted values of ctl as a
781 * check that the numbers of workers are not changing, along with
782 * rescans to deal with contention and stalls. But even when
783 * accurately identified, stealers might not ever produce a task
784 * that the joiner can in turn help with.
785 *
786 * Related method helpAsyncBlocker does not directly rely on
787 * subtask structure, but instead avoids or postpones blocking of
788 * tagged tasks (CompletableFuture.AsynchronousCompletionTask) by
789 * executing other asyncs that can be processed in any order.
790 * This is currently invoked only in non-join-based blocking
791 * contexts from classes CompletableFuture and
792 * SubmissionPublisher, that could be further generalized.
793 *
794 * When any of the above fail to avoid blocking, we rely on
795 * "compensation" -- an indirect form of context switching that
796 * either activates an existing worker to take the place of the
797 * blocked one, or expands the number of workers.
798 *
799 * Compensation does not by default aim to keep exactly the target
800 * parallelism number of unblocked threads running at any given
801 * time. Some previous versions of this class employed immediate
802 * compensations for any blocked join. However, in practice, the
803 * vast majority of blockages are transient byproducts of GC and
804 * other JVM or OS activities that are made worse by replacement
805 * by causing longer-term oversubscription. These are inevitable
806 * without (unobtainably) perfect information about whether worker
807 * creation is actually necessary. False alarms are common enough
808 * to negatively impact performance, so compensation is by default
809 * attempted only when it appears possible that the pool could
810 * stall due to lack of any unblocked workers. However, we allow
811 * users to override defaults using the long form of the
812 * ForkJoinPool constructor. The compensation mechanism may also
813 * be bounded. Bounds for the commonPool better enable JVMs to
814 * cope with programming errors and abuse before running out of
815 * resources to do so.
816 *
817 * The ManagedBlocker extension API can't use helping so relies
818 * only on compensation in method awaitBlocker. This API was
819 * designed to highlight the uncertainty of compensation decisions
820 * by requiring implementation of method isReleasable to abort
821 * compensation during attempts to obtain a stable snapshot. But
822 * users now rely upon the fact that if isReleasable always
823 * returns false, the API can be used to obtain precautionary
824 * compensation, which is sometimes the only reasonable option
825 * when running unknown code in tasks; which is now supported more
826 * simply (see method beginCompensatedBlock).
827 *
828 * Common Pool
829 * ===========
830 *
831 * The static common pool always exists after static
832 * initialization. Since it (or any other created pool) need
833 * never be used, we minimize initial construction overhead and
834 * footprint to the setup of about a dozen fields, although with
835 * some System property parsing properties are set. The common pool is
836 * distinguished by having a null workerNamePrefix (which is an
837 * odd convention, but avoids the need to decode status in factory
838 * classes). It also has PRESET_SIZE config set if parallelism
839 * was configured by system property.
840 *
841 * When external threads use the common pool, they can perform
842 * subtask processing (see helpComplete and related methods) upon
843 * joins, unless they are submitted using ExecutorService
844 * submission methods, which implicitly disallow this. This
845 * caller-helps policy makes it sensible to set common pool
846 * parallelism level to one (or more) less than the total number
847 * of available cores, or even zero for pure caller-runs. External
848 * threads waiting for joins first check the common pool for their
849 * task, which fails quickly if the caller did not fork to common
850 * pool.
851 *
852 * Guarantees for common pool parallelism zero are limited to
853 * tasks that are joined by their callers in a tree-structured
854 * fashion or use CountedCompleters (as is true for jdk
855 * parallelStreams). Support infiltrates several methods,
856 * including those that retry helping steps until we are sure that
857 * none apply if there are no workers. To deal with conflicting
858 * requirements, uses of the commonPool that require async because
859 * caller-runs need not apply, ensure threads are enabled (by
860 * setting parallelism) via method asyncCommonPool before
861 * proceeding. (In principle, overriding zero parallelism needs to
862 * ensure at least one worker, but due to other backward
863 * compatibility contraints, ensures two.)
864 *
865 * As a more appropriate default in managed environments, unless
866 * overridden by system properties, we use workers of subclass
867 * InnocuousForkJoinWorkerThread for the commonPool. These
868 * workers do not belong to any user-defined ThreadGroup, and
869 * clear all ThreadLocals and reset the ContextClassLoader before
870 * (re)activating to execute top-level tasks. The associated
871 * mechanics may be JVM-dependent and must access particular
872 * Thread class fields to achieve this effect.
873 *
874 * InterruptibleTasks
875 * ====================
876 *
877 * Regular ForkJoinTasks manage task cancellation (method cancel)
878 * independently from the interrupt status of threads running
879 * tasks. Interrupts are issued internally only while
880 * terminating, to wake up workers and cancel queued tasks. By
881 * default, interrupts are cleared only when necessary to ensure
882 * that calls to LockSupport.park do not loop indefinitely (park
883 * returns immediately if the current thread is interrupted).
884 *
885 * To comply with ExecutorService specs, we use subclasses of
886 * abstract class InterruptibleTask for tasks that require
887 * stronger interruption and cancellation guarantees. External
888 * submitters never run these tasks, even if in the common pool
889 * (as indicated by ForkJoinTask.noUserHelp status bit).
890 * InterruptibleTasks include a "runner" field (implemented
891 * similarly to FutureTask) to support cancel(true). Upon pool
892 * shutdown, runners are interrupted so they can cancel. Since
893 * external joining callers never run these tasks, they must await
894 * cancellation by others, which can occur along several different
895 * paths. The inability to rely on caller-runs may also require
896 * extra signalling (resulting in scanning and contention) so is
897 * done only conditionally in methods push and runworker.
898 *
899 * Across these APIs, rules for reporting exceptions for tasks
900 * with results accessed via join() differ from those via get(),
901 * which differ from those invoked using pool submit methods by
902 * non-workers (which comply with Future.get() specs). Internal
903 * usages of ForkJoinTasks ignore interrupt status when executing
904 * or awaiting completion. Otherwise, reporting task results or
905 * exceptions is preferred to throwing InterruptedExceptions,
906 * which are in turn preferred to timeouts. Similarly, completion
907 * status is preferred to reporting cancellation. Cancellation is
908 * reported as an unchecked exception by join(), and by worker
909 * calls to get(), but is otherwise wrapped in a (checked)
910 * ExecutionException.
911 *
912 * Worker Threads cannot be VirtualThreads, as enforced by
913 * requiring ForkJoinWorkerThreads in factories. There are
914 * several constructions relying on this. However as of this
915 * writing, virtual thread bodies are by default run as some form
916 * of InterruptibleTask.
917 *
918 * DelayScheduler
919 * ================
920 *
921 * This class supports ScheduledExecutorService methods by
922 * creating and starting a DelayScheduler on first use of these
923 * methods (via startDelayScheduler). The scheduler operates
924 * independently in its own thread, relaying tasks to the pool to
925 * execute when their delays elapse (see method
926 * executeEnabledScheduledTask). The only other interactions with
927 * the delayScheduler are to control shutdown and maintain
928 * shutdown-related policies in methods quiescent() and
929 * tryTerminate(). In particular, processing must deal with cases
930 * in which tasks are submitted before shutdown, but not enabled
931 * until afterwards, in which case they must bypass some screening
932 * to be allowed to run. Conversely, the DelayScheduler checks
933 * runState status and when enabled, completes termination, using
934 * only methods shutdownStatus and tryStopIfShutdown. All of these
935 * methods are final and have signatures referencing
936 * DelaySchedulers, so cannot conflict with those of any existing
937 * FJP subclasses.
938 *
939 * Memory placement
940 * ================
941 *
942 * Performance is very sensitive to placement of instances of
943 * ForkJoinPool and WorkQueues and their queue arrays, as well as
944 * the placement of their fields. Caches misses and contention due
945 * to false-sharing have been observed to slow down some programs
946 * by more than a factor of four. Effects may vary across initial
947 * memory configuarations, applications, and different garbage
948 * collectors and GC settings, so there is no perfect solution.
949 * Too much isolation may generate more cache misses in common
950 * cases (because some fields snd slots are usually read at the
951 * same time). The @Contended annotation provides only rough
952 * control (for good reason). Similarly for relying on fields
953 * being placed in size-sorted declaration order.
954 *
955 * We isolate the ForkJoinPool.ctl field that otherwise causes the
956 * most false-sharing misses with respect to other fields. Also,
957 * ForkJoinPool fields are ordered such that fields less prone to
958 * contention effects are first, offsetting those that otherwise
959 * would be, while also reducing total footprint vs using
960 * multiple @Contended regions, which tends to slow down
961 * less-contended applications. To help arrange this, some
962 * non-reference fields are declared as "long" even when ints or
963 * shorts would suffice. For class WorkQueue, an
964 * embedded @Contended region segregates fields most heavily
965 * updated by owners from those most commonly read by stealers or
966 * other management.
967 *
968 * Initial sizing and resizing of WorkQueue arrays is an even more
969 * delicate tradeoff because the best strategy systematically
970 * varies across garbage collectors. Small arrays are better for
971 * locality and reduce GC scan time, but large arrays reduce both
972 * direct false-sharing and indirect cases due to GC bookkeeping
973 * (cardmarks etc), and reduce the number of resizes, which are
974 * not especially fast because they require atomic transfers.
975 * Currently, arrays for workers are initialized to be just large
976 * enough to avoid resizing in most tree-structured tasks, but
977 * larger for external queues where both false-sharing problems
978 * and the need for resizing are more common. (Maintenance note:
979 * any changes in fields, queues, or their uses, or JVM layout
980 * policies, must be accompanied by re-evaluation of these
981 * placement and sizing decisions.)
982 *
983 * Style notes
984 * ===========
985 *
986 * Memory ordering relies mainly on atomic operations (CAS,
987 * getAndSet, getAndAdd) along with moded accesses. These use
988 * jdk-internal Unsafe for atomics and special memory modes,
989 * rather than VarHandles, to avoid initialization dependencies in
990 * other jdk components that require early parallelism. This can
991 * be awkward and ugly, but also reflects the need to control
992 * outcomes across the unusual cases that arise in very racy code
993 * with very few invariants. All atomic task slot updates use
994 * Unsafe operations requiring offset positions, not indices, as
995 * computed by method slotOffset. All fields are read into locals
996 * before use, and null-checked if they are references, even if
997 * they can never be null under current usages. Usually,
998 * computations (held in local variables) are defined as soon as
999 * logically enabled, sometimes to convince compilers that they
1000 * may be performed despite memory ordering constraints. Array
1001 * accesses using masked indices include checks (that are always
1002 * true) that the array length is non-zero to avoid compilers
1003 * inserting more expensive traps. This is usually done in a
1004 * "C"-like style of listing declarations at the heads of methods
1005 * or blocks, and using inline assignments on first encounter.
1006 * Nearly all explicit checks lead to bypass/return, not exception
1007 * throws, because they may legitimately arise during shutdown. A
1008 * few unusual loop constructions encourage (with varying
1009 * effectiveness) JVMs about where (not) to place safepoints. All
1010 * public methods screen arguments (mainly null checks) before
1011 * creating or executing tasks.
1012 *
1013 * There is a lot of representation-level coupling among classes
1014 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
1015 * fields of WorkQueue maintain data structures managed by
1016 * ForkJoinPool, so are directly accessed. There is little point
1017 * trying to reduce this, since any associated future changes in
1018 * representations will need to be accompanied by algorithmic
1019 * changes anyway. Several methods intrinsically sprawl because
1020 * they must accumulate sets of consistent reads of fields held in
1021 * local variables. Some others are artificially broken up to
1022 * reduce producer/consumer imbalances due to dynamic compilation.
1023 * There are also other coding oddities (including several
1024 * unnecessary-looking hoisted null checks) that help some methods
1025 * perform reasonably even when interpreted (not compiled).
1026 *
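* As a small illustration of these conventions (mirroring patterns
* used throughout this file), a guarded array read is typically
* written as:
*
*   ForkJoinTask<?>[] a; int cap;                     // declared at head
*   if ((a = array) != null && (cap = a.length) > 0)  // inline assigns
*       t = a[(cap - 1) & i];                         // masked index
*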
1027 * The order of declarations in this file is (with a few exceptions):
1028 * (1) Static configuration constants
1029 * (2) Static utility functions
1030 * (3) Nested (static) classes
1031 * (4) Fields, along with constants used when unpacking some of them
1032 * (5) Internal control methods
1033 * (6) Callbacks and other support for ForkJoinTask methods
1034 * (7) Exported methods
1035 * (8) Static block initializing statics in minimally dependent order
1036 *
1037 */
1038
1039 // static configuration constants
1040
1041 /**
* Default timeout value (in milliseconds) for idle worker threads
* to park waiting for new work before terminating.
1044 */
1045 static final long DEFAULT_KEEPALIVE = 60_000L;
1046
1047 /**
1048 * Undershoot tolerance for idle timeouts, also serving as the
1049 * minimum allowed timeout value.
1050 */
1051 static final long TIMEOUT_SLOP = 20L;
1052
1053 /**
1054 * The default value for common pool maxSpares. Overridable using
1055 * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1056 * system property. The default value is far in excess of normal
1057 * requirements, but also far short of maximum capacity and typical OS
* thread limits, which allows JVMs to catch misuse/abuse before
1059 * running out of resources needed to do so.
1060 */
1061 static final int DEFAULT_COMMON_MAX_SPARES = 256;
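// For example (illustrative value only):
//   -Djava.util.concurrent.ForkJoinPool.common.maximumSpares=64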
1062
1063 /**
1064 * Initial capacity of work-stealing queue array for workers.
1065 * Must be a power of two, at least 2. See above.
1066 */
1067 static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
1068
1069 /**
1070 * Initial capacity of work-stealing queue array for external queues.
1071 * Must be a power of two, at least 2. See above.
1072 */
1073 static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9;
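// (i.e., 64 slots for worker queues, 512 for external queues)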
1074
1075 // conversions among short, int, long
1076 static final int SMASK = 0xffff; // (unsigned) short bits
1077 static final long LMASK = 0xffffffffL; // lower 32 bits of long
1078 static final long UMASK = ~LMASK; // upper 32 bits
1079
1080 // masks and sentinels for queue indices
1081 static final int MAX_CAP = 0x7fff; // max # workers
1082 static final int EXTERNAL_ID_MASK = 0x3ffe; // max external queue id
1083 static final int INVALID_ID = 0x4000; // unused external queue id
1084
1085 // pool.runState bits
1086 static final long STOP = 1L << 0; // terminating
1087 static final long SHUTDOWN = 1L << 1; // terminate when quiescent
1088 static final long CLEANED = 1L << 2; // stopped and queues cleared
1089 static final long TERMINATED = 1L << 3; // only set if STOP also set
1090 static final long RS_LOCK = 1L << 4; // lowest seqlock bit
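// Note: the bits above RS_LOCK form a version count. lockRunState
// acquires by adding RS_LOCK (setting the bit); unlockRunState adds
// RS_LOCK again, carrying into the higher bits, so each lock/unlock
// pair also advances the version.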
1091
1092 // spin/sleep limits for runState locking and elsewhere
1093 static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait
1094 static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos
1095 static final int MAX_SLEEP = 1 << 20; // approx 1 sec as nanos
1096
1097 // {pool, workQueue} config bits
1098 static final int FIFO = 1 << 0; // fifo queue or access mode
1099 static final int CLEAR_TLS = 1 << 1; // set for Innocuous workers
1100 static final int PRESET_SIZE = 1 << 2; // size was set by property
1101
1102 // others
1103 static final int DROPPED = 1 << 16; // removed from ctl counts
1104 static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
1105 static final int IDLE = 1 << 16; // phase seqlock/version count
1106 static final int MIN_QUEUES_SIZE = 1 << 4; // ensure external slots
1107
1108 /*
* Bits and masks for ctl and bounds are packed as four 16-bit subfields:
1110 * RC: Number of released (unqueued) workers
1111 * TC: Number of total workers
1112 * SS: version count and status of top waiting thread
1113 * ID: poolIndex of top of Treiber stack of waiters
1114 *
1115 * When convenient, we can extract the lower 32 stack top bits
1116 * (including version bits) as sp=(int)ctl. When sp is non-zero,
1117 * there are waiting workers. Count fields may be transiently
1118 * negative during termination because of out-of-order updates.
1119 * To deal with this, we use casts in and out of "short" and/or
1120 * signed shifts to maintain signedness. Because it occupies
1121 * uppermost bits, we can add one release count using getAndAdd of
1122 * RC_UNIT, rather than CAS, when returning from a blocked join.
1123 * Other updates of multiple subfields require CAS.
1124 */
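/*
* For illustration only (these locals are not part of the
* implementation), a ctl value c unpacks as:
*
*   int rc = (short)(c >>> RC_SHIFT); // released worker count
*   int tc = (short)(c >>> TC_SHIFT); // total worker count
*   int sp = (int)c;                  // SS|ID: top of waiter stack
*
* using signed casts and shifts so that transiently negative counts
* remain negative after extraction.
*/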
1125
1126 // Release counts
1127 static final int RC_SHIFT = 48;
1128 static final long RC_UNIT = 0x0001L << RC_SHIFT;
1129 static final long RC_MASK = 0xffffL << RC_SHIFT;
1130 // Total counts
1131 static final int TC_SHIFT = 32;
1132 static final long TC_UNIT = 0x0001L << TC_SHIFT;
1133 static final long TC_MASK = 0xffffL << TC_SHIFT;
1134
1135 /*
1136 * All atomic operations on task arrays (queues) use Unsafe
1137 * operations that take array offsets versus indices, based on
1138 * array base and shift constants established during static
1139 * initialization.
1140 */
1141 static final long ABASE;
1142 static final int ASHIFT;
1143
1144 // Static utilities
1145
1146 /**
1147 * Returns the array offset corresponding to the given index for
1148 * Unsafe task queue operations
1149 */
1150 static long slotOffset(int index) {
1151 return ((long)index << ASHIFT) + ABASE;
1152 }
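// Illustration (assumed typical values, not used by the code): on a
// 64-bit JVM with compressed references, ABASE == 16 and ASHIFT == 2,
// so slotOffset(3) == 16 + (3 << 2) == 28. The actual values are
// computed in the static initializer from Unsafe array base and
// index-scale queries.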
1153
1154 // Nested classes
1155
1156 /**
1157 * Factory for creating new {@link ForkJoinWorkerThread}s.
1158 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
1159 * for {@code ForkJoinWorkerThread} subclasses that extend base
1160 * functionality or initialize threads with different contexts.
1161 */
1162 public static interface ForkJoinWorkerThreadFactory {
1163 /**
1164 * Returns a new worker thread operating in the given pool.
1165 * Returning null or throwing an exception may result in tasks
* never being executed. If this method throws an exception,
* it is relayed to the caller of the method (for example
* {@code execute}) that caused the attempted thread creation. If this
1169 * method returns null or throws an exception, it is not
1170 * retried until the next attempted creation (for example
1171 * another call to {@code execute}).
1172 *
1173 * @param pool the pool this thread works in
1174 * @return the new worker thread, or {@code null} if the request
1175 * to create a thread is rejected
1176 * @throws NullPointerException if the pool is null
1177 */
1178 public ForkJoinWorkerThread newThread(ForkJoinPool pool);
1179 }
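/*
* A minimal illustrative factory (not part of this file); since the
* interface has a single abstract method, a lambda suffices:
*
*   ForkJoinPool.ForkJoinWorkerThreadFactory namedFactory = pool -> {
*       ForkJoinWorkerThread t =
*           ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
*       t.setName("app-worker-" + t.getPoolIndex()); // hypothetical naming
*       return t;
*   };
*/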
1180
1181 /**
1182 * Default ForkJoinWorkerThreadFactory implementation; creates a
1183 * new ForkJoinWorkerThread using the system class loader as the
1184 * thread context class loader.
1185 */
1186 static final class DefaultForkJoinWorkerThreadFactory
1187 implements ForkJoinWorkerThreadFactory {
1188 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
1189 return ((pool.workerNamePrefix == null) ? // is commonPool
1190 new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool) :
1191 new ForkJoinWorkerThread(null, pool, true, false));
1192 }
1193 }
1194
1195 /**
1196 * Queues supporting work-stealing as well as external task
1197 * submission. See above for descriptions and algorithms.
1198 */
1199 static final class WorkQueue {
1200 // fields declared in order of their likely layout on most VMs
1201 final ForkJoinWorkerThread owner; // null if shared
1202 ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
1203 int base; // index of next slot for poll
1204 final int config; // mode bits
1205
1206 // fields otherwise causing more unnecessary false-sharing cache misses
1207 @jdk.internal.vm.annotation.Contended("w")
1208 int top; // index of next slot for push
1209 @jdk.internal.vm.annotation.Contended("w")
1210 volatile int phase; // versioned active status
1211 @jdk.internal.vm.annotation.Contended("w")
1212 int stackPred; // pool stack (ctl) predecessor link
1213 @jdk.internal.vm.annotation.Contended("w")
1214 volatile int source; // source queue id (or DROPPED)
1215 @jdk.internal.vm.annotation.Contended("w")
1216 int nsteals; // number of steals from other queues
1217 @jdk.internal.vm.annotation.Contended("w")
1218 volatile int parking; // nonzero if parked in awaitWork
1219
1220 // Support for atomic operations
1221 private static final Unsafe U;
1222 private static final long PHASE;
1223 private static final long BASE;
1224 private static final long TOP;
1225 private static final long ARRAY;
1226
1227 final void updateBase(int v) {
1228 U.putIntVolatile(this, BASE, v);
1229 }
1230 final void updateTop(int v) {
1231 U.putIntOpaque(this, TOP, v);
1232 }
1233 final void updateArray(ForkJoinTask<?>[] a) {
1234 U.getAndSetReference(this, ARRAY, a);
1235 }
1236 final void unlockPhase() {
1237 U.getAndAddInt(this, PHASE, IDLE);
1238 }
1239 final boolean tryLockPhase() { // seqlock acquire
1240 int p;
1241 return (((p = phase) & IDLE) != 0 &&
1242 U.compareAndSetInt(this, PHASE, p, p + IDLE));
1243 }
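// Note: phase doubles as a seqlock for shared queues. The IDLE bit
// acts as the lock bit (set means unlocked); the tryLockPhase CAS
// adds IDLE, carrying into the version bits above it, and
// unlockPhase restores the IDLE bit.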
1244
1245 /**
1246 * Constructor. For internal queues, most fields are initialized
1247 * upon thread start in pool.registerWorker.
1248 */
1249 WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1250 boolean clearThreadLocals) {
1251 array = new ForkJoinTask<?>[owner == null ?
1252 INITIAL_EXTERNAL_QUEUE_CAPACITY :
1253 INITIAL_QUEUE_CAPACITY];
1254 this.owner = owner;
1255 this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
1256 }
1257
1258 /**
1259 * Returns an exportable index (used by ForkJoinWorkerThread).
1260 */
1261 final int getPoolIndex() {
1262 return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1263 }
1264
1265 /**
1266 * Returns the approximate number of tasks in the queue.
1267 */
1268 final int queueSize() {
1269 int unused = phase; // for ordering effect
1270 return Math.max(top - base, 0); // ignore transient negative
1271 }
1272
1273 /**
* Pushes a task. Called only by owner or when the queue is already locked.
1275 *
1276 * @param task the task; no-op if null
* @param pool the pool to signal if the queue was previously empty, else null
1278 * @param internal if caller owns this queue
1279 * @throws RejectedExecutionException if array could not be resized
1280 */
1281 final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1282 int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
1283 if ((a = array) != null && (cap = a.length) > 0 && // else disabled
1284 task != null) {
1285 int pk = task.noUserHelp() + 1; // prev slot offset
1286 if ((room = (m = cap - 1) - (s - b)) >= 0) {
1287 top = s + 1;
1288 long pos = slotOffset(m & s);
1289 if (!internal)
1290 U.putReference(a, pos, task); // inside lock
1291 else
1292 U.getAndSetReference(a, pos, task); // fully fenced
1293 if (room == 0) // resize
1294 growArray(a, cap, s);
1295 }
1296 if (!internal)
1297 unlockPhase();
1298 if (room < 0)
1299 throw new RejectedExecutionException("Queue capacity exceeded");
1300 if ((room == 0 || a[m & (s - pk)] == null) &&
1301 pool != null)
1302 pool.signalWork(); // may have appeared empty
1303 }
1304 }
1305
1306 /**
1307 * Resizes the queue array unless out of memory.
1308 * @param a old array
1309 * @param cap old array capacity
1310 * @param s current top
1311 */
1312 private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
1313 int newCap = cap << 1;
1314 if (a != null && a.length == cap && cap > 0 && newCap > 0) {
1315 ForkJoinTask<?>[] newArray = null;
1316 try {
1317 newArray = new ForkJoinTask<?>[newCap];
1318 } catch (OutOfMemoryError ex) {
1319 }
1320 if (newArray != null) { // else throw on next push
1321 int mask = cap - 1, newMask = newCap - 1;
1322 for (int k = s, j = cap; j > 0; --j, --k) {
1323 ForkJoinTask<?> u; // poll old, push to new
1324 if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1325 a, slotOffset(k & mask), null)) == null)
1326 break; // lost to pollers
1327 newArray[k & newMask] = u;
1328 }
1329 updateArray(newArray); // fully fenced
1330 }
1331 }
1332 }
1333
1334 /**
1335 * Takes next task, if one exists, in order specified by mode,
1336 * so acts as either local-pop or local-poll. Called only by owner.
1337 * @param fifo nonzero if FIFO mode
1338 */
1339 private ForkJoinTask<?> nextLocalTask(int fifo) {
1340 ForkJoinTask<?> t = null;
1341 ForkJoinTask<?>[] a = array;
1342 int b = base, p = top, cap;
1343 if (p - b > 0 && a != null && (cap = a.length) > 0) {
1344 for (int m = cap - 1, s, nb;;) {
1345 if (fifo == 0 || (nb = b + 1) == p) {
1346 if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1347 a, slotOffset(m & (s = p - 1)), null)) != null)
1348 updateTop(s); // else lost race for only task
1349 break;
1350 }
1351 if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1352 a, slotOffset(m & b), null)) != null) {
1353 updateBase(nb);
1354 break;
1355 }
1356 while (b == (b = U.getIntAcquire(this, BASE)))
1357 Thread.onSpinWait(); // spin to reduce memory traffic
1358 if (p - b <= 0)
1359 break;
1360 }
1361 }
1362 return t;
1363 }
1364
1365 /**
1366 * Takes next task, if one exists, using configured mode.
* (Always internal, never called for the common pool.)
1368 */
1369 final ForkJoinTask<?> nextLocalTask() {
1370 return nextLocalTask(config & FIFO);
1371 }
1372
1373 /**
1374 * Pops the given task only if it is at the current top.
1375 * @param task the task. Caller must ensure non-null.
1376 * @param internal if caller owns this queue
1377 */
1378 final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) {
1379 boolean taken = false;
1380 ForkJoinTask<?>[] a = array;
1381 int p = top, s = p - 1, cap; long k;
1382 if (a != null && (cap = a.length) > 0 &&
1383 U.getReference(a, k = slotOffset((cap - 1) & s)) == task &&
1384 (internal || tryLockPhase())) {
1385 if (top == p && U.compareAndSetReference(a, k, task, null)) {
1386 taken = true;
1387 updateTop(s);
1388 }
1389 if (!internal)
1390 unlockPhase();
1391 }
1392 return taken;
1393 }
1394
1395 /**
1396 * Returns next task, if one exists, in order specified by mode.
1397 */
1398 final ForkJoinTask<?> peek() {
1399 ForkJoinTask<?>[] a = array;
1400 int b = base, cfg = config, p = top, cap;
1401 if (p != b && a != null && (cap = a.length) > 0) {
1402 if ((cfg & FIFO) == 0)
1403 return a[(cap - 1) & (p - 1)];
1404 else { // skip over in-progress removals
1405 ForkJoinTask<?> t;
1406 for ( ; p - b > 0; ++b) {
1407 if ((t = a[(cap - 1) & b]) != null)
1408 return t;
1409 }
1410 }
1411 }
1412 return null;
1413 }
1414
1415 /**
1416 * Polls for a task. Used only by non-owners.
1417 */
1418 final ForkJoinTask<?> poll() {
1419 for (int pb = -1, b; ; pb = b) { // track progress
1420 ForkJoinTask<?> t; int cap, nb; long k; ForkJoinTask<?>[] a;
1421 if ((a = array) == null || (cap = a.length) <= 0)
1422 break;
1423 t = (ForkJoinTask<?>)U.getReferenceAcquire(
1424 a, k = slotOffset((cap - 1) & (b = base)));
1425 Object u = U.getReference( // next slot
1426 a, slotOffset((cap - 1) & (nb = b + 1)));
1427 if (base != b) // inconsistent
1428 ;
1429 else if (t == null) {
1430 if (u == null && top - b <= 0)
1431 break; // empty
1432 if (pb == b)
1433 Thread.onSpinWait(); // stalled
1434 }
1435 else if (U.compareAndSetReference(a, k, t, null)) {
1436 updateBase(nb);
1437 return t;
1438 }
1439 }
1440 return null;
1441 }
1442
1443 // specialized execution methods
1444
1445 /**
1446 * Runs the given task, as well as remaining local tasks.
1447 */
1448 final void topLevelExec(ForkJoinTask<?> task, int fifo) {
1449 while (task != null) {
1450 task.doExec();
1451 task = nextLocalTask(fifo);
1452 }
1453 }
1454
1455 /**
* Deep form of tryUnpush: traverses from top, and removes and
* runs the task if present.
1458 */
1459 final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
1460 ForkJoinTask<?>[] a = array;
1461 int b = base, p = top, s = p - 1, d = p - b, cap;
1462 if (a != null && (cap = a.length) > 0) {
1463 for (int m = cap - 1, i = s; d > 0; --i, --d) {
1464 long k; boolean taken;
1465 ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference(
1466 a, k = slotOffset(i & m));
1467 if (t == null)
1468 break;
1469 if (t == task) {
1470 if (!internal && !tryLockPhase())
1471 break; // fail if locked
1472 if (taken =
1473 (top == p &&
1474 U.compareAndSetReference(a, k, task, null))) {
1475 if (i == s) // act as pop
1476 updateTop(s);
1477 else if (i == base) // act as poll
1478 updateBase(i + 1);
1479 else { // swap with top
1480 U.putReferenceVolatile(
1481 a, k, (ForkJoinTask<?>)
1482 U.getAndSetReference(
1483 a, slotOffset(s & m), null));
1484 updateTop(s);
1485 }
1486 }
1487 if (!internal)
1488 unlockPhase();
1489 if (taken)
1490 task.doExec();
1491 break;
1492 }
1493 }
1494 }
1495 }
1496
1497 /**
1498 * Tries to pop and run tasks within the target's computation
1499 * until done, not found, or limit exceeded.
1500 *
* @param task root of computation
* @param internal if caller owns this queue
* @param limit max runs, or zero for no limit
1503 * @return task status if known to be done
1504 */
1505 final int helpComplete(ForkJoinTask<?> task, boolean internal, int limit) {
1506 int status = 0;
1507 if (task != null) {
1508 outer: for (;;) {
1509 ForkJoinTask<?>[] a; boolean taken; Object o;
1510 int stat, p, s, cap;
1511 if ((stat = task.status) < 0) {
1512 status = stat;
1513 break;
1514 }
1515 if ((a = array) == null || (cap = a.length) <= 0)
1516 break;
1517 long k = slotOffset((cap - 1) & (s = (p = top) - 1));
1518 if (!((o = U.getReference(a, k)) instanceof CountedCompleter))
1519 break;
1520 CountedCompleter<?> t = (CountedCompleter<?>)o, f = t;
1521 for (int steps = cap;;) { // bound path
1522 if (f == task)
1523 break;
1524 if ((f = f.completer) == null || --steps == 0)
1525 break outer;
1526 }
1527 if (!internal && !tryLockPhase())
1528 break;
1529 if (taken =
1530 (top == p &&
1531 U.compareAndSetReference(a, k, t, null)))
1532 updateTop(s);
1533 if (!internal)
1534 unlockPhase();
1535 if (!taken)
1536 break;
1537 t.doExec();
1538 if (limit != 0 && --limit == 0)
1539 break;
1540 }
1541 }
1542 return status;
1543 }
1544
1545 /**
1546 * Tries to poll and run AsynchronousCompletionTasks until
* none are found or blocker is released.
1548 *
1549 * @param blocker the blocker
1550 */
1551 final void helpAsyncBlocker(ManagedBlocker blocker) {
1552 for (;;) {
1553 ForkJoinTask<?> t; ForkJoinTask<?>[] a; int b, cap; long k;
1554 if ((a = array) == null || (cap = a.length) <= 0)
1555 break;
1556 t = (ForkJoinTask<?>)U.getReferenceAcquire(
1557 a, k = slotOffset((cap - 1) & (b = base)));
1558 if (t == null) {
1559 if (top - b <= 0)
1560 break;
1561 }
1562 else if (!(t instanceof CompletableFuture
1563 .AsynchronousCompletionTask))
1564 break;
1565 if (blocker != null && blocker.isReleasable())
1566 break;
1567 if (base == b && t != null &&
1568 U.compareAndSetReference(a, k, t, null)) {
1569 updateBase(b + 1);
1570 t.doExec();
1571 }
1572 }
1573 }
1574
1575 // misc
1576
1577 /**
1578 * Cancels all local tasks. Called only by owner.
1579 */
1580 final void cancelTasks() {
1581 for (ForkJoinTask<?> t; (t = nextLocalTask(0)) != null; ) {
1582 try {
1583 t.cancel(false);
1584 } catch (Throwable ignore) {
1585 }
1586 }
1587 }
1588
1589 /**
1590 * Returns true if internal and not known to be blocked.
1591 */
1592 final boolean isApparentlyUnblocked() {
1593 Thread wt; Thread.State s;
1594 return ((wt = owner) != null && (phase & IDLE) != 0 &&
1595 (s = wt.getState()) != Thread.State.BLOCKED &&
1596 s != Thread.State.WAITING &&
1597 s != Thread.State.TIMED_WAITING);
1598 }
1599
1600 static {
1601 U = Unsafe.getUnsafe();
1602 Class<WorkQueue> klass = WorkQueue.class;
1603 PHASE = U.objectFieldOffset(klass, "phase");
1604 BASE = U.objectFieldOffset(klass, "base");
1605 TOP = U.objectFieldOffset(klass, "top");
1606 ARRAY = U.objectFieldOffset(klass, "array");
1607 }
1608 }
1609
1610 // static fields (initialized in static initializer below)
1611
1612 /**
1613 * Creates a new ForkJoinWorkerThread. This factory is used unless
1614 * overridden in ForkJoinPool constructors.
1615 */
1616 public static final ForkJoinWorkerThreadFactory
1617 defaultForkJoinWorkerThreadFactory;
1618
1619 /**
* Common (static) pool. Non-null for public use unless a static
* construction exception occurs, but internal usages null-check on
* use, both to paranoically avoid potential initialization
* circularities and to simplify generated code.
1624 */
1625 static final ForkJoinPool common;
1626
1627 /**
1628 * Sequence number for creating worker names
1629 */
1630 private static volatile int poolIds;
1631
1632 /**
1633 * For VirtualThread intrinsics
1634 */
1635 private static final JavaLangAccess JLA;
1636
1637 // fields declared in order of their likely layout on most VMs
1638 volatile CountDownLatch termination; // lazily constructed
1639 final Predicate<? super ForkJoinPool> saturate;
1640 final ForkJoinWorkerThreadFactory factory;
1641 final UncaughtExceptionHandler ueh; // per-worker UEH
1642 final SharedThreadContainer container;
1643 final String workerNamePrefix; // null for common pool
1644 final String poolName;
1645 volatile DelayScheduler delayScheduler; // lazily constructed
1646 WorkQueue[] queues; // main registry
1647 volatile long runState; // versioned, lockable
1648 final long keepAlive; // milliseconds before dropping if idle
1649 final long config; // static configuration bits
1650 volatile long stealCount; // collects worker nsteals
1651 volatile long threadIds; // for worker thread names
1652
1653 @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
1654 volatile long ctl; // main pool control
1655 @jdk.internal.vm.annotation.Contended("fjpctl") // colocate
1656 int parallelism; // target number of workers
1657
1658 // Support for atomic operations
1659 private static final Unsafe U;
1660 private static final long CTL;
1661 private static final long RUNSTATE;
1662 private static final long PARALLELISM;
1663 private static final long THREADIDS;
1664 private static final long TERMINATION;
1665 private static final Object POOLIDS_BASE;
1666 private static final long POOLIDS;
1667
1668 private boolean compareAndSetCtl(long c, long v) {
1669 return U.compareAndSetLong(this, CTL, c, v);
1670 }
1671 private long compareAndExchangeCtl(long c, long v) {
1672 return U.compareAndExchangeLong(this, CTL, c, v);
1673 }
1674 private long getAndAddCtl(long v) {
1675 return U.getAndAddLong(this, CTL, v);
1676 }
1677 private long incrementThreadIds() {
1678 return U.getAndAddLong(this, THREADIDS, 1L);
1679 }
1680 private static int getAndAddPoolIds(int x) {
1681 return U.getAndAddInt(POOLIDS_BASE, POOLIDS, x);
1682 }
1683 private int getAndSetParallelism(int v) {
1684 return U.getAndSetInt(this, PARALLELISM, v);
1685 }
1686 private int getParallelismOpaque() {
1687 return U.getIntOpaque(this, PARALLELISM);
1688 }
1689 private CountDownLatch cmpExTerminationSignal(CountDownLatch x) {
1690 return (CountDownLatch)
1691 U.compareAndExchangeReference(this, TERMINATION, null, x);
1692 }
1693
1694 // runState operations
1695
1696 private long getAndBitwiseOrRunState(long v) { // for status bits
1697 return U.getAndBitwiseOrLong(this, RUNSTATE, v);
1698 }
1699 private boolean casRunState(long c, long v) {
1700 return U.compareAndSetLong(this, RUNSTATE, c, v);
1701 }
1702 private void unlockRunState() { // increment lock bit
1703 U.getAndAddLong(this, RUNSTATE, RS_LOCK);
1704 }
1705 private long lockRunState() { // lock and return current state
1706 long s, u; // locked when RS_LOCK set
1707 if (((s = runState) & RS_LOCK) == 0L && casRunState(s, u = s + RS_LOCK))
1708 return u;
1709 else
1710 return spinLockRunState();
1711 }
1712 private long spinLockRunState() { // spin/sleep
1713 for (int waits = 0;;) {
1714 long s, u;
1715 if (((s = runState) & RS_LOCK) == 0L) {
1716 if (casRunState(s, u = s + RS_LOCK))
1717 return u;
1718 waits = 0;
1719 } else if (waits < SPIN_WAITS) {
1720 ++waits;
1721 Thread.onSpinWait();
1722 } else {
1723 if (waits < MIN_SLEEP)
1724 waits = MIN_SLEEP;
1725 LockSupport.parkNanos(this, (long)waits);
1726 if (waits < MAX_SLEEP)
1727 waits <<= 1;
1728 }
1729 }
1730 }
1731
1732 static boolean poolIsStopping(ForkJoinPool p) { // Used by ForkJoinTask
1733 return p != null && (p.runState & STOP) != 0L;
1734 }
1735
1736 // Creating, registering, and deregistering workers
1737
1738 /**
1739 * Tries to construct and start one worker. Assumes that total
1740 * count has already been incremented as a reservation. Invokes
1741 * deregisterWorker on any failure.
1742 *
1743 * @return true if successful
1744 */
1745 private boolean createWorker() {
1746 ForkJoinWorkerThreadFactory fac = factory;
1747 SharedThreadContainer ctr = container;
1748 Throwable ex = null;
1749 ForkJoinWorkerThread wt = null;
1750 try {
1751 if ((runState & STOP) == 0L && // avoid construction if terminating
1752 fac != null && (wt = fac.newThread(this)) != null) {
1753 if (ctr != null)
1754 ctr.start(wt);
1755 else
1756 wt.start();
1757 return true;
1758 }
1759 } catch (Throwable rex) {
1760 ex = rex;
1761 }
1762 deregisterWorker(wt, ex);
1763 return false;
1764 }
1765
1766 /**
1767 * Provides a name for ForkJoinWorkerThread constructor.
1768 */
1769 final String nextWorkerThreadName() {
1770 String prefix = workerNamePrefix;
1771 long tid = incrementThreadIds() + 1L;
1772 if (prefix == null) // commonPool has no prefix
1773 prefix = "ForkJoinPool.commonPool-worker-";
1774 return prefix.concat(Long.toString(tid));
1775 }
1776
1777 /**
1778 * Finishes initializing and records internal queue.
1779 *
1780 * @param w caller's WorkQueue
1781 */
1782 final void registerWorker(WorkQueue w) {
1783 if (w != null && (runState & STOP) == 0L) {
1784 ThreadLocalRandom.localInit();
1785 int seed = w.stackPred = ThreadLocalRandom.getProbe();
1786 int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1787 int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1788 long stop = lockRunState() & STOP;
1789 try {
1790 WorkQueue[] qs; int n;
1791 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1792 for (int k = n, m = n - 1; ; id += 2) {
1793 if (qs[id &= m] == null)
1794 break;
1795 if ((k -= 2) <= 0) {
1796 id |= n;
1797 break;
1798 }
1799 }
1800 w.phase = id | phaseSeq; // now publishable
1801 if (id < n)
1802 qs[id] = w;
1803 else { // expand
1804 int an = n << 1, am = an - 1;
1805 WorkQueue[] as = new WorkQueue[an];
1806 as[id & am] = w;
1807 for (int j = 1; j < n; j += 2)
1808 as[j] = qs[j];
1809 for (int j = 0; j < n; j += 2) {
1810 WorkQueue q; // shared queues may move
1811 if ((q = qs[j]) != null)
1812 as[q.phase & EXTERNAL_ID_MASK & am] = q;
1813 }
1814 U.storeFence(); // fill before publish
1815 queues = as;
1816 }
1817 }
1818 } finally {
1819 unlockRunState();
1820 }
1821 }
1822 }
1823
1824 /**
1825 * Final callback from terminating worker, as well as upon failure
1826 * to construct or start a worker. Removes record of worker from
1827 * array, and adjusts counts. If pool is shutting down, tries to
1828 * complete termination.
1829 *
1830 * @param wt the worker thread, or null if construction failed
1831 * @param ex the exception causing failure, or null if none
1832 */
1833 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1834 WorkQueue w = null; // null if not created
1835 int phase = 0; // 0 if not registered
1836 if (wt != null && (w = wt.workQueue) != null &&
1837 (phase = w.phase) != 0 && (phase & IDLE) != 0)
1838 releaseWaiters(); // ensure released
1839 if (w == null || w.source != DROPPED) {
1840 long c = ctl; // decrement counts
1841 do {} while (c != (c = compareAndExchangeCtl(
1842 c, ((RC_MASK & (c - RC_UNIT)) |
1843 (TC_MASK & (c - TC_UNIT)) |
1844 (LMASK & c)))));
1845 }
1846 if (phase != 0 && w != null) { // remove index unless terminating
1847 long ns = w.nsteals & 0xffffffffL;
1848 if ((runState & STOP) == 0L) {
1849 WorkQueue[] qs; int n, i;
1850 if ((lockRunState() & STOP) == 0L &&
1851 (qs = queues) != null && (n = qs.length) > 0 &&
1852 qs[i = phase & SMASK & (n - 1)] == w) {
1853 qs[i] = null;
1854 stealCount += ns; // accumulate steals
1855 }
1856 unlockRunState();
1857 }
1858 }
1859 if ((tryTerminate(false, false) & STOP) == 0L &&
1860 phase != 0 && w != null && w.source != DROPPED) {
1861 signalWork(); // possibly replace
1862 w.cancelTasks(); // clean queue
1863 }
1864 if (ex != null)
1865 ForkJoinTask.rethrow(ex);
1866 }
1867
1868 /**
1869 * Releases an idle worker, or creates one if not enough exist.
1870 */
1871 final void signalWork() {
1872 int pc = parallelism;
1873 for (long c = ctl;;) {
1874 WorkQueue[] qs = queues;
1875 long ac = (c + RC_UNIT) & RC_MASK, nc;
1876 int sp = (int)c, i = sp & SMASK;
1877 if ((short)(c >>> RC_SHIFT) >= pc)
1878 break;
1879 if (qs == null)
1880 break;
1881 if (qs.length <= i)
1882 break;
1883 WorkQueue w = qs[i], v = null;
1884 if (sp == 0) {
1885 if ((short)(c >>> TC_SHIFT) >= pc)
1886 break;
1887 nc = ((c + TC_UNIT) & TC_MASK);
1888 }
1889 else if ((v = w) == null)
1890 break;
1891 else
1892 nc = (v.stackPred & LMASK) | (c & TC_MASK);
1893 if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
1894 if (v == null)
1895 createWorker();
1896 else {
1897 v.phase = sp;
1898 if (v.parking != 0)
1899 U.unpark(v.owner);
1900 }
1901 break;
1902 }
1903 }
1904 }
1905
1906 /**
1907 * Releases all waiting workers. Called only during shutdown.
1908 */
1909 private void releaseWaiters() {
1910 for (long c = ctl;;) {
1911 WorkQueue[] qs; WorkQueue v; int sp, i;
1912 if ((sp = (int)c) == 0 || (qs = queues) == null ||
1913 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
1914 break;
1915 if (c == (c = compareAndExchangeCtl(
1916 c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
1917 (v.stackPred & LMASK))))) {
1918 v.phase = sp;
1919 if (v.parking != 0)
1920 U.unpark(v.owner);
1921 }
1922 }
1923 }
1924
1925 /**
* Internal version of isQuiescent and related functionality.
* @return positive if stopping or terminating (setting STOP when
* quiescent and shutdown is enabled); zero if all workers are
* inactive and submission queues are empty and unlocked but
* termination is not enabled; negative if any worker is active or
* any queue is apparently nonempty or locked
1930 */
1931 private int quiescent() {
1932 for (;;) {
1933 long phaseSum = 0L;
1934 boolean swept = false;
1935 for (long e, prevRunState = 0L; ; prevRunState = e) {
1936 DelayScheduler ds;
1937 long c = ctl;
1938 if (((e = runState) & STOP) != 0L)
1939 return 1; // terminating
1940 else if ((c & RC_MASK) > 0L)
1941 return -1; // at least one active
1942 else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) {
1943 long sum = c;
1944 WorkQueue[] qs = queues;
1945 int n = (qs == null) ? 0 : qs.length;
1946 for (int i = 0; i < n; ++i) { // scan queues
1947 WorkQueue q;
1948 if ((q = qs[i]) != null) {
1949 int p = q.phase, s = q.top, b = q.base;
1950 sum += (p & 0xffffffffL) | ((long)b << 32);
1951 if ((p & IDLE) == 0 || s - b > 0)
1952 return -1;
1953 }
1954 }
1955 swept = (phaseSum == (phaseSum = sum));
1956 }
1957 else if ((e & SHUTDOWN) == 0)
1958 return 0;
1959 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1960 return 0;
1961 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
1962 return 1; // enable termination
1963 else
1964 break; // restart
1965 }
1966 }
1967 }
1968
1969 /**
1970 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1971 * See above for explanation.
1972 *
1973 * @param w caller's WorkQueue (may be null on failed initialization)
1974 */
1975 final void runWorker(WorkQueue w) {
1976 if (w != null) {
1977 int phase = w.phase, r = w.stackPred; // seed from registerWorker
1978 int fifo = w.config & FIFO, nsteals = 0, src = -1;
1979 for (;;) {
1980 WorkQueue[] qs;
1981 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1982 if ((runState & STOP) != 0L || (qs = queues) == null)
1983 break;
1984 int n = qs.length, i = r, step = (r >>> 16) | 1;
1985 boolean rescan = false;
1986 scan: for (int l = n; l > 0; --l, i += step) { // scan queues
1987 int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
1988 if ((q = qs[j = i & (n - 1)]) != null &&
1989 (a = q.array) != null && (cap = a.length) > 0) {
1990 for (int m = cap - 1, pb = -1, b = q.base;;) {
1991 ForkJoinTask<?> t; long k;
1992 t = (ForkJoinTask<?>)U.getReferenceAcquire(
1993 a, k = slotOffset(m & b));
1994 if (b != (b = q.base) || t == null ||
1995 !U.compareAndSetReference(a, k, t, null)) {
1996 if (a[b & m] == null) {
1997 if (rescan) // end of run
1998 break scan;
1999 if (a[(b + 1) & m] == null &&
2000 a[(b + 2) & m] == null) {
2001 break; // probably empty
2002 }
2003 if (pb == (pb = b)) { // track progress
2004 rescan = true; // stalled; reorder scan
2005 break scan;
2006 }
2007 }
2008 }
2009 else {
2010 boolean propagate;
2011 int nb = q.base = b + 1, prevSrc = src;
2012 w.nsteals = ++nsteals;
2013 w.source = src = j; // volatile
2014 rescan = true;
2015 int nh = t.noUserHelp();
2016 if (propagate =
2017 (prevSrc != src || nh != 0) && a[nb & m] != null)
2018 signalWork();
2019 w.topLevelExec(t, fifo);
2020 if ((b = q.base) != nb && !propagate)
2021 break scan; // reduce interference
2022 }
2023 }
2024 }
2025 }
2026 if (!rescan) {
2027 if (((phase = deactivate(w, phase)) & IDLE) != 0)
2028 break;
2029 src = -1; // re-enable propagation
2030 }
2031 }
2032 }
2033 }
2034
2035 /**
2036 * Deactivates and if necessary awaits signal or termination.
2037 *
2038 * @param w the worker
2039 * @param phase current phase
2040 * @return current phase, with IDLE set if worker should exit
2041 */
2042 private int deactivate(WorkQueue w, int phase) {
2043 if (w == null) // currently impossible
2044 return IDLE;
2045 int p = phase | IDLE, activePhase = phase + (IDLE << 1);
2046 long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK);
2047 int sp = w.stackPred = (int)pc; // set ctl stack link
2048 w.phase = p;
2049 if (!compareAndSetCtl(pc, qc)) // try to enqueue
2050 return w.phase = phase; // back out on possible signal
2051 int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs;
2052 if (((e = runState) & STOP) != 0L ||
2053 ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
2054 (qs = queues) == null || (n = qs.length) <= 0)
2055 return IDLE; // terminating
2056
2057 for (int prechecks = Math.min(ac, 2), // reactivation threshold
2058 k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) {
2059 WorkQueue q; int cap; ForkJoinTask<?>[] a; long c;
2060 if (w.phase == activePhase)
2061 return activePhase;
2062 if (--k < 0)
2063 return awaitWork(w, p); // block, drop, or exit
2064 if ((q = qs[k & (n - 1)]) == null)
2065 Thread.onSpinWait();
2066 else if ((a = q.array) != null && (cap = a.length) > 0 &&
2067 a[q.base & (cap - 1)] != null && --prechecks < 0 &&
2068 (int)(c = ctl) == activePhase &&
2069 compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK)))
2070 return w.phase = activePhase; // reactivate
2071 }
2072 }
2073
2074 /**
2075 * Awaits signal or termination.
2076 *
2077 * @param w the work queue
2078 * @param p current phase (known to be idle)
2079 * @return current phase, with IDLE set if worker should exit
2080 */
2081 private int awaitWork(WorkQueue w, int p) {
2082 if (w != null) {
2083 ForkJoinWorkerThread t; long deadline;
2084 if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
2085 t.resetThreadLocals(); // clear before reactivate
2086 if ((ctl & RC_MASK) > 0L)
2087 deadline = 0L;
2088 else if ((deadline =
2089 (((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
2090 System.currentTimeMillis()) == 0L)
2091 deadline = 1L; // avoid zero
2092 int activePhase = p + IDLE;
2093 if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
2094 LockSupport.setCurrentBlocker(this);
2095 w.parking = 1; // enable unpark
2096 while ((p = w.phase) != activePhase) {
2097 boolean trimmable = false; int trim;
2098 Thread.interrupted(); // clear status
2099 if ((runState & STOP) != 0L)
2100 break;
2101 if (deadline != 0L) {
2102 if ((trim = tryTrim(w, p, deadline)) > 0)
2103 break;
2104 else if (trim < 0)
2105 deadline = 0L;
2106 else
2107 trimmable = true;
2108 }
2109 U.park(trimmable, deadline);
2110 }
2111 w.parking = 0;
2112 LockSupport.setCurrentBlocker(null);
2113 }
2114 }
2115 return p;
2116 }
2117
2118 /**
2119 * Tries to remove and deregister worker after timeout, and release
2120 * another to do the same.
* @return > 0: trimmed, < 0: not trimmable, else 0
2122 */
2123 private int tryTrim(WorkQueue w, int phase, long deadline) {
2124 long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
2125 if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
2126 stat = -1; // no longer ctl top
2127 else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
2128 stat = 0; // spurious wakeup
2129 else if (!compareAndSetCtl(
2130 c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
2131 (TC_MASK & (c - TC_UNIT)))))
2132 stat = -1; // lost race to signaller
2133 else {
2134 stat = 1;
2135 w.source = DROPPED;
2136 w.phase = activePhase;
2137 if ((vp = (int)nc) != 0 && (vs = queues) != null &&
2138 vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
2139 compareAndSetCtl( // try to wake up next waiter
2140 nc, ((UMASK & (nc + RC_UNIT)) |
2141 (nc & TC_MASK) | (v.stackPred & LMASK)))) {
2142 v.source = INVALID_ID; // enable cascaded timeouts
2143 v.phase = vp;
2144 U.unpark(v.owner);
2145 }
2146 }
2147 return stat;
2148 }
2149
2150 /**
2151 * Scans for and returns a polled task, if available. Used only
2152 * for untracked polls. Begins scan at a random index to avoid
2153 * systematic unfairness.
2154 *
2155 * @param submissionsOnly if true, only scan submission queues
2156 */
2157 private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
2158 if ((runState & STOP) == 0L) {
2159 WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
2160 int r = ThreadLocalRandom.nextSecondarySeed();
2161 if (submissionsOnly) // even indices only
2162 r &= ~1;
2163 int step = (submissionsOnly) ? 2 : 1;
2164 if ((qs = queues) != null && (n = qs.length) > 0) {
2165 for (int i = n; i > 0; i -= step, r += step) {
2166 if ((q = qs[r & (n - 1)]) != null &&
2167 (t = q.poll()) != null)
2168 return t;
2169 }
2170 }
2171 }
2172 return null;
2173 }
2174
2175 /**
2176 * Tries to decrement counts (sometimes implicitly) and possibly
2177 * arrange for a compensating worker in preparation for
2178 * blocking. May fail due to interference, in which case -1 is
2179 * returned so caller may retry. A zero return value indicates
2180 * that the caller doesn't need to re-adjust counts when later
2181 * unblocked.
2182 *
2183 * @param c incoming ctl value
2184 * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
2185 */
2186 private int tryCompensate(long c) {
2187 Predicate<? super ForkJoinPool> sat;
2188 long b = config;
2189 int pc = parallelism, // unpack fields
2190 minActive = (short)(b >>> RC_SHIFT),
2191 maxTotal = (short)(b >>> TC_SHIFT) + pc,
2192 active = (short)(c >>> RC_SHIFT),
2193 total = (short)(c >>> TC_SHIFT),
2194 sp = (int)c,
2195 stat = -1; // default retry return
2196 if (sp != 0 && active <= pc) { // activate idle worker
2197 WorkQueue[] qs; WorkQueue v; int i;
2198 if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
2199 (v = qs[i]) != null &&
2200 compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
2201 v.phase = sp;
2202 if (v.parking != 0)
2203 U.unpark(v.owner);
2204 stat = UNCOMPENSATE;
2205 }
2206 }
2207 else if (active > minActive && total >= pc) { // reduce active workers
2208 if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK)))
2209 stat = UNCOMPENSATE;
2210 }
2211 else if (total < maxTotal && total < MAX_CAP) { // try to expand pool
2212 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
2213 if ((runState & STOP) != 0L) // terminating
2214 stat = 0;
2215 else if (compareAndSetCtl(c, nc))
2216 stat = createWorker() ? UNCOMPENSATE : 0;
2217 }
2218 else if (!compareAndSetCtl(c, c)) // validate
2219 ;
2220 else if ((sat = saturate) != null && sat.test(this))
2221 stat = 0;
2222 else
2223 throw new RejectedExecutionException(
2224 "Thread limit exceeded replacing blocked worker");
2225 return stat;
2226 }
2227
2228 /**
2229 * Readjusts RC count; called from ForkJoinTask after blocking.
2230 */
2231 final void uncompensate() {
2232 getAndAddCtl(RC_UNIT);
2233 }
2234
2235 /**
2236 * Helps if possible until the given task is done. Processes
* compatible local tasks and scans other queues for tasks produced
* by w's stealers, returning a compensated blocking sentinel if
* none are found.
2240 *
2241 * @param task the task
2242 * @param w caller's WorkQueue
2243 * @param internal true if w is owned by a ForkJoinWorkerThread
2244 * @return task status on exit, or UNCOMPENSATE for compensated blocking
2245 */
2246 final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) {
2247 if (w != null)
2248 w.tryRemoveAndExec(task, internal);
2249 int s = 0;
2250 if (task != null && (s = task.status) >= 0 && internal && w != null) {
2251 int wid = w.phase & SMASK, r = wid + 2, wsrc = w.source;
2252 long sctl = 0L; // track stability
2253 outer: for (boolean rescan = true;;) {
2254 if ((s = task.status) < 0)
2255 break;
2256 if (!rescan) {
2257 if ((runState & STOP) != 0L)
2258 break;
2259 if (sctl == (sctl = ctl) && (s = tryCompensate(sctl)) >= 0)
2260 break;
2261 }
2262 rescan = false;
2263 WorkQueue[] qs = queues;
2264 int n = (qs == null) ? 0 : qs.length;
2265 scan: for (int l = n >>> 1; l > 0; --l, r += 2) {
2266 int j; WorkQueue q;
2267 if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
2268 for (;;) {
2269 ForkJoinTask<?> t; ForkJoinTask<?>[] a;
2270 boolean eligible = false;
2271 int sq = q.source, b, cap; long k;
2272 if ((a = q.array) == null || (cap = a.length) <= 0)
2273 break;
2274 t = (ForkJoinTask<?>)U.getReferenceAcquire(
2275 a, k = slotOffset((cap - 1) & (b = q.base)));
2276 if (t == task)
2277 eligible = true;
2278 else if (t != null) { // check steal chain
2279 for (int v = sq, d = cap;;) {
2280 WorkQueue p;
2281 if (v == wid) {
2282 eligible = true;
2283 break;
2284 }
2285 if ((v & 1) == 0 || // external or none
2286 --d < 0 || // bound depth
2287 (p = qs[v & (n - 1)]) == null)
2288 break;
2289 v = p.source;
2290 }
2291 }
2292 if ((s = task.status) < 0)
2293 break outer; // validate
2294 if (q.source == sq && q.base == b &&
2295 U.getReference(a, k) == t) {
2296 if (!eligible) { // revisit if nonempty
2297 if (!rescan && t == null && q.top - b > 0)
2298 rescan = true;
2299 break;
2300 }
2301 if (U.compareAndSetReference(a, k, t, null)) {
2302 q.base = b + 1;
2303 w.source = j; // volatile write
2304 t.doExec();
2305 w.source = wsrc;
2306 rescan = true; // restart at index r
2307 break scan;
2308 }
2309 }
2310 }
2311 }
2312 }
2313 }
2314 }
2315 return s;
2316 }
2317
2318 /**
2319 * Version of helpJoin for CountedCompleters.
2320 *
2321 * @param task root of computation (only called when a CountedCompleter)
2322 * @param w caller's WorkQueue
2323 * @param internal true if w is owned by a ForkJoinWorkerThread
2324 * @return task status on exit, or UNCOMPENSATE for compensated blocking
2325 */
2326 final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean internal) {
2327 int s = 0;
2328 if (task != null && (s = task.status) >= 0 && w != null) {
2329 int r = w.phase + 1; // for indexing
2330 long sctl = 0L; // track stability
2331 outer: for (boolean rescan = true, locals = true;;) {
2332 if (locals && (s = w.helpComplete(task, internal, 0)) < 0)
2333 break;
2334 if ((s = task.status) < 0)
2335 break;
2336 if (!rescan) {
2337 if ((runState & STOP) != 0L)
2338 break;
2339 if (sctl == (sctl = ctl) &&
2340 (!internal || (s = tryCompensate(sctl)) >= 0))
2341 break;
2342 }
2343 rescan = locals = false;
2344 WorkQueue[] qs = queues;
2345 int n = (qs == null) ? 0 : qs.length;
2346 scan: for (int l = n; l > 0; --l, ++r) {
2347 int j; WorkQueue q;
2348 if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
2349 for (;;) {
2350 ForkJoinTask<?> t; ForkJoinTask<?>[] a;
2351 int b, cap, nb; long k;
2352 boolean eligible = false;
2353 if ((a = q.array) == null || (cap = a.length) <= 0)
2354 break;
2355 t = (ForkJoinTask<?>)U.getReferenceAcquire(
2356 a, k = slotOffset((cap - 1) & (b = q.base)));
2357 if (t instanceof CountedCompleter) {
2358 CountedCompleter<?> f = (CountedCompleter<?>)t;
2359 for (int steps = cap; steps > 0; --steps) {
2360 if (f == task) {
2361 eligible = true;
2362 break;
2363 }
2364 if ((f = f.completer) == null)
2365 break;
2366 }
2367 }
2368 if ((s = task.status) < 0) // validate
2369 break outer;
2370 if (q.base == b) {
2371 if (eligible) {
2372 if (U.compareAndSetReference(
2373 a, k, t, null)) {
2374 q.updateBase(b + 1);
2375 t.doExec();
2376 locals = rescan = true;
2377 break scan;
2378 }
2379 }
2380 else if (U.getReference(a, k) == t) {
2381 if (!rescan && t == null && q.top - b > 0)
2382 rescan = true; // revisit
2383 break;
2384 }
2385 }
2386 }
2387 }
2388 }
2389 }
2390 }
2391 return s;
2392 }
2393
2394 /**
2395 * Runs tasks until all workers are inactive and no tasks are
2396 * found. Rather than blocking when tasks cannot be found, rescans
2397 * until all others cannot find tasks either.
2398 *
2399 * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
2400 * @param interruptible true if return on interrupt
2401 * @return positive if quiescent, negative if interrupted, else 0
2402 */
2403 private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) {
2404 int phase; // w.phase inactive bit set when temporarily quiescent
2405 if (w == null || ((phase = w.phase) & IDLE) != 0)
2406 return 0;
2407 int wsrc = w.source;
2408 long startTime = System.nanoTime();
2409 long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP); // approx 1% nanos
2410 long prevSum = 0L;
2411 int activePhase = phase, inactivePhase = phase + IDLE;
2412 int r = phase + 1, waits = 0, returnStatus = 1;
2413 boolean locals = true;
2414 for (long e = runState;;) {
2415 if ((e & STOP) != 0L)
2416 break; // terminating
2417 if (interruptible && Thread.interrupted()) {
2418 returnStatus = -1;
2419 break;
2420 }
2421 if (locals) { // run local tasks before (re)polling
2422 locals = false;
2423 for (ForkJoinTask<?> u; (u = w.nextLocalTask()) != null;)
2424 u.doExec();
2425 }
2426 WorkQueue[] qs = queues;
2427 int n = (qs == null) ? 0 : qs.length;
2428 long phaseSum = 0L;
2429 boolean rescan = false, busy = false;
2430 scan: for (int l = n; l > 0; --l, ++r) {
2431 int j; WorkQueue q;
2432 if ((q = qs[j = r & SMASK & (n - 1)]) != null && q != w) {
2433 for (;;) {
2434 ForkJoinTask<?> t; ForkJoinTask<?>[] a;
2435 int b, cap; long k;
2436 if ((a = q.array) == null || (cap = a.length) <= 0)
2437 break;
2438 t = (ForkJoinTask<?>)U.getReferenceAcquire(
2439 a, k = slotOffset((cap - 1) & (b = q.base)));
2440 if (t != null && phase == inactivePhase) // reactivate
2441 w.phase = phase = activePhase;
2442 if (q.base == b && U.getReference(a, k) == t) {
2443 int nb = b + 1;
2444 if (t == null) {
2445 if (!rescan) {
2446 int qp = q.phase, mq = qp & (IDLE | 1);
2447 phaseSum += qp;
2448 if (mq == 0 || q.top - b > 0)
2449 rescan = true;
2450 else if (mq == 1)
2451 busy = true;
2452 }
2453 break;
2454 }
2455 if (U.compareAndSetReference(a, k, t, null)) {
2456 q.base = nb;
2457 w.source = j; // volatile write
2458 t.doExec();
2459 w.source = wsrc;
2460 rescan = locals = true;
2461 break scan;
2462 }
2463 }
2464 }
2465 }
2466 }
2467 if (e != (e = runState) || prevSum != (prevSum = phaseSum) ||
2468 rescan || (e & RS_LOCK) != 0L)
2469 ; // inconsistent
2470 else if (!busy)
2471 break;
2472 else if (phase == activePhase) {
2473 waits = 0; // recheck, then sleep
2474 w.phase = phase = inactivePhase;
2475 }
2476 else if (System.nanoTime() - startTime > nanos) {
2477 returnStatus = 0; // timed out
2478 break;
2479 }
2480 else if (waits == 0) // same as spinLockRunState except
2481 waits = MIN_SLEEP; // with rescan instead of onSpinWait
2482 else {
2483 LockSupport.parkNanos(this, (long)waits);
2484 if (waits < maxSleep)
2485 waits <<= 1;
2486 }
2487 }
2488 w.phase = activePhase;
2489 return returnStatus;
2490 }
2491
2492 /**
* Helps quiesce from external caller until done, interrupted, or timed out.
2494 *
2495 * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
2496 * @param interruptible true if return on interrupt
2497 * @return positive if quiescent, negative if interrupted, else 0
2498 */
2499 private int externalHelpQuiesce(long nanos, boolean interruptible) {
2500 if (quiescent() < 0) {
2501 long startTime = System.nanoTime();
2502 long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP);
2503 for (int waits = 0;;) {
2504 ForkJoinTask<?> t;
2505 if (interruptible && Thread.interrupted())
2506 return -1;
2507 else if ((t = pollScan(false)) != null) {
2508 waits = 0;
2509 t.doExec();
2510 }
2511 else if (quiescent() >= 0)
2512 break;
2513 else if (System.nanoTime() - startTime > nanos)
2514 return 0;
2515 else if (waits == 0)
2516 waits = MIN_SLEEP;
2517 else {
2518 LockSupport.parkNanos(this, (long)waits);
2519 if (waits < maxSleep)
2520 waits <<= 1;
2521 }
2522 }
2523 }
2524 return 1;
2525 }
2526
2527 /**
2528 * Helps quiesce from either internal or external caller
2529 *
2530 * @param pool the pool to use, or null if any
2531 * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
2532 * @param interruptible true if return on interrupt
2533 * @return positive if quiescent, negative if interrupted, else 0
2534 */
2535 static final int helpQuiescePool(ForkJoinPool pool, long nanos,
2536 boolean interruptible) {
2537 Thread t; ForkJoinPool p; ForkJoinWorkerThread wt;
2538 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
2539 (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2540 (p == pool || pool == null))
2541 return p.helpQuiesce(wt.workQueue, nanos, interruptible);
2542 else if ((p = pool) != null || (p = common) != null)
2543 return p.externalHelpQuiesce(nanos, interruptible);
2544 else
2545 return 0;
2546 }
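/*
* Illustrative external use via the public API (a sketch, not a
* prescription):
*
*   pool.execute(rootTask);
*   pool.awaitQuiescence(1L, TimeUnit.MINUTES); // helps run tasks while waiting
*/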
2547
2548 /**
2549 * Gets and removes a local or stolen task for the given worker.
2550 *
2551 * @return a task, if available
2552 */
2553 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2554 ForkJoinTask<?> t;
2555 if (w == null || (t = w.nextLocalTask()) == null)
2556 t = pollScan(false);
2557 return t;
2558 }
2559
2560 // External operations
2561
2562 /**
2563 * Finds and locks a WorkQueue for an external submitter, or
2564 * throws RejectedExecutionException if shutdown or terminating.
2565 * @param r current ThreadLocalRandom.getProbe() value
2566 * @param rejectOnShutdown true if RejectedExecutionException
2567 * should be thrown when shutdown (else only if terminating)
2568 */
2569 private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) {
int reuse; // nonzero if prefer reusing existing queue
2571 if ((reuse = r) == 0) {
2572 ThreadLocalRandom.localInit(); // initialize caller's probe
2573 r = ThreadLocalRandom.getProbe();
2574 }
2575 for (int probes = 0; ; ++probes) {
2576 int n, i, id; WorkQueue[] qs; WorkQueue q;
2577 if ((qs = queues) == null)
2578 break;
2579 if ((n = qs.length) <= 0)
2580 break;
2581 if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2582 WorkQueue w = new WorkQueue(null, id, 0, false);
2583 w.phase = id;
2584 boolean reject = ((lockRunState() & SHUTDOWN) != 0 &&
2585 rejectOnShutdown);
2586 if (!reject && queues == qs && qs[i] == null)
2587 q = qs[i] = w; // else lost race to install
2588 unlockRunState();
2589 if (q != null)
2590 return q;
2591 if (reject)
2592 break;
2593 reuse = 0;
2594 }
2595 if (reuse == 0 || !q.tryLockPhase()) { // move index
2596 if (reuse == 0) {
2597 if (probes >= n >> 1)
reuse = r; // stop preferring free slot
2599 }
2600 else if (q != null)
2601 reuse = 0; // probe on collision
2602 r = ThreadLocalRandom.advanceProbe(r);
2603 }
2604 else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2605 q.unlockPhase(); // check while q lock held
2606 break;
2607 }
2608 else
2609 return q;
2610 }
2611 throw new RejectedExecutionException();
2612 }
2613
2614 private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2615 Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2616 if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2617 (wt = (ForkJoinWorkerThread)t).pool == this) {
2618 internal = true;
2619 q = wt.workQueue;
2620 }
2621 else { // find and lock queue
2622 internal = false;
2623 q = submissionQueue(ThreadLocalRandom.getProbe(), true);
2624 }
2625 q.push(task, signalIfEmpty ? this : null, internal);
2626 return task;
2627 }
2628
2629 /**
2630 * Returns queue for an external submission, bypassing call to
2631 * submissionQueue if already established and unlocked.
2632 */
2633 final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2634 WorkQueue[] qs; WorkQueue q; int n;
2635 int r = ThreadLocalRandom.getProbe();
2636 return (((qs = queues) != null && (n = qs.length) > 0 &&
2637 (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
2638 q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown));
2639 }
2640
2641 /**
2642 * Returns queue for an external thread, if one exists that has
2643 * possibly ever submitted to the given pool (nonzero probe), or
2644 * null if none.
2645 */
2646 static WorkQueue externalQueue(ForkJoinPool p) {
2647 WorkQueue[] qs; int n;
2648 int r = ThreadLocalRandom.getProbe();
2649 return (p != null && (qs = p.queues) != null &&
2650 (n = qs.length) > 0 && r != 0) ?
2651 qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2652 }
2653
2654 /**
2655 * Returns external queue for common pool.
2656 */
2657 static WorkQueue commonQueue() {
2658 return externalQueue(common);
2659 }
2660
2661 /**
2662 * If the given executor is a ForkJoinPool, poll and execute
2663 * AsynchronousCompletionTasks from worker's queue until none are
2664 * available or blocker is released.
2665 */
2666 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
2667 WorkQueue w = null; Thread t; ForkJoinWorkerThread wt;
2668 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2669 (wt = (ForkJoinWorkerThread)t).pool == e)
2670 w = wt.workQueue;
2671 else if (e instanceof ForkJoinPool)
2672 w = externalQueue((ForkJoinPool)e);
2673 if (w != null)
2674 w.helpAsyncBlocker(blocker);
2675 }
2676
2677 /**
2678 * Returns a cheap heuristic guide for task partitioning when
2679 * programmers, frameworks, tools, or languages have little or no
2680 * idea about task granularity. In essence, by offering this
2681 * method, we ask users only about tradeoffs in overhead vs
2682 * expected throughput and its variance, rather than how finely to
2683 * partition tasks.
2684 *
2685 * In a steady state strict (tree-structured) computation, each
2686 * thread makes available for stealing enough tasks for other
2687 * threads to remain active. Inductively, if all threads play by
2688 * the same rules, each thread should make available only a
2689 * constant number of tasks.
2690 *
2691 * The minimum useful constant is just 1. But using a value of 1
2692 * would require immediate replenishment upon each steal to
2693 * maintain enough tasks, which is infeasible. Further,
2694 * partitionings/granularities of offered tasks should minimize
* steal rates, which in general means that threads nearer the top
* of the computation tree should generate more than those nearer
* the bottom. In perfect steady state, each thread is at
* approximately the same level of the computation tree. However,
2699 * producing extra tasks amortizes the uncertainty of progress and
2700 * diffusion assumptions.
2701 *
2702 * So, users will want to use values larger (but not much larger)
2703 * than 1 to both smooth over transient shortages and hedge
* against uneven progress, as traded off against the cost of
2705 * extra task overhead. We leave the user to pick a threshold
2706 * value to compare with the results of this call to guide
2707 * decisions, but recommend values such as 3.
2708 *
2709 * When all threads are active, it is on average OK to estimate
2710 * surplus strictly locally. In steady-state, if one thread is
2711 * maintaining say 2 surplus tasks, then so are others. So we can
2712 * just use estimated queue length. However, this strategy alone
2713 * leads to serious mis-estimates in some non-steady-state
2714 * conditions (ramp-up, ramp-down, other stalls). We can detect
2715 * many of these by further considering the number of "idle"
     * threads that are known to have zero queued tasks, and
     * compensate by a factor of (#idle/#active) threads.
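     *
     * For example, a divide-and-conquer task might consult {@link
     * ForkJoinTask#getSurplusQueuedTaskCount} (which delegates to
     * this method) to decide whether to fork or to run a subtask
     * directly; a minimal sketch, in which the summation body and
     * the threshold of 3 are illustrative:
     *
     * <pre> {@code
     * class SumTask extends RecursiveTask<Long> {
     *   final long[] a; final int lo, hi;
     *   SumTask(long[] a, int lo, int hi) {
     *     this.a = a; this.lo = lo; this.hi = hi;
     *   }
     *   protected Long compute() {
     *     if (hi - lo > 1 && getSurplusQueuedTaskCount() <= 3) {
     *       int mid = (lo + hi) >>> 1;  // split while surplus is small
     *       SumTask right = new SumTask(a, mid, hi);
     *       right.fork();
     *       return new SumTask(a, lo, mid).compute() + right.join();
     *     }
     *     long sum = 0;                 // otherwise run sequentially
     *     for (int i = lo; i < hi; ++i)
     *       sum += a[i];
     *     return sum;
     *   }
     * }}</pre>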
2718 */
2719 static int getSurplusQueuedTaskCount() {
2720 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2721 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2722 (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2723 (q = wt.workQueue) != null) {
2724 int n = q.top - q.base;
2725 int p = pool.parallelism;
2726 int a = (short)(pool.ctl >>> RC_SHIFT);
2727 return n - (a > (p >>>= 1) ? 0 :
2728 a > (p >>>= 1) ? 1 :
2729 a > (p >>>= 1) ? 2 :
2730 a > (p >>>= 1) ? 4 :
2731 8);
2732 }
2733 return 0;
2734 }
2735
2736 // Termination
2737
2738 /**
2739 * Possibly initiates and/or completes pool termination.
2740 *
2741 * @param now if true, unconditionally terminate, else only
2742 * if no work and no active workers
2743 * @param enable if true, terminate when next possible
2744 * @return runState on exit
2745 */
2746 private long tryTerminate(boolean now, boolean enable) {
2747 long e, isShutdown, ps;
2748 if (((e = runState) & TERMINATED) != 0L)
2749 now = false;
2750 else if ((e & STOP) != 0L)
2751 now = true;
2752 else if (now) {
            if (((ps = getAndBitwiseOrRunState(SHUTDOWN|STOP)) & STOP) == 0L) {
2754 if ((ps & RS_LOCK) != 0L) {
2755 spinLockRunState(); // ensure queues array stable after stop
2756 unlockRunState();
2757 }
2758 interruptAll();
2759 }
2760 }
2761 else if ((isShutdown = (e & SHUTDOWN)) != 0L || enable) {
2762 long quiet; DelayScheduler ds;
2763 if (isShutdown == 0L)
2764 getAndBitwiseOrRunState(SHUTDOWN);
2765 if ((quiet = quiescent()) > 0)
2766 now = true;
2767 else if (quiet == 0 && (ds = delayScheduler) != null)
2768 ds.signal();
2769 }
2770
2771 if (now) {
2772 DelayScheduler ds;
2773 releaseWaiters();
2774 if ((ds = delayScheduler) != null)
2775 ds.signal();
2776 for (;;) {
2777 if (((e = runState) & CLEANED) == 0L) {
2778 boolean clean = cleanQueues();
2779 if (((e = runState) & CLEANED) == 0L && clean)
2780 e = getAndBitwiseOrRunState(CLEANED) | CLEANED;
2781 }
2782 if ((e & TERMINATED) != 0L)
2783 break;
2784 if (ctl != 0L) // else loop if didn't finish cleaning
2785 break;
2786 if ((ds = delayScheduler) != null && ds.signal() >= 0)
2787 break;
2788 if ((e & CLEANED) != 0L) {
2789 e |= TERMINATED;
2790 if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) {
2791 CountDownLatch done; SharedThreadContainer ctr;
2792 if ((done = termination) != null)
2793 done.countDown();
2794 if ((ctr = container) != null)
2795 ctr.close();
2796 }
2797 break;
2798 }
2799 }
2800 }
2801 return e;
2802 }
2803
2804 /**
     * Scans queues in a pseudorandom order based on thread id,
2806 * cancelling tasks until empty, or returning early upon
2807 * interference or still-active external queues, in which case
2808 * other calls will finish cancellation.
2809 *
2810 * @return true if all queues empty
2811 */
2812 private boolean cleanQueues() {
2813 int r = (int)Thread.currentThread().threadId();
2814 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
2815 int step = (r >>> 16) | 1; // randomize traversals
2816 WorkQueue[] qs = queues;
2817 int n = (qs == null) ? 0 : qs.length;
2818 for (int l = n; l > 0; --l, r += step) {
2819 WorkQueue q; ForkJoinTask<?>[] a; int cap;
2820 if ((q = qs[r & (n - 1)]) != null &&
2821 (a = q.array) != null && (cap = a.length) > 0) {
2822 for (;;) {
2823 ForkJoinTask<?> t; int b; long k;
2824 t = (ForkJoinTask<?>)U.getReferenceAcquire(
2825 a, k = slotOffset((cap - 1) & (b = q.base)));
2826 if (q.base == b && t != null &&
2827 U.compareAndSetReference(a, k, t, null)) {
2828 q.updateBase(b + 1);
2829 try {
2830 t.cancel(false);
2831 } catch (Throwable ignore) {
2832 }
2833 }
2834 else if ((q.phase & (IDLE|1)) == 0 || // externally locked
2835 q.top - q.base > 0)
2836 return false; // incomplete
2837 else
2838 break;
2839 }
2840 }
2841 }
2842 return true;
2843 }
2844
2845 /**
2846 * Interrupts all workers
2847 */
2848 private void interruptAll() {
2849 Thread current = Thread.currentThread();
2850 WorkQueue[] qs = queues;
2851 int n = (qs == null) ? 0 : qs.length;
2852 for (int i = 1; i < n; i += 2) {
2853 WorkQueue q; Thread o;
2854 if ((q = qs[i]) != null && (o = q.owner) != null && o != current) {
2855 try {
2856 o.interrupt();
2857 } catch (Throwable ignore) {
2858 }
2859 }
2860 }
2861 }
2862
2863 /**
2864 * Returns termination signal, constructing if necessary
2865 */
2866 private CountDownLatch terminationSignal() {
2867 CountDownLatch signal, s, u;
2868 if ((signal = termination) == null)
2869 signal = ((u = cmpExTerminationSignal(
2870 s = new CountDownLatch(1))) == null) ? s : u;
2871 return signal;
2872 }
2873
2874 // Exported methods
2875
2876 // Constructors
2877
2878 /**
2879 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2880 * java.lang.Runtime#availableProcessors}, using defaults for all
2881 * other parameters (see {@link #ForkJoinPool(int,
2882 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2883 * int, int, int, Predicate, long, TimeUnit)}).
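     *
     * <p>For example, a minimal sketch of creating, using, and
     * closing a private pool ({@code close} awaits termination):
     * <pre> {@code
     * try (ForkJoinPool pool = new ForkJoinPool()) {
     *   pool.submit(() -> System.out.println("hello")).join();
     * }}</pre>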
2884 */
2885 public ForkJoinPool() {
2886 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
2887 defaultForkJoinWorkerThreadFactory, null, false,
2888 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2889 }
2890
2891 /**
2892 * Creates a {@code ForkJoinPool} with the indicated parallelism
2893 * level, using defaults for all other parameters (see {@link
2894 * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
2895 * UncaughtExceptionHandler, boolean, int, int, int, Predicate,
2896 * long, TimeUnit)}).
2897 *
2898 * @param parallelism the parallelism level
2899 * @throws IllegalArgumentException if parallelism less than or
2900 * equal to zero, or greater than implementation limit
2901 */
2902 public ForkJoinPool(int parallelism) {
2903 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
2904 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2905 }
2906
2907 /**
2908 * Creates a {@code ForkJoinPool} with the given parameters (using
2909 * defaults for others -- see {@link #ForkJoinPool(int,
2910 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2911 * int, int, int, Predicate, long, TimeUnit)}).
2912 *
2913 * @param parallelism the parallelism level. For default value,
2914 * use {@link java.lang.Runtime#availableProcessors}.
2915 * @param factory the factory for creating new threads. For default value,
2916 * use {@link #defaultForkJoinWorkerThreadFactory}.
2917 * @param handler the handler for internal worker threads that
2918 * terminate due to unrecoverable errors encountered while executing
2919 * tasks. For default value, use {@code null}.
2920 * @param asyncMode if true,
2921 * establishes local first-in-first-out scheduling mode for forked
2922 * tasks that are never joined. This mode may be more appropriate
2923 * than default locally stack-based mode in applications in which
2924 * worker threads only process event-style asynchronous tasks.
2925 * For default value, use {@code false}.
2926 * @throws IllegalArgumentException if parallelism less than or
2927 * equal to zero, or greater than implementation limit
2928 * @throws NullPointerException if the factory is null
2929 */
2930 public ForkJoinPool(int parallelism,
2931 ForkJoinWorkerThreadFactory factory,
2932 UncaughtExceptionHandler handler,
2933 boolean asyncMode) {
2934 this(parallelism, factory, handler, asyncMode,
2935 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2936 }
2937
2938 /**
2939 * Creates a {@code ForkJoinPool} with the given parameters.
2940 *
2941 * @param parallelism the parallelism level. For default value,
2942 * use {@link java.lang.Runtime#availableProcessors}.
2943 *
2944 * @param factory the factory for creating new threads. For
2945 * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2946 *
2947 * @param handler the handler for internal worker threads that
2948 * terminate due to unrecoverable errors encountered while
2949 * executing tasks. For default value, use {@code null}.
2950 *
2951 * @param asyncMode if true, establishes local first-in-first-out
2952 * scheduling mode for forked tasks that are never joined. This
2953 * mode may be more appropriate than default locally stack-based
2954 * mode in applications in which worker threads only process
2955 * event-style asynchronous tasks. For default value, use {@code
2956 * false}.
2957 *
2958 * @param corePoolSize ignored: used in previous releases of this
2959 * class but no longer applicable. Using {@code 0} maintains
2960 * compatibility across releases.
2961 *
2962 * @param maximumPoolSize the maximum number of threads allowed.
2963 * When the maximum is reached, attempts to replace blocked
2964 * threads fail. (However, because creation and termination of
2965 * different threads may overlap, and may be managed by the given
2966 * thread factory, this value may be transiently exceeded.) To
2967 * arrange the same value as is used by default for the common
2968 * pool, use {@code 256} plus the {@code parallelism} level. (By
2969 * default, the common pool allows a maximum of 256 spare
2970 * threads.) Using a value (for example {@code
2971 * Integer.MAX_VALUE}) larger than the implementation's total
2972 * thread limit has the same effect as using this limit (which is
2973 * the default).
2974 *
2975 * @param minimumRunnable the minimum allowed number of core
2976 * threads not blocked by a join or {@link ManagedBlocker}. To
2977 * ensure progress, when too few unblocked threads exist and
2978 * unexecuted tasks may exist, new threads are constructed, up to
     * the given maximumPoolSize. For the default value, use {@code
     * 1}, which ensures liveness. A larger value might improve
2981 * throughput in the presence of blocked activities, but might
2982 * not, due to increased overhead. A value of zero may be
2983 * acceptable when submitted tasks cannot have dependencies
2984 * requiring additional threads.
2985 *
2986 * @param saturate if non-null, a predicate invoked upon attempts
2987 * to create more than the maximum total allowed threads. By
2988 * default, when a thread is about to block on a join or {@link
2989 * ManagedBlocker}, but cannot be replaced because the
2990 * maximumPoolSize would be exceeded, a {@link
2991 * RejectedExecutionException} is thrown. But if this predicate
2992 * returns {@code true}, then no exception is thrown, so the pool
2993 * continues to operate with fewer than the target number of
2994 * runnable threads, which might not ensure progress.
2995 *
2996 * @param keepAliveTime the elapsed time since last use before
2997 * a thread is terminated (and then later replaced if needed).
2998 * For the default value, use {@code 60, TimeUnit.SECONDS}.
2999 *
3000 * @param unit the time unit for the {@code keepAliveTime} argument
3001 *
3002 * @throws IllegalArgumentException if parallelism is less than or
3003 * equal to zero, or is greater than implementation limit,
3004 * or if maximumPoolSize is less than parallelism,
     * or if the keepAliveTime is less than or equal to zero.
3006 * @throws NullPointerException if the factory is null
3007 * @since 9
3008 */
3009 public ForkJoinPool(int parallelism,
3010 ForkJoinWorkerThreadFactory factory,
3011 UncaughtExceptionHandler handler,
3012 boolean asyncMode,
3013 int corePoolSize,
3014 int maximumPoolSize,
3015 int minimumRunnable,
3016 Predicate<? super ForkJoinPool> saturate,
3017 long keepAliveTime,
3018 TimeUnit unit) {
3019 int p = parallelism;
3020 if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
3021 throw new IllegalArgumentException();
3022 if (factory == null || unit == null)
3023 throw new NullPointerException();
3024 int size = Math.max(MIN_QUEUES_SIZE,
3025 1 << (33 - Integer.numberOfLeadingZeros(p - 1)));
3026 this.parallelism = p;
3027 this.factory = factory;
3028 this.ueh = handler;
3029 this.saturate = saturate;
3030 this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
3031 int maxSpares = Math.clamp(maximumPoolSize - p, 0, MAX_CAP);
3032 int minAvail = Math.clamp(minimumRunnable, 0, MAX_CAP);
3033 this.config = (((asyncMode ? FIFO : 0) & LMASK) |
3034 (((long)maxSpares) << TC_SHIFT) |
3035 (((long)minAvail) << RC_SHIFT));
3036 this.queues = new WorkQueue[size];
3037 String pid = Integer.toString(getAndAddPoolIds(1) + 1);
3038 String name = "ForkJoinPool-" + pid;
3039 this.poolName = name;
3040 this.workerNamePrefix = name + "-worker-";
3041 this.container = SharedThreadContainer.create(name);
3042 }
3043
3044 /**
3045 * Constructor for common pool using parameters possibly
3046 * overridden by system properties
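     *
     * For example, the common parallelism may be preset on the
     * command line (the value 8 is illustrative):
     * <pre> {@code
     * -Djava.util.concurrent.ForkJoinPool.common.parallelism=8}</pre>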
3047 */
3048 private ForkJoinPool(byte forCommonPoolOnly) {
3049 String name = "ForkJoinPool.commonPool";
3050 ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory;
3051 UncaughtExceptionHandler handler = null;
3052 int maxSpares = DEFAULT_COMMON_MAX_SPARES;
3053 int pc = 0, preset = 0; // nonzero if size set as property
3054 try { // ignore exceptions in accessing/parsing properties
3055 String pp = System.getProperty
3056 ("java.util.concurrent.ForkJoinPool.common.parallelism");
3057 if (pp != null) {
3058 pc = Math.max(0, Integer.parseInt(pp));
3059 preset = PRESET_SIZE;
3060 }
3061 String ms = System.getProperty
3062 ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
3063 if (ms != null)
3064 maxSpares = Math.clamp(Integer.parseInt(ms), 0, MAX_CAP);
3065 String sf = System.getProperty
3066 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
3067 String sh = System.getProperty
3068 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3069 if (sf != null || sh != null) {
3070 ClassLoader ldr = ClassLoader.getSystemClassLoader();
3071 if (sf != null)
3072 fac = (ForkJoinWorkerThreadFactory)
3073 ldr.loadClass(sf).getConstructor().newInstance();
3074 if (sh != null)
3075 handler = (UncaughtExceptionHandler)
3076 ldr.loadClass(sh).getConstructor().newInstance();
3077 }
3078 } catch (Exception ignore) {
3079 }
3080 if (preset == 0)
3081 pc = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
3082 int p = Math.min(pc, MAX_CAP);
3083 int size = Math.max(MIN_QUEUES_SIZE,
3084 (p == 0) ? 1 :
3085 1 << (33 - Integer.numberOfLeadingZeros(p-1)));
3086 this.parallelism = p;
3087 this.config = ((preset & LMASK) | (((long)maxSpares) << TC_SHIFT) |
3088 (1L << RC_SHIFT));
3089 this.factory = fac;
3090 this.ueh = handler;
3091 this.keepAlive = DEFAULT_KEEPALIVE;
3092 this.saturate = null;
3093 this.workerNamePrefix = null;
3094 this.poolName = name;
3095 this.queues = new WorkQueue[size];
3096 this.container = SharedThreadContainer.create(name);
3097 }
3098
3099 /**
3100 * Returns the common pool instance. This pool is statically
3101 * constructed; its run state is unaffected by attempts to {@link
3102 * #shutdown} or {@link #shutdownNow}. However this pool and any
3103 * ongoing processing are automatically terminated upon program
3104 * {@link System#exit}. Any program that relies on asynchronous
3105 * task processing to complete before program termination should
3106 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
3107 * before exit.
3108 *
3109 * @return the common pool instance
3110 * @since 1.8
3111 */
3112 public static ForkJoinPool commonPool() {
3113 // assert common != null : "static init error";
3114 return common;
3115 }
3116
3117 /**
3118 * Package-private access to commonPool overriding zero parallelism
3119 */
3120 static ForkJoinPool asyncCommonPool() {
3121 ForkJoinPool cp; int p;
3122 if ((p = (cp = common).parallelism) == 0)
3123 U.compareAndSetInt(cp, PARALLELISM, 0, 2);
3124 return cp;
3125 }
3126
3127 // Execution methods
3128
3129 /**
3130 * Performs the given task, returning its result upon completion.
3131 * If the computation encounters an unchecked Exception or Error,
3132 * it is rethrown as the outcome of this invocation. Rethrown
3133 * exceptions behave in the same way as regular exceptions, but,
3134 * when possible, contain stack traces (as displayed for example
3135 * using {@code ex.printStackTrace()}) of both the current thread
3136 * as well as the thread actually encountering the exception;
3137 * minimally only the latter.
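     *
     * <p>For example (a sketch; {@code SumTask}, {@code pool} and
     * {@code array} are illustrative):
     * <pre> {@code
     * long total = pool.invoke(new SumTask(array, 0, array.length));}</pre>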
3138 *
3139 * @param task the task
3140 * @param <T> the type of the task's result
3141 * @return the task's result
3142 * @throws NullPointerException if the task is null
3143 * @throws RejectedExecutionException if the task cannot be
3144 * scheduled for execution
3145 */
3146 public <T> T invoke(ForkJoinTask<T> task) {
3147 poolSubmit(true, Objects.requireNonNull(task));
3148 try {
3149 return task.join();
3150 } catch (RuntimeException | Error unchecked) {
3151 throw unchecked;
3152 } catch (Exception checked) {
3153 throw new RuntimeException(checked);
3154 }
3155 }
3156
3157 /**
3158 * Arranges for (asynchronous) execution of the given task.
3159 *
3160 * @param task the task
3161 * @throws NullPointerException if the task is null
3162 * @throws RejectedExecutionException if the task cannot be
3163 * scheduled for execution
3164 */
3165 public void execute(ForkJoinTask<?> task) {
3166 poolSubmit(true, Objects.requireNonNull(task));
3167 }
3168
3169 // AbstractExecutorService methods
3170
3171 /**
3172 * @throws NullPointerException if the task is null
3173 * @throws RejectedExecutionException if the task cannot be
3174 * scheduled for execution
3175 */
3176 @Override
3177 @SuppressWarnings("unchecked")
3178 public void execute(Runnable task) {
3179 poolSubmit(true, (Objects.requireNonNull(task) instanceof ForkJoinTask<?>)
3180 ? (ForkJoinTask<Void>) task // avoid re-wrap
3181 : new ForkJoinTask.RunnableExecuteAction(task));
3182 }
3183
3184 /**
3185 * Submits a ForkJoinTask for execution.
3186 *
3187 * @implSpec
3188 * This method is equivalent to {@link #externalSubmit(ForkJoinTask)}
3189 * when called from a thread that is not in this pool.
3190 *
3191 * @param task the task to submit
3192 * @param <T> the type of the task's result
3193 * @return the task
3194 * @throws NullPointerException if the task is null
3195 * @throws RejectedExecutionException if the task cannot be
3196 * scheduled for execution
3197 */
3198 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
3199 return poolSubmit(true, Objects.requireNonNull(task));
3200 }
3201
3202 /**
3203 * @throws NullPointerException if the task is null
3204 * @throws RejectedExecutionException if the task cannot be
3205 * scheduled for execution
3206 */
3207 @Override
3208 public <T> ForkJoinTask<T> submit(Callable<T> task) {
3209 Objects.requireNonNull(task);
3210 return poolSubmit(
3211 true,
3212 (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
3213 new ForkJoinTask.AdaptedCallable<T>(task) :
3214 new ForkJoinTask.AdaptedInterruptibleCallable<T>(task));
3215 }
3216
3217 /**
3218 * @throws NullPointerException if the task is null
3219 * @throws RejectedExecutionException if the task cannot be
3220 * scheduled for execution
3221 */
3222 @Override
3223 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
3224 Objects.requireNonNull(task);
3225 return poolSubmit(
3226 true,
3227 (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
3228 new ForkJoinTask.AdaptedRunnable<T>(task, result) :
3229 new ForkJoinTask.AdaptedInterruptibleRunnable<T>(task, result));
3230 }
3231
3232 /**
3233 * @throws NullPointerException if the task is null
3234 * @throws RejectedExecutionException if the task cannot be
3235 * scheduled for execution
3236 */
3237 @Override
3238 @SuppressWarnings("unchecked")
3239 public ForkJoinTask<?> submit(Runnable task) {
3240 Objects.requireNonNull(task);
3241 return poolSubmit(
3242 true,
3243 (task instanceof ForkJoinTask<?>) ?
3244 (ForkJoinTask<Void>) task : // avoid re-wrap
3245 ((Thread.currentThread() instanceof ForkJoinWorkerThread) ?
3246 new ForkJoinTask.AdaptedRunnable<Void>(task, null) :
3247 new ForkJoinTask.AdaptedInterruptibleRunnable<Void>(task, null)));
3248 }
3249
3250 /**
3251 * Submits the given task as if submitted from a non-{@code ForkJoinTask}
3252 * client. The task is added to a scheduling queue for submissions to the
3253 * pool even when called from a thread in the pool.
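     *
     * <p>For example, a task running in this pool can requeue work
     * as if submitted by an external client (a sketch; {@code pool}
     * and {@code subStep} are illustrative):
     * <pre> {@code
     * pool.externalSubmit(ForkJoinTask.adapt(() -> subStep()));}</pre>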
3254 *
3255 * @implSpec
3256 * This method is equivalent to {@link #submit(ForkJoinTask)} when called
3257 * from a thread that is not in this pool.
3258 *
3259 * @return the task
3260 * @param task the task to submit
3261 * @param <T> the type of the task's result
3262 * @throws NullPointerException if the task is null
3263 * @throws RejectedExecutionException if the task cannot be
3264 * scheduled for execution
3265 * @since 20
3266 */
3267 public <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
3268 Objects.requireNonNull(task);
3269 externalSubmissionQueue(true).push(task, this, false);
3270 return task;
3271 }
3272
3273 /**
3274 * Submits the given task without guaranteeing that it will
3275 * eventually execute in the absence of available active threads.
3276 * In some contexts, this method may reduce contention and
3277 * overhead by relying on context-specific knowledge that existing
3278 * threads (possibly including the calling thread if operating in
3279 * this pool) will eventually be available to execute the task.
3280 *
3281 * @param task the task
3282 * @param <T> the type of the task's result
3283 * @return the task
3284 * @throws NullPointerException if the task is null
3285 * @throws RejectedExecutionException if the task cannot be
3286 * scheduled for execution
3287 * @since 19
3288 */
3289 public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) {
3290 return poolSubmit(false, Objects.requireNonNull(task));
3291 }
3292
3293 /**
3294 * Changes the target parallelism of this pool, controlling the
3295 * future creation, use, and termination of worker threads.
3296 * Applications include contexts in which the number of available
3297 * processors changes over time.
3298 *
3299 * @implNote This implementation restricts the maximum number of
3300 * running threads to 32767
3301 *
3302 * @param size the target parallelism level
3303 * @return the previous parallelism level.
3304 * @throws IllegalArgumentException if size is less than 1 or
3305 * greater than the maximum supported by this pool.
     * @throws UnsupportedOperationException if this is the {@link
     * #commonPool()} and the parallelism level was set by the System
3308 * property {@systemProperty
3309 * java.util.concurrent.ForkJoinPool.common.parallelism}.
3310 * @since 19
3311 */
3312 public int setParallelism(int size) {
3313 if (size < 1 || size > MAX_CAP)
3314 throw new IllegalArgumentException();
3315 if ((config & PRESET_SIZE) != 0)
3316 throw new UnsupportedOperationException("Cannot override System property");
3317 return getAndSetParallelism(size);
3318 }
3319
3320 /**
     * Uninterruptible version of {@code invokeAll}. Executes the given
3322 * tasks, returning a list of Futures holding their status and
3323 * results when all complete, ignoring interrupts. {@link
3324 * Future#isDone} is {@code true} for each element of the returned
3325 * list. Note that a <em>completed</em> task could have
3326 * terminated either normally or by throwing an exception. The
3327 * results of this method are undefined if the given collection is
3328 * modified while this operation is in progress.
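     *
     * <p>For example (a sketch; {@code pool} is any ForkJoinPool):
     * <pre> {@code
     * List<Future<String>> results = pool.invokeAllUninterruptibly(
     *   List.<Callable<String>>of(() -> "a", () -> "b"));}</pre>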
3329 *
3330 * @apiNote This method supports usages that previously relied on an
3331 * incompatible override of
3332 * {@link ExecutorService#invokeAll(java.util.Collection)}.
3333 *
3334 * @param tasks the collection of tasks
3335 * @param <T> the type of the values returned from the tasks
3336 * @return a list of Futures representing the tasks, in the same
3337 * sequential order as produced by the iterator for the
3338 * given task list, each of which has completed
3339 * @throws NullPointerException if tasks or any of its elements are {@code null}
3340 * @throws RejectedExecutionException if any task cannot be
3341 * scheduled for execution
3342 * @since 22
3343 */
3344 public <T> List<Future<T>> invokeAllUninterruptibly(Collection<? extends Callable<T>> tasks) {
3345 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
3346 try {
3347 for (Callable<T> t : tasks) {
3348 ForkJoinTask<T> f = ForkJoinTask.adapt(t);
3349 futures.add(f);
3350 poolSubmit(true, f);
3351 }
3352 for (int i = futures.size() - 1; i >= 0; --i)
3353 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
3354 return futures;
3355 } catch (Throwable t) {
3356 for (Future<T> e : futures)
3357 e.cancel(true);
3358 throw t;
3359 }
3360 }
3361
3362 /**
3363 * Common support for timed and untimed invokeAll
3364 */
3365 private <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
3366 long deadline)
3367 throws InterruptedException {
3368 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
3369 try {
3370 for (Callable<T> t : tasks) {
3371 ForkJoinTask<T> f = ForkJoinTask.adaptInterruptible(t);
3372 futures.add(f);
3373 poolSubmit(true, f);
3374 }
3375 for (int i = futures.size() - 1; i >= 0; --i)
3376 ((ForkJoinTask<?>)futures.get(i))
3377 .quietlyJoinPoolInvokeAllTask(deadline);
3378 return futures;
3379 } catch (Throwable t) {
3380 for (Future<T> e : futures)
3381 e.cancel(true);
3382 throw t;
3383 }
3384 }
3385
3386 @Override
3387 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
3388 throws InterruptedException {
3389 return invokeAll(tasks, 0L);
3390 }
3391 // for jdk version < 22, replace with
3392 // /**
3393 // * @throws NullPointerException {@inheritDoc}
3394 // * @throws RejectedExecutionException {@inheritDoc}
3395 // */
3396 // @Override
3397 // public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
3398 // return invokeAllUninterruptibly(tasks);
3399 // }
3400
3401 @Override
3402 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
3403 long timeout, TimeUnit unit)
3404 throws InterruptedException {
3405 return invokeAll(tasks, (System.nanoTime() + unit.toNanos(timeout)) | 1L);
3406 }
3407
3408 @Override
3409 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
3410 throws InterruptedException, ExecutionException {
3411 try {
3412 return new ForkJoinTask.InvokeAnyRoot<T>()
3413 .invokeAny(tasks, this, false, 0L);
3414 } catch (TimeoutException cannotHappen) {
3415 assert false;
3416 return null;
3417 }
3418 }
3419
3420 @Override
3421 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
3422 long timeout, TimeUnit unit)
3423 throws InterruptedException, ExecutionException, TimeoutException {
3424 return new ForkJoinTask.InvokeAnyRoot<T>()
3425 .invokeAny(tasks, this, true, unit.toNanos(timeout));
3426 }
3427
3428 // Support for delayed tasks
3429
3430 /**
3431 * Returns STOP and SHUTDOWN status (zero if neither), masking or
3432 * truncating out other bits.
3433 */
3434 final int shutdownStatus(DelayScheduler ds) {
3435 return (int)(runState & (SHUTDOWN | STOP));
3436 }
3437
3438 /**
     * Tries to stop, and possibly terminates if already enabled,
     * returning true on success.
3440 */
3441 final boolean tryStopIfShutdown(DelayScheduler ds) {
3442 return (tryTerminate(false, false) & STOP) != 0L;
3443 }
3444
3445 /**
3446 * Creates and starts DelayScheduler
3447 */
3448 private DelayScheduler startDelayScheduler() {
3449 DelayScheduler ds;
3450 if ((ds = delayScheduler) == null) {
3451 boolean start = false;
3452 String name = poolName + "-delayScheduler";
3453 if (workerNamePrefix == null)
3454 asyncCommonPool(); // override common parallelism zero
3455 long isShutdown = lockRunState() & SHUTDOWN;
3456 try {
3457 if (isShutdown == 0L && (ds = delayScheduler) == null) {
3458 ds = delayScheduler = new DelayScheduler(this, name);
3459 start = true;
3460 }
3461 } finally {
3462 unlockRunState();
3463 }
3464 if (start) { // start outside of lock
3465 SharedThreadContainer ctr;
3466 try {
3467 if ((ctr = container) != null)
3468 ctr.start(ds);
3469 else
3470 ds.start();
3471 } catch (RuntimeException | Error ex) { // back out
3472 lockRunState();
3473 ds = delayScheduler = null;
3474 unlockRunState();
3475 tryTerminate(false, false);
3476 if (ex instanceof Error)
3477 throw ex;
3478 }
3479 }
3480 }
3481 return ds;
3482 }
3483
3484 /**
3485 * Arranges execution of a ScheduledForkJoinTask whose delay has
3486 * elapsed
3487 */
3488 final void executeEnabledScheduledTask(ScheduledForkJoinTask<?> task) {
3489 externalSubmissionQueue(false).push(task, this, false);
3490 }
3491
3492 /**
3493 * Arranges delayed execution of a ScheduledForkJoinTask via the
3494 * DelayScheduler, creating and starting it if necessary.
3495 * @return the task
3496 */
3497 final <T> ScheduledForkJoinTask<T> scheduleDelayedTask(ScheduledForkJoinTask<T> task) {
3498 DelayScheduler ds;
3499 if (((ds = delayScheduler) == null &&
3500 (ds = startDelayScheduler()) == null) ||
3501 (runState & SHUTDOWN) != 0L)
3502 throw new RejectedExecutionException();
3503 ds.pend(task);
3504 return task;
3505 }
3506
3507 /**
3508 * Submits a one-shot task that becomes enabled for execution after the given
3509 * delay. At that point it will execute unless explicitly
3510 * cancelled, or fail to execute (eventually reporting
3511 * cancellation) when encountering resource exhaustion, or the
3512 * pool is {@link #shutdownNow}, or is {@link #shutdown} when
3513 * otherwise quiescent and {@link #cancelDelayedTasksOnShutdown}
3514 * is in effect.
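     *
     * <p>For example (a sketch; {@code pool} is any ForkJoinPool):
     * <pre> {@code
     * ScheduledFuture<?> alarm = pool.schedule(
     *   () -> System.out.println("later"), 5, TimeUnit.SECONDS);}</pre>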
3515 *
3516 * @param command the task to execute
3517 * @param delay the time from now to delay execution
3518 * @param unit the time unit of the delay parameter
3519 * @return a ForkJoinTask implementing the ScheduledFuture
3520 * interface, whose {@code get()} method will return
3521 * {@code null} upon normal completion.
3522 * @throws RejectedExecutionException if the pool is shutdown or
3523 * submission encounters resource exhaustion.
3524 * @throws NullPointerException if command or unit is null
3525 * @since 25
3526 */
3527 public ScheduledFuture<?> schedule(Runnable command,
3528 long delay, TimeUnit unit) {
3529 return scheduleDelayedTask(
3530 new ScheduledForkJoinTask<Void>(
3531 unit.toNanos(delay), 0L, false, // implicit null check of unit
3532 Objects.requireNonNull(command), null, this));
3533 }
3534
3535 /**
3536 * Submits a value-returning one-shot task that becomes enabled for execution
3537 * after the given delay. At that point it will execute unless
3538 * explicitly cancelled, or fail to execute (eventually reporting
3539 * cancellation) when encountering resource exhaustion, or the
3540 * pool is {@link #shutdownNow}, or is {@link #shutdown} when
3541 * otherwise quiescent and {@link #cancelDelayedTasksOnShutdown}
3542 * is in effect.
3543 *
3544 * @param callable the function to execute
3545 * @param delay the time from now to delay execution
3546 * @param unit the time unit of the delay parameter
3547 * @param <V> the type of the callable's result
3548 * @return a ForkJoinTask implementing the ScheduledFuture
3549 * interface, whose {@code get()} method will return the
3550 * value from the callable upon normal completion.
3551 * @throws RejectedExecutionException if the pool is shutdown or
3552 * submission encounters resource exhaustion.
3553 * @throws NullPointerException if command or unit is null
3554 * @since 25
3555 */
3556 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
3557 long delay, TimeUnit unit) {
3558 return scheduleDelayedTask(
3559 new ScheduledForkJoinTask<V>(
3560 unit.toNanos(delay), 0L, false, null, // implicit null check of unit
3561 Objects.requireNonNull(callable), this));
3562 }
3563
3564 /**
3565 * Submits a periodic action that becomes enabled for execution first after the
3566 * given initial delay, and subsequently with the given period;
3567 * that is, executions will commence after
3568 * {@code initialDelay}, then {@code initialDelay + period}, then
3569 * {@code initialDelay + 2 * period}, and so on.
3570 *
3571 * <p>The sequence of task executions continues indefinitely until
     * one of the following exceptional completions occurs:
3573 * <ul>
3574 * <li>The task is {@linkplain Future#cancel explicitly cancelled}
3575 * <li>Method {@link #shutdownNow} is called
3576 * <li>Method {@link #shutdown} is called and the pool is
3577 * otherwise quiescent, in which case existing executions continue
3578 * but subsequent executions do not.
3579 * <li>An execution or the task encounters resource exhaustion.
3580 * <li>An execution of the task throws an exception. In this case
3581 * calling {@link Future#get() get} on the returned future will throw
3582 * {@link ExecutionException}, holding the exception as its cause.
3583 * </ul>
3584 * Subsequent executions are suppressed. Subsequent calls to
3585 * {@link Future#isDone isDone()} on the returned future will
3586 * return {@code true}.
3587 *
3588 * <p>If any execution of this task takes longer than its period, then
3589 * subsequent executions may start late, but will not concurrently
3590 * execute.
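     *
     * <p>For example (a sketch; {@code pool} is any ForkJoinPool):
     * <pre> {@code
     * ScheduledFuture<?> beeper = pool.scheduleAtFixedRate(
     *   () -> System.out.println("beep"), 10, 10, TimeUnit.SECONDS);}</pre>
     *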
3591 * @param command the task to execute
3592 * @param initialDelay the time to delay first execution
3593 * @param period the period between successive executions
3594 * @param unit the time unit of the initialDelay and period parameters
3595 * @return a ForkJoinTask implementing the ScheduledFuture
3596 * interface. The future's {@link Future#get() get()}
3597 * method will never return normally, and will throw an
3598 * exception upon task cancellation or abnormal
3599 * termination of a task execution.
3600 * @throws RejectedExecutionException if the pool is shutdown or
3601 * submission encounters resource exhaustion.
3602 * @throws NullPointerException if command or unit is null
3603 * @throws IllegalArgumentException if period less than or equal to zero
3604 * @since 25
3605 */
3606 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
3607 long initialDelay,
3608 long period, TimeUnit unit) {
3609 if (period <= 0L)
3610 throw new IllegalArgumentException();
3611 return scheduleDelayedTask(
3612 new ScheduledForkJoinTask<Void>(
3613 unit.toNanos(initialDelay), // implicit null check of unit
3614 unit.toNanos(period), false,
3615 Objects.requireNonNull(command), null, this));
3616 }
3617
3618 /**
3619 * Submits a periodic action that becomes enabled for execution first after the
3620 * given initial delay, and subsequently with the given delay
3621 * between the termination of one execution and the commencement of
3622 * the next.
3623 * <p>The sequence of task executions continues indefinitely until
     * one of the following exceptional completions occurs:
3625 * <ul>
3626 * <li>The task is {@linkplain Future#cancel explicitly cancelled}
3627 * <li>Method {@link #shutdownNow} is called
3628 * <li>Method {@link #shutdown} is called and the pool is
3629 * otherwise quiescent, in which case existing executions continue
3630 * but subsequent executions do not.
3631 * <li>An execution or the task encounters resource exhaustion.
3632 * <li>An execution of the task throws an exception. In this case
3633 * calling {@link Future#get() get} on the returned future will throw
3634 * {@link ExecutionException}, holding the exception as its cause.
3635 * </ul>
3636 * Subsequent executions are suppressed. Subsequent calls to
3637 * {@link Future#isDone isDone()} on the returned future will
3638 * return {@code true}.
3639 * @param command the task to execute
3640 * @param initialDelay the time to delay first execution
3641 * @param delay the delay between the termination of one
3642 * execution and the commencement of the next
3643 * @param unit the time unit of the initialDelay and delay parameters
3644 * @return a ForkJoinTask implementing the ScheduledFuture
3645 * interface. The future's {@link Future#get() get()}
3646 * method will never return normally, and will throw an
3647 * exception upon task cancellation or abnormal
3648 * termination of a task execution.
3649 * @throws RejectedExecutionException if the pool is shutdown or
3650 * submission encounters resource exhaustion.
3651 * @throws NullPointerException if command or unit is null
3652 * @throws IllegalArgumentException if delay less than or equal to zero
3653 * @since 25
3654 */
3655 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
3656 long initialDelay,
3657 long delay, TimeUnit unit) {
3658 if (delay <= 0L)
3659 throw new IllegalArgumentException();
3660 return scheduleDelayedTask(
3661 new ScheduledForkJoinTask<Void>(
3662 unit.toNanos(initialDelay), // implicit null check of unit
3663 -unit.toNanos(delay), false, // negative for fixed delay
3664 Objects.requireNonNull(command), null, this));
3665 }
3666
3667 /**
3668 * Body of a task performed on timeout of another task
3669 */
3670 static final class TimeoutAction<V> implements Runnable {
3671 // set after construction, nulled after use
3672 ForkJoinTask.CallableWithTimeout<V> task;
3673 Consumer<? super ForkJoinTask<V>> action;
3674 TimeoutAction(Consumer<? super ForkJoinTask<V>> action) {
3675 this.action = action;
3676 }
3677 public void run() {
3678 ForkJoinTask.CallableWithTimeout<V> t = task;
3679 Consumer<? super ForkJoinTask<V>> a = action;
3680 task = null;
3681 action = null;
3682 if (t != null && t.status >= 0) {
3683 if (a == null)
3684 t.cancel(true);
3685 else {
3686 a.accept(t);
3687 t.interruptIfRunning(true);
3688 }
3689 }
3690 }
3691 }
3692
3693 /**
3694 * Submits a task executing the given function, cancelling the
3695 * task or performing a given timeoutAction if not completed
3696 * within the given timeout period. If the optional {@code
3697 * timeoutAction} is null, the task is cancelled (via {@code
     * cancel(true)}). Otherwise, the action is applied and the task
3699 * may be interrupted if running. Actions may include {@link
3700 * ForkJoinTask#complete} to set a replacement value or {@link
3701 * ForkJoinTask#completeExceptionally} to throw an appropriate
3702 * exception. Note that these can succeed only if the task has
3703 * not already completed when the timeoutAction executes.
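     *
     * <p>For example, to substitute a default value upon timeout (a
     * sketch; {@code pool} and {@code slowLookup} are illustrative):
     * <pre> {@code
     * ForkJoinTask<String> f = pool.submitWithTimeout(
     *   () -> slowLookup(),                // possibly slow computation
     *   1, TimeUnit.SECONDS,
     *   task -> task.complete("default")); // applied only on timeout
     * }</pre>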
3704 *
3705 * @param callable the function to execute
3706 * @param <V> the type of the callable's result
3707 * @param timeout the time to wait before cancelling if not completed
3708 * @param timeoutAction if nonnull, an action to perform on
3709 * timeout, otherwise the default action is to cancel using
3710 * {@code cancel(true)}.
3711 * @param unit the time unit of the timeout parameter
3712 * @return a Future that can be used to extract result or cancel
3713 * @throws RejectedExecutionException if the task cannot be
3714 * scheduled for execution
3715 * @throws NullPointerException if callable or unit is null
3716 * @since 25
3717 */
3718 public <V> ForkJoinTask<V> submitWithTimeout(Callable<V> callable,
3719 long timeout, TimeUnit unit,
3720 Consumer<? super ForkJoinTask<V>> timeoutAction) {
3721 ForkJoinTask.CallableWithTimeout<V> task; TimeoutAction<V> onTimeout;
3722 Objects.requireNonNull(callable);
3723 ScheduledForkJoinTask<Void> timeoutTask =
3724 new ScheduledForkJoinTask<Void>(
3725 unit.toNanos(timeout), 0L, true,
3726 onTimeout = new TimeoutAction<V>(timeoutAction), null, this);
3727 onTimeout.task = task =
3728 new ForkJoinTask.CallableWithTimeout<V>(callable, timeoutTask);
3729 scheduleDelayedTask(timeoutTask);
3730 return poolSubmit(true, task);
3731 }
3732
3733 /**
3734 * Arranges that scheduled tasks that are not executing and have
3735 * not already been enabled for execution will not be executed and
3736 * will be cancelled upon {@link #shutdown} (unless this pool is
3737 * the {@link #commonPool()} which never shuts down). This method
3738 * may be invoked either before {@link #shutdown} to take effect
3739 * upon the next call, or afterwards to cancel such tasks, which
3740 * may then allow termination. Note that subsequent executions of
3741 * periodic tasks are always disabled upon shutdown, so this
3742 * method applies meaningfully only to non-periodic tasks.
3743 * @since 25
3744 */
3745 public void cancelDelayedTasksOnShutdown() {
3746 DelayScheduler ds;
3747 if ((ds = delayScheduler) != null ||
3748 (ds = startDelayScheduler()) != null)
3749 ds.cancelDelayedTasksOnShutdown();
3750 }
3751
3752 /**
3753 * Returns the factory used for constructing new workers.
3754 *
3755 * @return the factory used for constructing new workers
3756 */
3757 public ForkJoinWorkerThreadFactory getFactory() {
3758 return factory;
3759 }
3760
3761 /**
3762 * Returns the handler for internal worker threads that terminate
3763 * due to unrecoverable errors encountered while executing tasks.
3764 *
3765 * @return the handler, or {@code null} if none
3766 */
3767 public UncaughtExceptionHandler getUncaughtExceptionHandler() {
3768 return ueh;
3769 }
3770
3771 /**
3772 * Returns the targeted parallelism level of this pool.
3773 *
3774 * @return the targeted parallelism level of this pool
3775 */
3776 public int getParallelism() {
3777 return Math.max(getParallelismOpaque(), 1);
3778 }
3779
3780 /**
3781 * Returns the targeted parallelism level of the common pool.
3782 *
3783 * @return the targeted parallelism level of the common pool
3784 * @since 1.8
3785 */
3786 public static int getCommonPoolParallelism() {
3787 return common.getParallelism();
3788 }
3789
3790 /**
3791 * Returns the number of worker threads that have started but not
3792 * yet terminated. The result returned by this method may differ
3793 * from {@link #getParallelism} when threads are created to
3794 * maintain parallelism when others are cooperatively blocked.
3795 *
3796 * @return the number of worker threads
3797 */
3798 public int getPoolSize() {
3799 return (short)(ctl >>> TC_SHIFT);
3800 }
3801
3802 /**
3803 * Returns {@code true} if this pool uses local first-in-first-out
3804 * scheduling mode for forked tasks that are never joined.
3805 *
3806 * @return {@code true} if this pool uses async mode
3807 */
3808 public boolean getAsyncMode() {
3809 return (config & FIFO) != 0;
3810 }
3811
3812 /**
3813 * Returns an estimate of the number of worker threads that are
3814 * not blocked waiting to join tasks or for other managed
3815 * synchronization. This method may overestimate the
3816 * number of running threads.
3817 *
3818 * @return the number of worker threads
3819 */
3820 public int getRunningThreadCount() {
3821 WorkQueue[] qs; WorkQueue q;
3822 int rc = 0;
3823 if ((runState & TERMINATED) == 0L && (qs = queues) != null) {
3824 for (int i = 1; i < qs.length; i += 2) {
3825 if ((q = qs[i]) != null && q.isApparentlyUnblocked())
3826 ++rc;
3827 }
3828 }
3829 return rc;
3830 }
3831
3832 /**
3833 * Returns an estimate of the number of threads that are currently
3834 * stealing or executing tasks. This method may overestimate the
3835 * number of active threads.
3836 *
3837 * @return the number of active threads
3838 */
3839 public int getActiveThreadCount() {
3840 return Math.max((short)(ctl >>> RC_SHIFT), 0);
3841 }
3842
3843 /**
3844 * Returns {@code true} if all worker threads are currently idle.
3845 * An idle worker is one that cannot obtain a task to execute
3846 * because none are available to steal from other threads, and
3847 * there are no pending submissions to the pool. This method is
3848 * conservative; it might not return {@code true} immediately upon
3849 * idleness of all threads, but will eventually become true if
3850 * threads remain inactive.
3851 *
3852 * @return {@code true} if all threads are currently idle
3853 */
3854 public boolean isQuiescent() {
3855 return quiescent() >= 0;
3856 }
3857
3858 /**
3859 * Returns an estimate of the total number of completed tasks that
3860 * were executed by a thread other than their submitter. The
3861 * reported value underestimates the actual total number of steals
3862 * when the pool is not quiescent. This value may be useful for
3863 * monitoring and tuning fork/join programs: in general, steal
3864 * counts should be high enough to keep threads busy, but low
3865 * enough to avoid overhead and contention across threads.
3866 *
3867 * @return the number of steals
3868 */
3869 public long getStealCount() {
3870 long count = stealCount;
3871 WorkQueue[] qs; WorkQueue q;
3872 if ((qs = queues) != null) {
3873 for (int i = 1; i < qs.length; i += 2) {
3874 if ((q = qs[i]) != null)
3875 count += (long)q.nsteals & 0xffffffffL;
3876 }
3877 }
3878 return count;
3879 }
3880
3881 /**
3882 * Returns an estimate of the total number of tasks currently held
3883 * in queues by worker threads (but not including tasks submitted
3884 * to the pool that have not begun executing). This value is only
3885 * an approximation, obtained by iterating across all threads in
3886 * the pool. This method may be useful for tuning task
     * granularities. The returned count does not include scheduled
     * tasks that are not yet ready to execute, which are reported
     * separately by method {@link #getDelayedTaskCount}.
3890 *
3891 * @return the number of queued tasks
3892 * @see ForkJoinWorkerThread#getQueuedTaskCount()
3893 */
3894 public long getQueuedTaskCount() {
3895 WorkQueue[] qs; WorkQueue q;
3896 long count = 0;
3897 if ((runState & TERMINATED) == 0L && (qs = queues) != null) {
3898 for (int i = 1; i < qs.length; i += 2) {
3899 if ((q = qs[i]) != null)
3900 count += q.queueSize();
3901 }
3902 }
3903 return count;
3904 }
3905
3906 /**
3907 * Returns an estimate of the number of tasks submitted to this
3908 * pool that have not yet begun executing. This method may take
3909 * time proportional to the number of submissions.
3910 *
3911 * @return the number of queued submissions
3912 */
3913 public int getQueuedSubmissionCount() {
3914 WorkQueue[] qs; WorkQueue q;
3915 int count = 0;
3916 if ((runState & TERMINATED) == 0L && (qs = queues) != null) {
3917 for (int i = 0; i < qs.length; i += 2) {
3918 if ((q = qs[i]) != null)
3919 count += q.queueSize();
3920 }
3921 }
3922 return count;
3923 }
3924
3925 /**
3926 * Returns an estimate of the number of delayed (including
3927 * periodic) tasks scheduled in this pool that are not yet ready
3928 * to submit for execution. The returned value is inaccurate while
3929 * delayed tasks are being processed.
3930 *
3931 * @return an estimate of the number of delayed tasks
3932 * @since 25
3933 */
3934 public long getDelayedTaskCount() {
3935 DelayScheduler ds;
3936 return ((ds = delayScheduler) == null ? 0 : ds.lastStableSize());
3937 }
3938
3939 /**
3940 * Returns {@code true} if there are any tasks submitted to this
3941 * pool that have not yet begun executing.
3942 *
3943 * @return {@code true} if there are any queued submissions
3944 */
3945 public boolean hasQueuedSubmissions() {
3946 WorkQueue[] qs; WorkQueue q;
3947 if ((runState & STOP) == 0L && (qs = queues) != null) {
3948 for (int i = 0; i < qs.length; i += 2) {
3949 if ((q = qs[i]) != null && q.queueSize() > 0)
3950 return true;
3951 }
3952 }
3953 return false;
3954 }
3955
3956 /**
3957 * Removes and returns the next unexecuted submission if one is
3958 * available. This method may be useful in extensions to this
3959 * class that re-assign work in systems with multiple pools.
3960 *
3961 * @return the next submission, or {@code null} if none
3962 */
3963 protected ForkJoinTask<?> pollSubmission() {
3964 return pollScan(true);
3965 }
3966
3967 /**
3968 * Removes all available unexecuted submitted and forked tasks
3969 * from scheduling queues and adds them to the given collection,
3970 * without altering their execution status. These may include
3971 * artificially generated or wrapped tasks. This method is
3972 * designed to be invoked only when the pool is known to be
3973 * quiescent. Invocations at other times may not remove all
3974 * tasks. A failure encountered while attempting to add elements
3975 * to collection {@code c} may result in elements being in
3976 * neither, either or both collections when the associated
3977 * exception is thrown. The behavior of this operation is
3978 * undefined if the specified collection is modified while the
3979 * operation is in progress.
3980 *
3981 * @param c the collection to transfer elements into
3982 * @return the number of elements transferred
3983 */
3984 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
3985 int count = 0;
3986 for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) {
3987 c.add(t);
3988 ++count;
3989 }
3990 return count;
3991 }
3992
3993 /**
3994 * Returns a string identifying this pool, as well as its state,
3995 * including indications of run state, parallelism level, and
3996 * worker and task counts.
3997 *
3998 * @return a string identifying this pool, as well as its state
3999 */
4000 public String toString() {
4001 // Use a single pass through queues to collect counts
4002 DelayScheduler ds;
4003 long e = runState;
4004 long st = stealCount;
4005 long qt = 0L, ss = 0L; int rc = 0;
4006 WorkQueue[] qs; WorkQueue q;
4007 if ((qs = queues) != null) {
4008 for (int i = 0; i < qs.length; ++i) {
4009 if ((q = qs[i]) != null) {
4010 int size = q.queueSize();
4011 if ((i & 1) == 0)
4012 ss += size;
4013 else {
4014 qt += size;
4015 st += (long)q.nsteals & 0xffffffffL;
4016 if (q.isApparentlyUnblocked())
4017 ++rc;
4018 }
4019 }
4020 }
4021 }
4022 String delayed = ((ds = delayScheduler) == null ? "" :
4023 ", delayed = " + ds.lastStableSize());
4024 int pc = parallelism;
4025 long c = ctl;
4026 int tc = (short)(c >>> TC_SHIFT);
4027 int ac = (short)(c >>> RC_SHIFT);
4028 if (ac < 0) // ignore transient negative
4029 ac = 0;
4030 String level = ((e & TERMINATED) != 0L ? "Terminated" :
4031 (e & STOP) != 0L ? "Terminating" :
4032 (e & SHUTDOWN) != 0L ? "Shutting down" :
4033 "Running");
4034 return super.toString() +
4035 "[" + level +
4036 ", parallelism = " + pc +
4037 ", size = " + tc +
4038 ", active = " + ac +
4039 ", running = " + rc +
4040 ", steals = " + st +
4041 ", tasks = " + qt +
4042 ", submissions = " + ss +
4043 delayed +
4044 "]";
4045 }
4046
4047 /**
4048 * Possibly initiates an orderly shutdown in which previously
4049 * submitted tasks are executed, but no new tasks will be
4050 * accepted. Invocation has no effect on execution state if this
4051 * is the {@link #commonPool()}, and no additional effect if
4052 * already shut down. Tasks that are in the process of being
4053 * submitted concurrently during the course of this method may or
4054 * may not be rejected.
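     *
     * <p>For example, a typical drain-and-await sequence (a sketch,
     * in code that may propagate {@link InterruptedException}):
     * <pre> {@code
     * pool.shutdown();                // stop accepting new tasks
     * if (!pool.awaitTermination(60, TimeUnit.SECONDS))
     *   pool.shutdownNow();           // cancel remaining tasks
     * }</pre>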
4055 */
4056 public void shutdown() {
4057 if (workerNamePrefix != null) // not common pool
4058 tryTerminate(false, true);
4059 }
4060
4061 /**
4062 * Possibly attempts to cancel and/or stop all tasks, and reject
4063 * all subsequently submitted tasks. Invocation has no effect on
4064 * execution state if this is the {@link #commonPool()}, and no
4065 * additional effect if already shut down. Otherwise, tasks that
4066 * are in the process of being submitted or executed concurrently
4067 * during the course of this method may or may not be
4068 * rejected. This method cancels both existing and unexecuted
4069 * tasks, in order to permit termination in the presence of task
4070 * dependencies. So the method always returns an empty list
4071 * (unlike the case for some other Executors).
4072 *
4073 * @return an empty list
4074 */
4075 public List<Runnable> shutdownNow() {
4076 if (workerNamePrefix != null) // not common pool
4077 tryTerminate(true, true);
4078 return Collections.emptyList();
4079 }
4080
4081 /**
4082 * Returns {@code true} if all tasks have completed following shut down.
4083 *
4084 * @return {@code true} if all tasks have completed following shut down
4085 */
4086 public boolean isTerminated() {
4087 return (tryTerminate(false, false) & TERMINATED) != 0;
4088 }
4089
4090 /**
4091 * Returns {@code true} if the process of termination has
4092 * commenced but not yet completed. This method may be useful for
4093 * debugging. A return of {@code true} reported a sufficient
4094 * period after shutdown may indicate that submitted tasks have
4095 * ignored or suppressed interruption, or are waiting for I/O,
4096 * causing this executor not to properly terminate. (See the
4097 * advisory notes for class {@link ForkJoinTask} stating that
4098 * tasks should not normally entail blocking operations. But if
4099 * they do, they must abort them on interrupt.)
4100 *
4101 * @return {@code true} if terminating but not yet terminated
4102 */
4103 public boolean isTerminating() {
4104 return (tryTerminate(false, false) & (STOP | TERMINATED)) == STOP;
4105 }
4106
4107 /**
4108 * Returns {@code true} if this pool has been shut down.
4109 *
4110 * @return {@code true} if this pool has been shut down
4111 */
4112 public boolean isShutdown() {
4113 return (runState & SHUTDOWN) != 0L;
4114 }
4115
4116 /**
4117 * Blocks until all tasks have completed execution after a
4118 * shutdown request, or the timeout occurs, or the current thread
4119 * is interrupted, whichever happens first. Because the {@link
4120 * #commonPool()} never terminates until program shutdown, when
4121 * applied to the common pool, this method is equivalent to {@link
4122 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
4123 *
4124 * @param timeout the maximum time to wait
4125 * @param unit the time unit of the timeout argument
4126 * @return {@code true} if this executor terminated and
4127 * {@code false} if the timeout elapsed before termination
4128 * @throws InterruptedException if interrupted while waiting
4129 */
4130 public boolean awaitTermination(long timeout, TimeUnit unit)
4131 throws InterruptedException {
4132 long nanos = unit.toNanos(timeout);
4133 CountDownLatch done;
4134 if (workerNamePrefix == null) { // is common pool
4135 if (helpQuiescePool(this, nanos, true) < 0)
4136 throw new InterruptedException();
4137 return false;
4138 }
4139 else if ((tryTerminate(false, false) & TERMINATED) != 0 ||
4140 (done = terminationSignal()) == null ||
4141 (runState & TERMINATED) != 0L)
4142 return true;
4143 else
4144 return done.await(nanos, TimeUnit.NANOSECONDS);
4145 }
4146
4147 /**
4148 * If called by a ForkJoinTask operating in this pool, equivalent
4149 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
4150 * waits and/or attempts to assist performing tasks until this
4151 * pool {@link #isQuiescent} or the indicated timeout elapses.
4152 *
4153 * @param timeout the maximum time to wait
4154 * @param unit the time unit of the timeout argument
4155 * @return {@code true} if quiescent; {@code false} if the
4156 * timeout elapsed.
4157 */
4158 public boolean awaitQuiescence(long timeout, TimeUnit unit) {
4159 return (helpQuiescePool(this, unit.toNanos(timeout), false) > 0);
4160 }
4161
4162 /**
4163 * Unless this is the {@link #commonPool()}, initiates an orderly
4164 * shutdown in which previously submitted tasks are executed, but
4165 * no new tasks will be accepted, and waits until all tasks have
4166 * completed execution and the executor has terminated.
4167 *
4168 * <p> If already terminated, or this is the {@link
4169 * #commonPool()}, this method has no effect on execution, and
4170 * does not wait. Otherwise, if interrupted while waiting, this
4171 * method stops all executing tasks as if by invoking {@link
4172 * #shutdownNow()}. It then continues to wait until all actively
4173 * executing tasks have completed. Tasks that were awaiting
4174 * execution are not executed. The interrupt status will be
4175 * re-asserted before this method returns.
4176 *
4177 * @since 19
4178 */
4179 @Override
4180 public void close() {
4181 if (workerNamePrefix != null) {
4182 CountDownLatch done = null;
4183 boolean interrupted = false;
4184 while ((tryTerminate(interrupted, true) & TERMINATED) == 0) {
4185 if (done == null)
4186 done = terminationSignal();
4187 else {
4188 try {
4189 done.await();
4190 break;
4191 } catch (InterruptedException ex) {
4192 interrupted = true;
4193 }
4194 }
4195 }
4196 if (interrupted)
4197 Thread.currentThread().interrupt();
4198 }
4199 }
4200
4201 /**
4202 * Interface for extending managed parallelism for tasks running
4203 * in {@link ForkJoinPool}s.
4204 *
4205 * <p>A {@code ManagedBlocker} provides two methods. Method
4206 * {@link #isReleasable} must return {@code true} if blocking is
4207 * not necessary. Method {@link #block} blocks the current thread
4208 * if necessary (perhaps internally invoking {@code isReleasable}
4209 * before actually blocking). These actions are performed by any
4210 * thread invoking {@link
4211 * ForkJoinPool#managedBlock(ManagedBlocker)}. The unusual
4212 * methods in this API accommodate synchronizers that may, but
4213 * don't usually, block for long periods. Similarly, they allow
4214 * more efficient internal handling of cases in which additional
4215 * workers may be, but usually are not, needed to ensure
4216 * sufficient parallelism. Toward this end, implementations of
4217 * method {@code isReleasable} must be amenable to repeated
4218 * invocation. Neither method is invoked after a prior invocation
4219 * of {@code isReleasable} or {@code block} returns {@code true}.
4220 *
4221 * <p>For example, here is a ManagedBlocker based on a
4222 * ReentrantLock:
4223 * <pre> {@code
4224 * class ManagedLocker implements ManagedBlocker {
4225 * final ReentrantLock lock;
4226 * boolean hasLock = false;
4227 * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
4228 * public boolean block() {
4229 * if (!hasLock)
4230 * lock.lock();
4231 * return true;
4232 * }
4233 * public boolean isReleasable() {
4234 * return hasLock || (hasLock = lock.tryLock());
4235 * }
4236 * }}</pre>
4237 *
4238 * <p>Here is a class that possibly blocks waiting for an
4239 * item on a given queue:
4240 * <pre> {@code
4241 * class QueueTaker<E> implements ManagedBlocker {
4242 * final BlockingQueue<E> queue;
4243 * volatile E item = null;
4244 * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
4245 * public boolean block() throws InterruptedException {
4246 * if (item == null)
4247 * item = queue.take();
4248 * return true;
4249 * }
4250 * public boolean isReleasable() {
4251 * return item != null || (item = queue.poll()) != null;
4252 * }
4253 * public E getItem() { // call after pool.managedBlock completes
4254 * return item;
4255 * }
4256 * }}</pre>
4257 */
    public interface ManagedBlocker {
4259 /**
4260 * Possibly blocks the current thread, for example waiting for
4261 * a lock or condition.
4262 *
         * @return {@code true} if no additional blocking is necessary
         * (i.e., if {@code isReleasable} would return {@code true})
4265 * @throws InterruptedException if interrupted while waiting
4266 * (the method is not required to do so, but is allowed to)
4267 */
4268 boolean block() throws InterruptedException;
4269
4270 /**
4271 * Returns {@code true} if blocking is unnecessary.
4272 * @return {@code true} if blocking is unnecessary
4273 */
4274 boolean isReleasable();
4275 }
4276
4277 /**
4278 * Runs the given possibly blocking task. When {@linkplain
4279 * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
4280 * method possibly arranges for a spare thread to be activated if
4281 * necessary to ensure sufficient parallelism while the current
4282 * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
4283 *
4284 * <p>This method repeatedly calls {@code blocker.isReleasable()} and
4285 * {@code blocker.block()} until either method returns {@code true}.
4286 * Every call to {@code blocker.block()} is preceded by a call to
4287 * {@code blocker.isReleasable()} that returned {@code false}.
4288 *
4289 * <p>If not running in a ForkJoinPool, this method is
4290 * behaviorally equivalent to
4291 * <pre> {@code
4292 * while (!blocker.isReleasable())
4293 * if (blocker.block())
4294 * break;}</pre>
4295 *
4296 * If running in a ForkJoinPool, the pool may first be expanded to
4297 * ensure sufficient parallelism available during the call to
4298 * {@code blocker.block()}.
4299 *
4300 * @param blocker the blocker task
4301 * @throws InterruptedException if {@code blocker.block()} did so
4302 */
4303 public static void managedBlock(ManagedBlocker blocker)
4304 throws InterruptedException {
4305 Thread t; ForkJoinPool p;
4306 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
4307 (p = ((ForkJoinWorkerThread)t).pool) != null)
4308 p.compensatedBlock(blocker);
4309 else
4310 unmanagedBlock(blocker);
4311 }
4312
    /** Implements managedBlock for ForkJoinWorkerThreads */
4314 private void compensatedBlock(ManagedBlocker blocker)
4315 throws InterruptedException {
4316 Objects.requireNonNull(blocker);
4317 for (;;) {
4318 int comp; boolean done;
4319 long c = ctl;
4320 if (blocker.isReleasable())
4321 break;
4322 if ((runState & STOP) != 0L)
4323 throw new InterruptedException();
4324 if ((comp = tryCompensate(c)) >= 0) {
4325 try {
4326 done = blocker.block();
4327 } finally {
4328 if (comp > 0)
4329 getAndAddCtl(RC_UNIT);
4330 }
4331 if (done)
4332 break;
4333 }
4334 }
4335 }
4336
4337 /**
     * Invokes tryCompensate to create or re-activate a spare thread to
     * compensate for a thread that performs a blocking operation. When
     * the blocking operation completes, endCompensatedBlock must be
     * invoked with the value returned by this method to re-adjust
     * parallelism.
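     *
     * <p>Expected pairing (a sketch; {@code pool} and the blocking
     * call shown are illustrative assumptions):
     * <pre> {@code
     * long post = pool.beginCompensatedBlock();
     * try {
     *     blockingCall();              // some potentially blocking operation
     * } finally {
     *     pool.endCompensatedBlock(post);
     * }}</pre>
     *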
4342 * @return value to use in endCompensatedBlock
4343 */
4344 final long beginCompensatedBlock() {
4345 int c;
4346 do {} while ((c = tryCompensate(ctl)) < 0);
4347 return (c == 0) ? 0L : RC_UNIT;
4348 }
4349
4350 /**
4351 * Re-adjusts parallelism after a blocking operation completes.
4352 * @param post value from beginCompensatedBlock
4353 */
    final void endCompensatedBlock(long post) {
4355 if (post > 0L) {
4356 getAndAddCtl(post);
4357 }
4358 }
4359
    /** Implements managedBlock for external threads */
4361 private static void unmanagedBlock(ManagedBlocker blocker)
4362 throws InterruptedException {
4363 Objects.requireNonNull(blocker);
4364 do {} while (!blocker.isReleasable() && !blocker.block());
4365 }
4366
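    // Tasks submitted by external (non-worker) threads are wrapped in
    // interruptible adapters, so that cancellation may interrupt the
    // running task; worker threads use plain (non-interruptible) adapters.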
4367 @Override
4368 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
4369 Objects.requireNonNull(runnable);
4370 return (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
4371 new ForkJoinTask.AdaptedRunnable<T>(runnable, value) :
4372 new ForkJoinTask.AdaptedInterruptibleRunnable<T>(runnable, value);
4373 }
4374
4375 @Override
4376 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
4377 Objects.requireNonNull(callable);
4378 return (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
4379 new ForkJoinTask.AdaptedCallable<T>(callable) :
4380 new ForkJoinTask.AdaptedInterruptibleCallable<T>(callable);
4381 }
4382
4383 static {
4384 U = Unsafe.getUnsafe();
4385 Class<ForkJoinPool> klass = ForkJoinPool.class;
4386 try {
4387 Field poolIdsField = klass.getDeclaredField("poolIds");
4388 POOLIDS_BASE = U.staticFieldBase(poolIdsField);
4389 POOLIDS = U.staticFieldOffset(poolIdsField);
4390 } catch (NoSuchFieldException e) {
4391 throw new ExceptionInInitializerError(e);
4392 }
4393 CTL = U.objectFieldOffset(klass, "ctl");
4394 RUNSTATE = U.objectFieldOffset(klass, "runState");
4395 PARALLELISM = U.objectFieldOffset(klass, "parallelism");
4396 THREADIDS = U.objectFieldOffset(klass, "threadIds");
4397 TERMINATION = U.objectFieldOffset(klass, "termination");
4398 Class<ForkJoinTask[]> aklass = ForkJoinTask[].class;
4399 ABASE = U.arrayBaseOffset(aklass);
4400 int scale = U.arrayIndexScale(aklass);
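        // ASHIFT is log2(scale), used to turn array indices into byte
        // offsets for Unsafe element access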
4401 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
4402 if ((scale & (scale - 1)) != 0)
4403 throw new Error("array index scale not a power of two");
4404
4405 Class<?> dep = LockSupport.class; // ensure loaded
4406 // allow access to non-public methods
4407 JLA = SharedSecrets.getJavaLangAccess();
4408 SharedSecrets.setJavaUtilConcurrentFJPAccess(
4409 new JavaUtilConcurrentFJPAccess() {
4410 @Override
4411 public long beginCompensatedBlock(ForkJoinPool pool) {
4412 return pool.beginCompensatedBlock();
4413 }
                @Override
                public void endCompensatedBlock(ForkJoinPool pool, long post) {
4415 pool.endCompensatedBlock(post);
4416 }
4417 });
4418 defaultForkJoinWorkerThreadFactory =
4419 new DefaultForkJoinWorkerThreadFactory();
4420 common = new ForkJoinPool((byte)0);
4421 }
4422 }