1 /*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.lang.Thread.UncaughtExceptionHandler;
39 import java.lang.reflect.Field;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.Collections;
43 import java.util.List;
44 import java.util.Objects;
45 import java.util.function.Consumer;
46 import java.util.function.Predicate;
47 import java.util.concurrent.CountDownLatch;
48 import java.util.concurrent.locks.LockSupport;
49 import jdk.internal.access.JavaLangAccess;
50 import jdk.internal.access.JavaUtilConcurrentFJPAccess;
51 import jdk.internal.access.SharedSecrets;
52 import jdk.internal.misc.Unsafe;
53 import jdk.internal.vm.SharedThreadContainer;
54 import static java.util.concurrent.DelayScheduler.ScheduledForkJoinTask;
55
56 /**
57 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
58 * A {@code ForkJoinPool} provides the entry point for submissions
59 * from non-{@code ForkJoinTask} clients, as well as management and
60 * monitoring operations.
61 *
62 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
63 * ExecutorService} mainly by virtue of employing
64 * <em>work-stealing</em>: all threads in the pool attempt to find and
65 * execute tasks submitted to the pool and/or created by other active
66 * tasks (eventually blocking waiting for work if none exist). This
67 * enables efficient processing when most tasks spawn other subtasks
68 * (as do most {@code ForkJoinTask}s), as well as when many small
69 * tasks are submitted to the pool from external clients. Especially
70 * when setting <em>asyncMode</em> to true in constructors, {@code
71 * ForkJoinPool}s may also be appropriate for use with event-style
72 * tasks that are never joined. All worker threads are initialized
73 * with {@link Thread#isDaemon} set {@code true}.
74 *
75 * <p>A static {@link #commonPool()} is available and appropriate for
76 * most applications. The common pool is used by any ForkJoinTask that
77 * is not explicitly submitted to a specified pool. Using the common
78 * pool normally reduces resource usage (its threads are slowly
79 * reclaimed during periods of non-use, and reinstated upon subsequent
80 * use).
81 *
82 * <p>For applications that require separate or custom pools, a {@code
83 * ForkJoinPool} may be constructed with a given target parallelism
84 * level; by default, equal to the number of available processors.
85 * The pool attempts to maintain enough active (or available) threads
86 * by dynamically adding, suspending, or resuming internal worker
87 * threads, even if some tasks are stalled waiting to join others.
88 * However, no such adjustments are guaranteed in the face of blocked
89 * I/O or other unmanaged synchronization. The nested {@link
90 * ManagedBlocker} interface enables extension of the kinds of
91 * synchronization accommodated. The default policies may be
92 * overridden using a constructor with parameters corresponding to
93 * those documented in class {@link ThreadPoolExecutor}.
94 *
95 * <p>In addition to execution and lifecycle control methods, this
96 * class provides status check methods (for example
97 * {@link #getStealCount}) that are intended to aid in developing,
98 * tuning, and monitoring fork/join applications. Also, method
99 * {@link #toString} returns indications of pool state in a
100 * convenient form for informal monitoring.
101 *
102 * <p>As is the case with other ExecutorServices, there are three
103 * main task execution methods summarized in the following table.
104 * These are designed to be used primarily by clients not already
105 * engaged in fork/join computations in the current pool. The main
106 * forms of these methods accept instances of {@code ForkJoinTask},
107 * but overloaded forms also allow mixed execution of plain {@code
108 * Runnable}- or {@code Callable}- based activities as well. However,
109 * tasks that are already executing in a pool should normally instead
110 * use the within-computation forms listed in the table unless using
111 * async event-style tasks that are not usually joined, in which case
112 * there is little difference among choice of methods.
113 *
114 * <table class="plain">
115 * <caption>Summary of task execution methods</caption>
116 * <tr>
117 * <td></td>
118 * <th scope="col"> Call from non-fork/join clients</th>
119 * <th scope="col"> Call from within fork/join computations</th>
120 * </tr>
121 * <tr>
122 * <th scope="row" style="text-align:left"> Arrange async execution</th>
123 * <td> {@link #execute(ForkJoinTask)}</td>
124 * <td> {@link ForkJoinTask#fork}</td>
125 * </tr>
126 * <tr>
127 * <th scope="row" style="text-align:left"> Await and obtain result</th>
128 * <td> {@link #invoke(ForkJoinTask)}</td>
129 * <td> {@link ForkJoinTask#invoke}</td>
130 * </tr>
131 * <tr>
132 * <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th>
133 * <td> {@link #submit(ForkJoinTask)}</td>
134 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
135 * </tr>
136 * </table>
137 *
138 * <p>Additionally, this class supports {@link
139 * ScheduledExecutorService} methods to delay or periodically execute
140 * tasks, as well as method {@link #submitWithTimeout} to cancel tasks
141 * that take too long. The scheduled functions or actions may create
142 * and invoke other {@linkplain ForkJoinTask ForkJoinTasks}. Delayed
143 * actions become enabled for execution and behave as ordinary submitted
144 * tasks when their delays elapse. Scheduling methods return
145 * {@linkplain ForkJoinTask ForkJoinTasks} that implement the {@link
146 * ScheduledFuture} interface. Resource exhaustion encountered after
147 * initial submission results in task cancellation. When time-based
148 * methods are used, shutdown policies match the default policies of
149 * class {@link ScheduledThreadPoolExecutor}: upon {@link #shutdown},
150 * existing periodic tasks will not re-execute, and the pool
151 * terminates when quiescent and existing delayed tasks
152 * complete. Method {@link #cancelDelayedTasksOnShutdown} may be used
153 * to disable all delayed tasks upon shutdown, and method {@link
154 * #shutdownNow} may be used to instead unconditionally initiate pool
155 * termination. Monitoring methods such as {@link #getQueuedTaskCount}
156 * do not include scheduled tasks that are not yet enabled for execution,
157 * which are reported separately by method {@link
158 * #getDelayedTaskCount}.
159 *
160 * <p>The parameters used to construct the common pool may be controlled by
161 * setting the following {@linkplain System#getProperty system properties}:
162 * <ul>
163 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism}
164 * - the parallelism level, a non-negative integer. Usage is discouraged.
165 * Use {@link #setParallelism} instead.
166 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.threadFactory}
167 * - the class name of a {@link ForkJoinWorkerThreadFactory}.
168 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
169 * is used to load this class.
170 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.exceptionHandler}
171 * - the class name of a {@link UncaughtExceptionHandler}.
172 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
173 * is used to load this class.
174 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.maximumSpares}
175 * - the maximum number of allowed extra threads to maintain target
176 * parallelism (default 256).
177 * </ul>
178 * If no thread factory is supplied via a system property, then the
179 * common pool uses a factory that uses the system class loader as the
180 * {@linkplain Thread#getContextClassLoader() thread context class loader}.
181 *
182 * <p>Upon any error in establishing these settings, default parameters
183 * are used. It is possible to disable use of threads by using a
184 * factory that may return {@code null}, in which case some tasks may
185 * never execute. While possible, it is strongly discouraged to set
186 * the parallelism property to zero, which may be internally
187 * overridden in the presence of intrinsically async tasks.
188 *
189 * @implNote This implementation restricts the maximum number of
190 * running threads to 32767. Attempts to create pools with greater
191 * than the maximum number result in {@code
192 * IllegalArgumentException}. Also, this implementation rejects
193 * submitted tasks (that is, by throwing {@link
194 * RejectedExecutionException}) only when the pool is shut down or
195 * internal resources have been exhausted.
196 *
197 * @since 1.7
198 * @author Doug Lea
199 */
200 public class ForkJoinPool extends AbstractExecutorService
201 implements ScheduledExecutorService {
202
203 /*
204 * Implementation Overview
205 *
206 * This class and its nested classes provide the main
207 * functionality and control for a set of worker threads. Because
208 * most internal methods and nested classes are interrelated,
209 * their main rationale and descriptions are presented here;
210 * individual methods and nested classes contain only brief
211 * comments about details. Broadly: submissions from non-FJ
212 * threads enter into submission queues. Workers take these tasks
213 * and typically split them into subtasks that may be stolen by
214 * other workers. Work-stealing based on randomized scans
215 * generally leads to better throughput than "work dealing" in
216 * which producers assign tasks to idle threads, in part because
217 * threads that have finished other tasks before the signalled
218 * thread wakes up (which can be a long time) can take the task
219 * instead. Preference rules give first priority to processing
220 * tasks from their own queues (LIFO or FIFO, depending on mode),
221 * then to randomized FIFO steals of tasks in other queues.
222 *
223 * This framework began as a vehicle for supporting structured
224 * parallelism using work-stealing, designed to work best when
225 * tasks are dag-structured (wrt completion dependencies), nested
226 * (generated using recursion or completions), of reasonable
227 * granularity, independent (wrt memory and resources) and where
228 * callers participate in task execution. These are properties
229 * that anyone aiming for efficient parallel multicore execution
230 * should design for. Over time, the scalability advantages of
231 * this framework led to extensions to better support more diverse
232 * usage contexts, amounting to weakenings or violations of each
233 * of these properties. Accommodating them may compromise
234 * performance, but mechanics discussed below include tradeoffs
235 * attempting to arrange that no single performance issue dominates.
236 *
237 * Here's a brief history of major revisions, each also with other
238 * minor features and changes.
239 *
240 * 1. Only handle recursively structured computational tasks
241 * 2. Async (FIFO) mode and striped submission queues
242 * 3. Completion-based tasks (mainly CountedCompleters)
243 * 4. CommonPool and parallelStream support
244 * 5. InterruptibleTasks for externally submitted tasks
245 * 6. Support ScheduledExecutorService methods
246 *
247 * Most changes involve adaptions of base algorithms using
248 * combinations of static and dynamic bitwise mode settings (both
249 * here and in ForkJoinTask), and subclassing of ForkJoinTask.
250 * There are a fair number of odd code constructions and design
251 * decisions for components that reside at the edge of Java vs JVM
252 * functionality.
253 *
254 * WorkQueues
255 * ==========
256 *
257 * Most operations occur within work-stealing queues (in nested
258 * class WorkQueue). These are special forms of Deques that
259 * support only three of the four possible end-operations -- push,
260 * pop, and poll (aka steal), under the further constraints that
261 * push and pop are called only from the owning thread (or, as
262 * extended here, under a lock), while poll may be called from
263 * other threads. (If you are unfamiliar with them, you probably
264 * want to read Herlihy and Shavit's book "The Art of
265 * Multiprocessor Programming", chapter 16 describing these in
266 * more detail before proceeding.) The main work-stealing queue
267 * design is roughly similar to those in the papers "Dynamic
268 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
269 * (http://research.sun.com/scalable/pubs/index.html) and
270 * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
271 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
272 * The main differences ultimately stem from GC requirements that
273 * we null out taken slots as soon as we can, to maintain as small
274 * a footprint as possible even in programs generating huge
275 * numbers of tasks. To accomplish this, we shift the CAS
276 * arbitrating pop vs poll (steal) from being on the indices
277 * ("base" and "top") to the slots themselves. These provide the
278 * primary required memory ordering -- see "Correct and Efficient
279 * Work-Stealing for Weak Memory Models" by Le, Pop, Cohen, and
280 * Nardelli, PPoPP 2013
281 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
282 * analysis of memory ordering requirements in work-stealing
283 * algorithms similar to the one used here. We use per-operation
284 * ordered writes of various kinds for accesses when required.
285 *
286 * We also support a user mode in which local task processing is
287 * in FIFO, not LIFO order, simply by using a local version of
288 * poll rather than pop. This can be useful in message-passing
289 * frameworks in which tasks are never joined, although with
290 * increased contention among task producers and consumers. Also,
291 * the same data structure (and class) is used for "submission
292 * queues" (described below) holding externally submitted tasks,
293 * that differ only in that a lock (using field "phase"; see below) is
294 * required by external callers to push and pop tasks.
295 *
296 * Adding tasks then takes the form of a classic array push(task)
297 * in a circular buffer:
298 * q.array[q.top++ % length] = task;
299 *
300 * The actual code needs to null-check and size-check the array,
301 * uses masking, not mod, for indexing a power-of-two-sized array,
302 * enforces memory ordering, supports resizing, and possibly
303 * signals waiting workers to start scanning (described below),
304 * which requires stronger forms of ordered accesses.
305 *
306 * The pop operation (always performed by owner) is of the form:
307 * if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null)
308 * decrement top and return task;
309 * If this fails, the queue is empty. This operation is one part
310 * of the nextLocalTask method, that instead does a local-poll
311 * in FIFO mode.
312 *
313 * The poll operation is, basically:
314 * if (CAS nonnull task t = q.array[k = q.base % length] to null)
315 * increment base and return task;
316 *
317 * However, there are several more cases that must be dealt with.
318 * Some of them are just due to asynchrony; others reflect
319 * contention and stealing policies. Stepping through them
320 * illustrates some of the implementation decisions in this class.
321 *
322 * * Slot k must be read with an acquiring read, which it must
323 * anyway to dereference and run the task if the (acquiring)
324 * CAS succeeds.
325 *
326 * * q.base may change between reading and using its value to
327 * index the slot. To avoid trying to use the wrong t, the
328 * index and slot must be reread (not necessarily immediately)
329 * until consistent, unless this is a local poll by owner, in
330 * which case this form of inconsistency can only appear as t
331 * being null, below.
332 *
333 * * Similarly, q.array may change (due to a resize), unless this
334 * is a local poll by owner. Otherwise, when t is present, this
335 * only needs consideration on CAS failure (since a CAS
336 * confirms the non-resized case.)
337 *
338 * * t may appear null because a previous poll operation has not
339 * yet incremented q.base, so the read is from an already-taken
340 * index. This form of stall reflects the non-lock-freedom of
341 * the poll operation. Stalls can be detected by observing that
342 * q.base doesn't change on repeated reads of null t and when
343 * no other alternatives apply, spin-wait for it to settle. To
344 * reduce producing these kinds of stalls by other stealers, we
345 * encourage timely writes to indices using otherwise
346 * unnecessarily strong writes.
347 *
348 * * The CAS may fail, in which case we may want to retry unless
349 * there is too much contention. One goal is to balance and
350 * spread out the many forms of contention that may be
351 * encountered across polling and other operations to avoid
352 * sustained performance degradations. Across all cases where
353 * alternatives exist, a bounded number of CAS misses or stalls
354 * are tolerated (for slots, ctl, and elsewhere described
355 * below) before taking alternative action. These may move
356 * contention or retries elsewhere, which is still preferable
357 * to single-point bottlenecks.
358 *
359 * * Even though the check "top == base" is quiescently accurate
360 * to determine whether a queue is empty, it is not of much use
361 * when deciding whether to try to poll or repoll after a
362 * failure. Both top and base may move independently, and both
363 * lag updates to the underlying array. To reduce memory
364 * contention, non-owners avoid reading the "top" when
365 * possible, by using one-ahead reads to check whether to
366 * repoll, relying on the fact that a non-empty queue does not
367 * have two null slots in a row, except in cases (resizes and
368 * shifts) that can be detected with a secondary recheck that
369 * is less likely to conflict with owner writes.
370 *
371 * The poll operations in q.poll(), runWorker(), helpJoin(), and
372 * elsewhere differ with respect to whether other queues are
373 * available to try, and the presence or nature of screening steps
374 * when only some kinds of tasks can be taken. When alternatives
375 * (or failing) are options, they uniformly give up after
376 * bounded numbers of stalls and/or CAS failures, which reduces
377 * contention when too many workers are polling too few tasks.
378 * Overall, in the aggregate, we ensure probabilistic
379 * non-blockingness of work-stealing at least until checking
380 * quiescence (which is intrinsically blocking): If an attempted
381 * steal fails in these ways, a scanning thief chooses a different
382 * target to try next. In contexts where alternatives aren't
383 * available, and when progress conditions can be isolated to
384 * values of a single variable, simple spinloops (using
385 * Thread.onSpinWait) are used to reduce memory traffic.
386 *
387 * WorkQueues are also used in a similar way for tasks submitted
388 * to the pool. We cannot mix these tasks in the same queues used
389 * by workers. Instead, we randomly associate submission queues
390 * with submitting threads (or carriers when using VirtualThreads)
391 * using a form of hashing. The ThreadLocalRandom probe value
392 * serves as a hash code for choosing existing queues, and may be
393 * randomly repositioned upon contention with other submitters.
394 * In essence, submitters act like workers except that they are
395 * restricted to executing local tasks that they submitted (or
396 * when known, subtasks thereof). Insertion of tasks in shared
397 * mode requires a lock. We use only a simple spinlock (as one
398 * role of field "phase") because submitters encountering a busy
399 * queue move to a different position to use or create other
400 * queues. They (spin) block when registering new queues, or
401 * indirectly elsewhere, by revisiting later.
402 *
403 * Management
404 * ==========
405 *
406 * The main throughput advantages of work-stealing stem from
407 * decentralized control -- workers mostly take tasks from
408 * themselves or each other, at rates that can exceed a billion
409 * per second. Most non-atomic control is performed by some form
410 * of scanning across or within queues. The pool itself creates,
411 * activates (enables scanning for and running tasks),
412 * deactivates, blocks, and terminates threads, all with minimal
413 * central information. There are only a few properties that we
414 * can globally track or maintain, so we pack them into a small
415 * number of variables, often maintaining atomicity without
416 * blocking or locking. Nearly all essentially atomic control
417 * state is held in a few variables that are by far most often
418 * read (not written) as status and consistency checks. We pack as
419 * much information into them as we can.
420 *
421 * Field "ctl" contains 64 bits holding information needed to
422 * atomically decide to add, enqueue (on an event queue), and
423 * dequeue and release workers. To enable this packing, we
424 * restrict maximum parallelism to (1<<15)-1 (which is far in
425 * excess of normal operating range) to allow ids, counts, and
426 * their negations (used for thresholding) to fit into 16bit
427 * subfields.
428 *
429 * Field "runState" and per-WorkQueue field "phase" play similar
430 * roles, as lockable, versioned counters. Field runState also
431 * includes monotonic event bits:
432 * * SHUTDOWN: no more external tasks accepted; STOP when quiescent
433 * * STOP: no more tasks run, and deregister all workers
434 * * CLEANED: all unexecuted tasks have been cancelled
435 * * TERMINATED: all workers deregistered and all queues cleaned
436 * The version tags enable detection of state changes (by
437 * comparing two reads) modulo bit wraparound. The bit range in
438 * each case suffices for purposes of determining quiescence,
439 * termination, avoiding ABA-like errors, and signal control, most
440 * of which are ultimately based on at most 15bit ranges (due to
441 * 32767 max total workers). RunState updates do not need to be
442 * atomic with respect to ctl updates, but because they are not,
443 * some care is required to avoid stalls. The seqLock properties
444 * detect changes and conditionally upgrade to coordinate with
445 * updates. The lock is typically held for less than a dozen
446 * instructions unless the queue array is being resized, during
447 * which contention is rare. To be conservative, lockRunState is
448 * implemented as a spin/sleep loop. Here and elsewhere spin
449 * constants are short enough to apply even on systems with few
450 * available processors. In addition to checking pool status,
451 * reads of runState sometimes serve as acquire fences before
452 * reading other fields.
453 *
454 * Field "parallelism" holds the target parallelism (normally
455 * corresponding to pool size). Users can dynamically reset target
456 * parallelism, but it is only accessed when signalling or awaiting
457 * work, so only slowly has an effect in creating threads or
458 * letting them time out and terminate when idle.
459 *
460 * Array "queues" holds references to WorkQueues. It is updated
461 * (only during worker creation and termination) under the
462 * runState lock. It is otherwise concurrently readable but reads
463 * for use in scans (see below) are always prefaced by a volatile
464 * read of runState (or equivalent constructions), ensuring that
465 * its state is current at the point it is used (which is all we
466 * require). To simplify index-based operations, the array size is
467 * always a power of two, and all readers must tolerate null
468 * slots. Worker queues are at odd indices. Worker phase ids
469 * masked with SMASK match their index. Shared (submission) queues
470 * are at even indices. Grouping them together in this way aids in
471 * task scanning: At top-level, both kinds of queues should be
472 * sampled with approximately the same probability, which is
473 * simpler if they are all in the same array. But we also need to
474 * identify what kind they are without looking at them, leading to
475 * this odd/even scheme. One disadvantage is that there are
476 * usually many fewer submission queues, so there can be many
477 * wasted probes (null slots). But this is still cheaper than
478 * alternatives. Other loops over the queues array vary in origin
479 * and stride depending on whether they cover only submission
480 * (even) or worker (odd) queues or both, and whether they require
481 * randomness (in which case cyclically exhaustive strides may be
482 * used).
483 *
484 * All worker thread creation is on-demand, triggered by task
485 * submissions, replacement of terminated workers, and/or
486 * compensation for blocked workers. However, all other support
487 * code is set up to work with other policies. To ensure that we
488 * do not hold on to worker or task references that would prevent
489 * GC, all accesses to workQueues in waiting, signalling, and
490 * control methods are via indices into the queues array (which is
491 * one source of some of the messy code constructions here). In
492 * essence, the queues array serves as a weak reference
493 * mechanism. In particular, the stack top subfield of ctl stores
494 * indices, not references. Operations on queues obtained from
495 * these indices remain valid (with at most some unnecessary extra
496 * work) even if an underlying worker failed and was replaced by
497 * another at the same index. During termination, worker queue
498 * array updates are disabled.
499 *
500 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
501 * cannot let workers spin indefinitely scanning for tasks when
502 * none can be found immediately, and we cannot start/resume
503 * workers unless there appear to be tasks available. On the
504 * other hand, we must quickly prod them into action when new
505 * tasks are submitted or generated. These latencies are mainly a
506 * function of JVM park/unpark (and underlying OS) performance,
507 * which can be slow and variable (even though usages are
508 * streamlined as much as possible). In many usages, ramp-up time
509 * is the main limiting factor in overall performance, which is
510 * compounded at program start-up by JIT compilation and
511 * allocation. On the other hand, throughput degrades when too
512 * many threads poll for too few tasks. (See below.)
513 *
514 * The "ctl" field atomically maintains total and "released"
515 * worker counts, plus the head of the available worker queue
516 * (actually stack, represented by the lower 32bit subfield of
517 * ctl). Released workers are those known to be scanning for
518 * and/or running tasks (we cannot accurately determine
519 * which). Unreleased ("available") workers are recorded in the
520 * ctl stack. These workers are made eligible for signalling by
521 * enqueuing in ctl (see method deactivate). This "queue" is a
522 * form of Treiber stack. This is ideal for activating threads in
523 * most-recently used order, and improves performance and
524 * locality, outweighing the disadvantages of being prone to
525 * contention and inability to release a worker unless it is
526 * topmost on stack. The top stack state holds the value of the
527 * "phase" field of the worker: its index and status, plus a
528 * version counter that, in addition to the count subfields (also
529 * serving as version stamps), provides protection against Treiber
530 * stack ABA effects.
531 *
532 * Creating workers. To create a worker, we pre-increment counts
533 * (serving as a reservation), and attempt to construct a
534 * ForkJoinWorkerThread via its factory. On starting, the new
535 * thread first invokes registerWorker, where it is assigned an
536 * index in the queues array (expanding the array if necessary).
537 * Upon any exception across these steps, or null return from
538 * factory, deregisterWorker adjusts counts and records
539 * accordingly. If a null return, the pool continues running with
540 * fewer than the target number of workers. If exceptional, the
541 * exception is propagated, generally to some external caller.
542 *
543 * WorkQueue field "phase" encodes the queue array id in lower
544 * bits, and otherwise acts similarly to the pool runState field:
545 * The "IDLE" bit is clear while active (either a released worker
546 * or a locked external queue), with other bits serving as a
547 * version counter to distinguish changes across multiple reads.
548 * Note that phase field updates lag queue CAS releases; seeing a
549 * non-idle phase does not guarantee that the worker is available
550 * (and so is never checked in this way).
551 *
552 * The ctl field also serves as the basis for memory
553 * synchronization surrounding activation. This uses a more
554 * efficient version of a Dekker-like rule that task producers and
555 * consumers sync with each other by both writing/CASing ctl (even
556 * if to its current value). However, rather than CASing ctl to
557 * its current value in the common case where no action is
558 * required, we reduce write contention by ensuring that
559 * signalWork invocations are prefaced with a fully fenced memory
560 * access (which is usually needed anyway).
561 *
562 * Signalling. Signals (in signalWork) cause new or reactivated
563 * workers to scan for tasks. SignalWork is invoked in two cases:
564 * (1) When a task is pushed onto an empty queue, and (2) When a
565 * worker takes a top-level task from a queue that has additional
566 * tasks. Together, these suffice in O(log(#threads)) steps to
567 * fully activate with at least enough workers, and ideally no
568 * more than required. This ideal is unobtainable: Callers do not
569 * know whether another worker will finish its current task and
570 * poll for others without need of a signal (which is otherwise an
571 * advantage of work-stealing vs other schemes), and also must
572 * conservatively estimate the triggering conditions of emptiness
573 * or non-emptiness; all of which usually cause more activations
574 * than necessary (see below). (Method signalWork is also used as
575 * failsafe in case of Thread failures in deregisterWorker, to
576 * activate or create a new worker to replace them).
577 *
578 * Top-Level scheduling
579 * ====================
580 *
581 * Scanning. Method runWorker performs top-level scanning for (and
582 * execution of) tasks by polling a pseudo-random permutation of
583 * the array (by starting at a given index, and using a constant
584 * cyclically exhaustive stride.) It uses the same basic polling
585 * method as WorkQueue.poll(), but restarts with a different
586 * permutation on each rescan. The pseudorandom generator need
587 * not have high-quality statistical properties in the long
588 * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
589 * from ThreadLocalRandom probes, which are cheap and suffice.
590 *
591 * Deactivation. When no tasks are found by a worker in runWorker,
592 * it invokes deactivate, that first deactivates (to an IDLE
593 * phase). Avoiding missed signals during deactivation requires a
594 * (conservative) rescan, reactivating if there may be tasks to
595 * poll. Because idle workers are often not yet blocked (parked),
596 * we use a WorkQueue field to advertise that a waiter actually
597 * needs unparking upon signal.
598 *
599 * When tasks are constructed as (recursive) DAGs, top-level
600 * scanning is usually infrequent, and doesn't encounter most
601 * of the following problems addressed by runWorker and awaitWork:
602 *
603 * Locality. Polls are organized into "runs", continuing until
604 * empty or contended, while also minimizing interference by
605 * postponing bookkeeping to ends of runs. This may reduce
606 * fairness.
607 *
608 * Contention. When many workers try to poll few queues, they
609 * often collide, generating CAS failures and disrupting locality
610 * of workers already running their tasks. This also leads to
611 * stalls when tasks cannot be taken because other workers have
612 * not finished poll operations, which is detected by reading
613 * ahead in queue arrays. In both cases, workers restart scans in a
614 * way that approximates randomized backoff.
615 *
616 * Oversignalling. When many short top-level tasks are present in
617 * a small number of queues, the above signalling strategy may
618 * activate many more workers than needed, worsening locality and
619 * contention problems, while also generating more global
620 * contention (field ctl is CASed on every activation and
621 * deactivation). We filter out (both in runWorker and
622 * signalWork) attempted signals that are surely not needed
623 * because the signalled tasks are already taken.
624 *
625 * Shutdown and Quiescence
626 * =======================
627 *
628 * Quiescence. Workers scan looking for work, giving up when they
629 * don't find any, without being sure that none are available.
630 * However, some required functionality relies on consensus about
631 * quiescence (also termination, discussed below). The count
632 * fields in ctl allow accurate discovery of states in which all
633 * workers are idle. However, because external (asynchronous)
634 * submitters are not part of this vote, these mechanisms
635 * themselves do not guarantee that the pool is in a quiescent
636 * state with respect to methods isQuiescent, shutdown (which
637 * begins termination when quiescent), helpQuiesce, and indirectly
638 * others including tryCompensate. Method quiescent() is used in
639 * all of these contexts. It provides checks that all workers are
640 * idle and there are no submissions that they could poll if they
641 * were not idle, retrying on inconsistent reads of queues and
642 * using the runState seqLock to retry on queue array updates.
643 * (It also reports quiescence if the pool is terminating.) A true
644 * report means only that there was a moment at which quiescence
645 * held. False negatives are inevitable (for example when queue
646 * indices lag updates, as described above), which is accommodated
647 * when (tentatively) idle by scanning for work etc, and then
648 * re-invoking. This includes cases in which the final unparked
649 * thread (in deactivate()) uses quiescent() to check for tasks
650 * that could have been added during a race window that would not
651 * be accompanied by a signal, in which case re-activating itself
652 * (or any other worker) to rescan. Method helpQuiesce acts
653 * similarly but cannot rely on ctl counts to determine that all
654 * workers are inactive because the caller and any others
655 * executing helpQuiesce are not included in counts.
656 *
657 * Termination. Termination is initiated by setting STOP in one of
658 * three ways (via methods tryTerminate and quiescent):
659 * * A call to shutdownNow, in which case all workers are
660 * interrupted, first ensuring that the queues array is stable,
661 * to avoid missing any workers.
662 * * A call to shutdown when quiescent, in which case method
663 * releaseWaiters is used to dequeue them, at which point they notice
664 * STOP state and return from runWorker to deregister();
665 * * The pool becomes quiescent() sometime after shutdown has
666 * been called, in which case releaseWaiters is also used to
667 * propagate as they deregister.
668 * Upon STOP, each worker, as well as external callers to
669 * tryTerminate (via close() etc) race to set CLEANED, indicating
670 * that all tasks have been cancelled. The implementation (method
671 * cleanQueues) balances cases in which there may be many tasks to
672 * cancel (benefitting from parallelism) versus contention and
673 * interference when many threads try to poll remaining queues,
674 * while also avoiding unnecessary rechecks, by using
675 * pseudorandom scans and giving up upon interference. This may be
676 * retried by the same caller only when there are no more
677 * registered workers, using the same criteria as method
678 * quiescent. When CLEANED and all workers have deregistered,
679 * TERMINATED is set, also signalling any caller of
680 * awaitTermination or close. Because shutdownNow-based
681 * termination relies on interrupts, there is no guarantee that
682 * workers will stop if their tasks ignore interrupts. Class
683 * InterruptibleTask (see below) further arranges runState checks
684 * before executing task bodies, and ensures interrupts while
685 * terminating. Even so, there are no guarantees because tasks may
686 * internally enter unbounded loops.
687 *
688 * Trimming workers. To release resources after periods of lack of
689 * use, a worker starting to wait when the pool is quiescent will
690 * time out and terminate if the pool has remained quiescent for
691 * the period given by field keepAlive (default 60sec), which applies
692 * to the first timeout of a quiescent pool. Subsequent cases use
693 * minimal delays such that, if still quiescent, all will be
694 * released soon thereafter. This is checked by setting the
695 * "source" field of signallee to an invalid value, that will
696 * remain invalid only if it did not process any tasks.
697 *
698 * Joining Tasks
699 * =============
700 *
701 * The "Join" part of ForkJoinPools consists of a set of
702 * mechanisms that sometimes or always (depending on the kind of
703 * task) avoid context switching or adding worker threads when one
704 * task would otherwise be blocked waiting for completion of
705 * another, basically, just by running that task or one of its
706 * subtasks if not already taken. These mechanics are disabled for
707 * InterruptibleTasks, that guarantee that callers do not execute
708 * submitted tasks.
709 *
710 * The basic structure of joining is an extended spin/block scheme
711 * in which workers check for task completion status between
712 * steps to find other work, until relevant pool state stabilizes
713 * enough to believe that no such tasks are available, at which
714 * point blocking. This is usually a good choice of when to block
715 * that would otherwise be harder to approximate.
716 *
717 * These forms of helping may increase stack space usage, but that
718 * space is bounded in tree/dag structured procedurally parallel
719 * designs to be no more than that if a task were executed only by
720 * the joining thread. This is arranged by associated task
721 * subclasses that also help detect and control the ways in which
722 * this may occur.
723 *
724 * Normally, the first option when joining a task that is not done
725 * is to try to take it from the local queue and run it. Method
726 * tryRemoveAndExec tries to do so. For tasks with any form of
727 * subtasks that must be completed first, we try to locate these
728 * subtasks and run them as well. This is easy when local, but
729 * when stolen, steal-backs are restricted to the same rules as
730 * stealing (polling), which requires additional bookkeeping and
731 * scanning. This cost is still very much worthwhile because of
732 * its impact on task scheduling and resource control.
733 *
734 * The two methods for finding and executing subtasks vary in
735 * details. The algorithm in helpJoin entails a form of "linear
736 * helping". Each worker records (in field "source") the index of
737 * the internal queue from which it last stole a task. (Note:
738 * because chains cannot include even-numbered external queues,
739 * they are ignored, and 0 is an OK default. However, the source
740 * field is set anyway, or eventually to DROPPED, to ensure
741 * volatile memory synchronization effects.) The scan in method
742 * helpJoin uses these markers to try to find a worker to help
743 * (i.e., steal back a task from and execute it) that could make
744 * progress toward completion of the actively joined task. Thus,
745 * the joiner executes a task that would be on its own local deque
746 * if the to-be-joined task had not been stolen. This is a
747 * conservative variant of the approach described in Wagner &
748 * Calder "Leapfrogging: a portable technique for implementing
749 * efficient futures" SIGPLAN Notices, 1993
750 * (http://portal.acm.org/citation.cfm?id=155354). It differs
751 * mainly in that we only record queues, not full dependency
752 * links. This requires a linear scan of the queues to locate
753 * stealers, but isolates cost to when it is needed, rather than
754 * adding to per-task overhead. For CountedCompleters, the
755 * analogous method helpComplete doesn't need stealer-tracking,
756 * but requires a similar (but simpler) check of completion
757 * chains.
758 *
759 * In either case, searches can fail to locate stealers when
760 * stalls delay recording sources or issuing subtasks. We avoid
761 * some of these cases by using snapshotted values of ctl as a
762 * check that the numbers of workers are not changing, along with
763 * rescans to deal with contention and stalls. But even when
764 * accurately identified, stealers might not ever produce a task
765 * that the joiner can in turn help with.
766 *
767 * Related method helpAsyncBlocker does not directly rely on
768 * subtask structure, but instead avoids or postpones blocking of
769 * tagged tasks (CompletableFuture.AsynchronousCompletionTask) by
770 * executing other asyncs that can be processed in any order.
771 * This is currently invoked only in non-join-based blocking
772 * contexts from classes CompletableFuture and
773 * SubmissionPublisher, that could be further generalized.
774 *
775 * When any of the above fail to avoid blocking, we rely on
776 * "compensation" -- an indirect form of context switching that
777 * either activates an existing worker to take the place of the
778 * blocked one, or expands the number of workers.
779 *
780 * Compensation does not by default aim to keep exactly the target
781 * parallelism number of unblocked threads running at any given
782 * time. Some previous versions of this class employed immediate
783 * compensations for any blocked join. However, in practice, the
784 * vast majority of blockages are transient byproducts of GC and
785 * other JVM or OS activities that are made worse by replacement
786 * by causing longer-term oversubscription. These are inevitable
787 * without (unobtainably) perfect information about whether worker
788 * creation is actually necessary. False alarms are common enough
789 * to negatively impact performance, so compensation is by default
790 * attempted only when it appears possible that the pool could
791 * stall due to lack of any unblocked workers. However, we allow
792 * users to override defaults using the long form of the
793 * ForkJoinPool constructor. The compensation mechanism may also
794 * be bounded. Bounds for the commonPool better enable JVMs to
795 * cope with programming errors and abuse before running out of
796 * resources to do so.
797 *
798 * The ManagedBlocker extension API can't use helping so relies
799 * only on compensation in method awaitBlocker. This API was
800 * designed to highlight the uncertainty of compensation decisions
801 * by requiring implementation of method isReleasable to abort
802 * compensation during attempts to obtain a stable snapshot. But
803 * users now rely upon the fact that if isReleasable always
804 * returns false, the API can be used to obtain precautionary
805 * compensation, which is sometimes the only reasonable option
806 * when running unknown code in tasks; which is now supported more
807 * simply (see method beginCompensatedBlock).
808 *
809 * Common Pool
810 * ===========
811 *
812 * The static common pool always exists after static
813 * initialization. Since it (or any other created pool) need
814 * never be used, we minimize initial construction overhead and
815 * footprint to the setup of about a dozen fields, although some
816 * properties are set by parsing System properties. The common pool is
817 * distinguished by having a null workerNamePrefix (which is an
818 * odd convention, but avoids the need to decode status in factory
819 * classes). It also has PRESET_SIZE config set if parallelism
820 * was configured by system property.
821 *
822 * When external threads use the common pool, they can perform
823 * subtask processing (see helpComplete and related methods) upon
824 * joins, unless they are submitted using ExecutorService
825 * submission methods, which implicitly disallow this. This
826 * caller-helps policy makes it sensible to set common pool
827 * parallelism level to one (or more) less than the total number
828 * of available cores, or even zero for pure caller-runs. External
829 * threads waiting for joins first check the common pool for their
830 * task, which fails quickly if the caller did not fork to common
831 * pool.
832 *
833 * Guarantees for common pool parallelism zero are limited to
834 * tasks that are joined by their callers in a tree-structured
835 * fashion or use CountedCompleters (as is true for jdk
836 * parallelStreams). Support infiltrates several methods,
837 * including those that retry helping steps until we are sure that
838 * none apply if there are no workers. To deal with conflicting
839 * requirements, uses of the commonPool that require async because
840 * caller-runs need not apply, ensure threads are enabled (by
841 * setting parallelism) via method asyncCommonPool before
842 * proceeding. (In principle, overriding zero parallelism needs to
843 * ensure at least one worker, but due to other backward
844 * compatibility constraints, ensures two.)
845 *
846 * As a more appropriate default in managed environments, unless
847 * overridden by system properties, we use workers of subclass
848 * InnocuousForkJoinWorkerThread for the commonPool. These
849 * workers do not belong to any user-defined ThreadGroup, and
850 * clear all ThreadLocals and reset the ContextClassLoader before
851 * (re)activating to execute top-level tasks. The associated
852 * mechanics may be JVM-dependent and must access particular
853 * Thread class fields to achieve this effect.
854 *
855 * InterruptibleTasks
856 * ====================
857 *
858 * Regular ForkJoinTasks manage task cancellation (method cancel)
859 * independently from the interrupted status of threads running
860 * tasks. Interrupts are issued internally only while
861 * terminating, to wake up workers and cancel queued tasks. By
862 * default, interrupts are cleared only when necessary to ensure
863 * that calls to LockSupport.park do not loop indefinitely (park
864 * returns immediately if the current thread is interrupted).
865 *
866 * To comply with ExecutorService specs, we use subclasses of
867 * abstract class InterruptibleTask for tasks that require
868 * stronger interruption and cancellation guarantees. External
869 * submitters never run these tasks, even if in the common pool
870 * (as indicated by ForkJoinTask.noUserHelp status bit).
871 * InterruptibleTasks include a "runner" field (implemented
872 * similarly to FutureTask) to support cancel(true). Upon pool
873 * shutdown, runners are interrupted so they can cancel. Since
874 * external joining callers never run these tasks, they must await
875 * cancellation by others, which can occur along several different
876 * paths.
877 *
878 * Across these APIs, rules for reporting exceptions for tasks
879 * with results accessed via join() differ from those via get(),
880 * which differ from those invoked using pool submit methods by
881 * non-workers (which comply with Future.get() specs). Internal
882 * usages of ForkJoinTasks ignore interrupted status when executing
883 * or awaiting completion. Otherwise, reporting task results or
884 * exceptions is preferred to throwing InterruptedExceptions,
885 * which are in turn preferred to timeouts. Similarly, completion
886 * status is preferred to reporting cancellation. Cancellation is
887 * reported as an unchecked exception by join(), and by worker
888 * calls to get(), but is otherwise wrapped in a (checked)
889 * ExecutionException.
890 *
891 * Worker Threads cannot be VirtualThreads, as enforced by
892 * requiring ForkJoinWorkerThreads in factories. There are
893 * several constructions relying on this. However as of this
894 * writing, virtual thread bodies are by default run as some form
895 * of InterruptibleTask.
896 *
897 * DelayScheduler
898 * ================
899 *
900 * This class supports ScheduledExecutorService methods by
901 * creating and starting a DelayScheduler on first use of these
902 * methods (via startDelayScheduler). The scheduler operates
903 * independently in its own thread, relaying tasks to the pool to
904 * execute when their delays elapse (see method
905 * executeEnabledScheduledTask). The only other interactions with
906 * the delayScheduler are to control shutdown and maintain
907 * shutdown-related policies in methods quiescent() and
908 * tryTerminate(). In particular, processing must deal with cases
909 * in which tasks are submitted before shutdown, but not enabled
910 * until afterwards, in which case they must bypass some screening
911 * to be allowed to run. Conversely, the DelayScheduler checks
912 * runState status and when enabled, completes termination, using
913 * only methods shutdownStatus and tryStopIfShutdown. All of these
914 * methods are final and have signatures referencing
915 * DelaySchedulers, so cannot conflict with those of any existing
916 * FJP subclasses.
917 *
918 * Memory placement
919 * ================
920 *
921 * Performance is very sensitive to placement of instances of
922 * ForkJoinPool and WorkQueues and their queue arrays, as well as
923 * the placement of their fields. Cache misses and contention due
924 * to false-sharing have been observed to slow down some programs
925 * by more than a factor of four. Effects may vary across initial
926 * memory configurations, applications, and different garbage
927 * collectors and GC settings, so there is no perfect solution.
928 * Too much isolation may generate more cache misses in common
929 * cases (because some fields and slots are usually read at the
930 * same time). The @Contended annotation provides only rough
931 * control (for good reason). Similarly for relying on fields
932 * being placed in size-sorted declaration order.
933 *
934 * We isolate the ForkJoinPool.ctl field that otherwise causes the
935 * most false-sharing misses with respect to other fields. Also,
936 * ForkJoinPool fields are ordered such that fields less prone to
937 * contention effects are first, offsetting those that otherwise
938 * would be, while also reducing total footprint vs using
939 * multiple @Contended regions, which tends to slow down
940 * less-contended applications. To help arrange this, some
941 * non-reference fields are declared as "long" even when ints or
942 * shorts would suffice. For class WorkQueue, an
943 * embedded @Contended isolates the very busy top index, and
944 * another segregates status and bookkeeping fields written
945 * (mostly) by owners, that otherwise interfere with reading
946 * array, top, and base fields. There are other variables commonly
947 * contributing to false-sharing-related performance issues
948 * (including fields of class Thread), but we can't do much about
949 * this except try to minimize access.
950 *
951 * Initial sizing and resizing of WorkQueue arrays is an even more
952 * delicate tradeoff because the best strategy systematically
953 * varies across garbage collectors. Small arrays are better for
954 * locality and reduce GC scan time, but large arrays reduce both
955 * direct false-sharing and indirect cases due to GC bookkeeping
956 * (cardmarks etc), and reduce the number of resizes, which are
957 * not especially fast because they require atomic transfers.
958 * Currently, arrays are initialized to be just large enough to
959 * avoid resizing in most tree-structured tasks, but grow rapidly
960 * until large. (Maintenance note: any changes in fields, queues,
961 * or their uses, or JVM layout policies, must be accompanied by
962 * re-evaluation of these placement and sizing decisions.)
963 *
964 * Style notes
965 * ===========
966 *
967 * Memory ordering relies mainly on atomic operations (CAS,
968 * getAndSet, getAndAdd) along with moded accesses. These use
969 * jdk-internal Unsafe for atomics and special memory modes,
970 * rather than VarHandles, to avoid initialization dependencies in
971 * other jdk components that require early parallelism. This can
972 * be awkward and ugly, but also reflects the need to control
973 * outcomes across the unusual cases that arise in very racy code
974 * with very few invariants. All atomic task slot updates use
975 * Unsafe operations requiring offset positions, not indices, as
976 * computed by method slotOffset. All fields are read into locals
977 * before use, and null-checked if they are references, even if
978 * they can never be null under current usages. Usually,
979 * computations (held in local variables) are defined as soon as
980 * logically enabled, sometimes to convince compilers that they
981 * may be performed despite memory ordering constraints. Array
982 * accesses using masked indices include checks (that are always
983 * true) that the array length is non-zero to avoid compilers
984 * inserting more expensive traps. This is usually done in a
985 * "C"-like style of listing declarations at the heads of methods
986 * or blocks, and using inline assignments on first encounter.
987 * Nearly all explicit checks lead to bypass/return, not exception
988 * throws, because they may legitimately arise during shutdown. A
989 * few unusual loop constructions encourage (with varying
990 * effectiveness) JVMs about where (not) to place safepoints. All
991 * public methods screen arguments (mainly null checks) before
992 * creating or executing tasks.
993 *
994 * There is a lot of representation-level coupling among classes
995 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
996 * fields of WorkQueue maintain data structures managed by
997 * ForkJoinPool, so are directly accessed. There is little point
998 * trying to reduce this, since any associated future changes in
999 * representations will need to be accompanied by algorithmic
1000 * changes anyway. Several methods intrinsically sprawl because
1001 * they must accumulate sets of consistent reads of fields held in
1002 * local variables. Some others are artificially broken up to
1003 * reduce producer/consumer imbalances due to dynamic compilation.
1004 * There are also other coding oddities (including several
1005 * unnecessary-looking hoisted null checks) that help some methods
1006 * perform reasonably even when interpreted (not compiled).
1007 *
1008 * The order of declarations in this file is (with a few exceptions):
1009 * (1) Static configuration constants
1010 * (2) Static utility functions
1011 * (3) Nested (static) classes
1012 * (4) Fields, along with constants used when unpacking some of them
1013 * (5) Internal control methods
1014 * (6) Callbacks and other support for ForkJoinTask methods
1015 * (7) Exported methods
1016 * (8) Static block initializing statics in minimally dependent order
1017 *
1018 */
1019
1020 // static configuration constants
1021
1022 /**
1023 * Default idle timeout value (in milliseconds) for idle threads
1024 * to park waiting for new work before terminating.
1025 */
1026 static final long DEFAULT_KEEPALIVE = 60_000L; // 60 seconds
1027
1028 /**
1029 * Undershoot tolerance for idle timeouts, also serving as the
1030 * minimum allowed timeout value.
1031 */
1032 static final long TIMEOUT_SLOP = 20L; // NOTE(review): presumably milliseconds, like DEFAULT_KEEPALIVE -- confirm
1033
1034 /**
1035 * The default value for common pool maxSpares. Overridable using
1036 * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1037 * system property. The default value is far in excess of normal
1038 * requirements, but also far short of maximum capacity and typical OS
1039 * thread limits, so allows JVMs to catch misuse/abuse before
1040 * running out of resources needed to do so.
1041 */
1042 static final int DEFAULT_COMMON_MAX_SPARES = 256;
1043
1044 /**
1045 * Initial capacity of work-stealing queue array.
1046 * Must be a power of two, at least 2. See above.
1047 */
1048 static final int INITIAL_QUEUE_CAPACITY = 1 << 6; // 64 slots
1049
1050 // conversions among short, int, long
1051 static final int SMASK = 0xffff; // (unsigned) short bits
1052 static final long LMASK = 0xffffffffL; // lower 32 bits of long
1053 static final long UMASK = ~LMASK; // upper 32 bits
1054
1055 // masks and sentinels for queue indices
1056 static final int MAX_CAP = 0x7fff; // max # workers
1057 static final int EXTERNAL_ID_MASK = 0x3ffe; // max external queue id (bit 0 clear, so even values only)
1058 static final int INVALID_ID = 0x4000; // unused external queue id (bit 14, outside EXTERNAL_ID_MASK)
1059
1060 // pool.runState bits
1061 static final long STOP = 1L << 0; // terminating
1062 static final long SHUTDOWN = 1L << 1; // terminate when quiescent
1063 static final long CLEANED = 1L << 2; // stopped and queues cleared
1064 static final long TERMINATED = 1L << 3; // only set if STOP also set
1065 static final long RS_LOCK = 1L << 4; // lowest seqlock bit
1066
1067 // spin/sleep limits for runState locking and elsewhere
1068 static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait
1069 static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos
1070 static final int MAX_SLEEP = 1 << 20; // approx 1 msec as nanos
1071
1072 // {pool, workQueue} config bits
1073 static final int FIFO = 1 << 0; // fifo queue or access mode
1074 static final int CLEAR_TLS = 1 << 1; // set for Innocuous workers
1075 static final int PRESET_SIZE = 1 << 2; // size was set by property
1076
1077 // others (DROPPED, UNCOMPENSATE, and IDLE all share the value 1 << 16)
1078 static final int DROPPED = 1 << 16; // removed from ctl counts
1079 static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
1080 static final int IDLE = 1 << 16; // phase seqlock/version count
1081 static final int MIN_QUEUES_SIZE = 1 << 4; // ensure external slots
1082
1083 /*
1084 * Bits and masks for ctl and bounds are packed with four 16-bit subfields:
1085 * RC: Number of released (unqueued) workers
1086 * TC: Number of total workers
1087 * SS: version count and status of top waiting thread
1088 * ID: poolIndex of top of Treiber stack of waiters
1089 *
1090 * When convenient, we can extract the lower 32 stack top bits
1091 * (including version bits) as sp=(int)ctl. When sp is non-zero,
1092 * there are waiting workers. Count fields may be transiently
1093 * negative during termination because of out-of-order updates.
1094 * To deal with this, we use casts in and out of "short" and/or
1095 * signed shifts to maintain signedness. Because it occupies
1096 * uppermost bits, we can add one release count using getAndAdd of
1097 * RC_UNIT, rather than CAS, when returning from a blocked join.
1098 * Other updates of multiple subfields require CAS.
1099 */
1100
1101 // Release counts (RC): bits 48-63 of ctl
1102 static final int RC_SHIFT = 48;
1103 static final long RC_UNIT = 0x0001L << RC_SHIFT;
1104 static final long RC_MASK = 0xffffL << RC_SHIFT;
1105 // Total counts (TC): bits 32-47 of ctl
1106 static final int TC_SHIFT = 32;
1107 static final long TC_UNIT = 0x0001L << TC_SHIFT;
1108 static final long TC_MASK = 0xffffL << TC_SHIFT;
1109
1110 /*
1111 * All atomic operations on task arrays (queues) use Unsafe
1112 * operations that take array offsets versus indices, based on
1113 * array base and shift constants established during static
1114 * initialization.
1115 */
1116 static final long ABASE; // added to the shifted index in slotOffset
1117 static final int ASHIFT; // left-shift applied to an index in slotOffset
1118
1119 // Static utilities
1120
1121 /**
1122 * Returns the array offset corresponding to the given index for
1123 * Unsafe task queue operations, computed as (index << ASHIFT) + ABASE.
1124 */
1125 static long slotOffset(int index) {
1126 return ((long)index << ASHIFT) + ABASE; // widen to long before shifting
1127 }
1128
1129 // Nested classes
1130
1131 /**
1132 * Factory for creating new {@link ForkJoinWorkerThread}s.
1133 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
1134 * for {@code ForkJoinWorkerThread} subclasses that extend base
1135 * functionality or initialize threads with different contexts.
1136 */
1137 public static interface ForkJoinWorkerThreadFactory {
1138 /**
1139 * Returns a new worker thread operating in the given pool.
1140 * Returning null or throwing an exception may result in tasks
1141 * never being executed. If this method throws an exception,
1142 * it is relayed to the caller of the method (for example
1143 * {@code execute}) that caused the attempted thread creation. If this
1144 * method returns null or throws an exception, it is not
1145 * retried until the next attempted creation (for example
1146 * another call to {@code execute}).
1147 *
1148 * @param pool the pool this thread works in
1149 * @return the new worker thread, or {@code null} if the request
1150 * to create a thread is rejected
1151 * @throws NullPointerException if the pool is null
1152 */
1153 public ForkJoinWorkerThread newThread(ForkJoinPool pool);
1154 }
1155
1156 /**
1157 * Default ForkJoinWorkerThreadFactory implementation; creates an
1158 * InnocuousForkJoinWorkerThread for the common pool, otherwise a new
1159 * ForkJoinWorkerThread using the system class loader as the thread context class loader.
1160 */
1161 static final class DefaultForkJoinWorkerThreadFactory
1162 implements ForkJoinWorkerThreadFactory {
1163 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
1164 return ((pool.workerNamePrefix == null) ? // null prefix identifies the commonPool
1165 new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool) :
1166 new ForkJoinWorkerThread(null, pool, true, false));
1167 }
1168 }
1169
1170 /**
1171 * Queues supporting work-stealing as well as external task
1172 * submission. See above for descriptions and algorithms.
1173 */
1174 static final class WorkQueue {
1175 // fields declared in order of their likely layout on most VMs
1176 final ForkJoinWorkerThread owner; // null if shared
1177 ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
1178 int base; // index of next slot for poll
1179 final int config; // mode bits
1180
1181 @jdk.internal.vm.annotation.Contended("t") // segregate the very busy top index
1182 int top; // index of next slot for push
1183
1184 // status and bookkeeping fields (contention group "w") otherwise causing more unnecessary false-sharing cache misses
1185 @jdk.internal.vm.annotation.Contended("w")
1186 volatile int phase; // versioned active status
1187 @jdk.internal.vm.annotation.Contended("w")
1188 int stackPred; // pool stack (ctl) predecessor link
1189 @jdk.internal.vm.annotation.Contended("w")
1190 volatile int parking; // nonzero if parked in awaitWork
1191 @jdk.internal.vm.annotation.Contended("w")
1192 volatile int source; // source queue id (or DROPPED)
1193 @jdk.internal.vm.annotation.Contended("w")
1194 int nsteals; // number of steals from other queues
1195
1196 // Support for atomic operations: Unsafe instance and offsets, assigned outside this view
1197 private static final Unsafe U;
1198 private static final long PHASE; // presumably the offset of field phase -- confirm in static init
1199 private static final long BASE; // presumably the offset of field base -- confirm in static init
1200 private static final long TOP; // presumably the offset of field top -- confirm in static init
1201 private static final long ARRAY; // presumably the offset of field array -- confirm in static init
1202
1203 final void updateBase(int v) {
1204 U.putIntVolatile(this, BASE, v);
1205 }
1206 final void updateTop(int v) {
1207 U.putIntOpaque(this, TOP, v);
1208 }
1209 final void updateArray(ForkJoinTask<?>[] a) {
1210 U.getAndSetReference(this, ARRAY, a);
1211 }
1212 final void unlockPhase() {
1213 U.getAndAddInt(this, PHASE, IDLE);
1214 }
1215 final boolean tryLockPhase() { // seqlock acquire
1216 int p;
1217 return (((p = phase) & IDLE) != 0 &&
1218 U.compareAndSetInt(this, PHASE, p, p + IDLE));
1219 }
1220
1221 /**
1222 * Constructor. For internal queues, most fields are initialized
1223 * upon thread start in pool.registerWorker.
1224 */
1225 WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1226 boolean clearThreadLocals) {
1227 this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
1228 if ((this.owner = owner) == null) {
1229 array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1230 phase = id | IDLE;
1231 }
1232 }
1233
1234 /**
1235 * Returns an exportable index (used by ForkJoinWorkerThread).
1236 */
1237 final int getPoolIndex() {
1238 return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1239 }
1240
1241 /**
1242 * Returns the approximate number of tasks in the queue.
1243 */
1244 final int queueSize() {
1245 int unused = phase; // for ordering effect
1246 return Math.max(top - base, 0); // ignore transient negative
1247 }
1248
1249 /**
1250 * Pushes a task. Called only by owner or if already locked
1251 *
1252 * @param task the task; no-op if null
1253 * @param pool the pool to signal if was previously empty, else null
1254 * @param internal if caller owns this queue
1255 * @throws RejectedExecutionException if array could not be resized
1256 */
1257 final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1258 int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a, na;
1259 if ((a = array) != null && (cap = a.length) > 0) { // else disabled
1260 int k = (m = cap - 1) & s;
1261 if ((room = m - (s - b)) >= 0) {
1262 top = s + 1;
1263 long pos = slotOffset(k);
1264 if (!internal)
1265 U.putReference(a, pos, task); // inside lock
1266 else
1267 U.getAndSetReference(a, pos, task); // fully fenced
1268 if (room == 0 && (na = growArray(a, cap, s)) != null)
1269 k = ((a = na).length - 1) & s; // resize
1270 }
1271 if (!internal)
1272 unlockPhase();
1273 if (room < 0)
1274 throw new RejectedExecutionException("Queue capacity exceeded");
1275 if (pool != null &&
1276 (room == 0 ||
1277 U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null))
1278 pool.signalWork(a, k); // may have appeared empty
1279 }
1280 }
1281
1282 /**
1283 * Resizes the queue array unless out of memory.
1284 * @param a old array
1285 * @param cap old array capacity
1286 * @param s current top
1287 * @return new array, or null on failure
1288 */
1289 private ForkJoinTask<?>[] growArray(ForkJoinTask<?>[] a, int cap, int s) {
1290 int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
1291 ForkJoinTask<?>[] newArray = null;
1292 if (a != null && a.length == cap && cap > 0 && newCap > 0) {
1293 try {
1294 newArray = new ForkJoinTask<?>[newCap];
1295 } catch (OutOfMemoryError ex) {
1296 }
1297 if (newArray != null) { // else throw on next push
1298 int mask = cap - 1, newMask = newCap - 1;
1299 for (int k = s, j = cap; j > 0; --j, --k) {
1300 ForkJoinTask<?> u; // poll old, push to new
1301 if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1302 a, slotOffset(k & mask), null)) == null)
1303 break; // lost to pollers
1304 newArray[k & newMask] = u;
1305 }
1306 updateArray(newArray); // fully fenced
1307 }
1308 }
1309 return newArray;
1310 }
1311
1312 /**
1313 * Takes next task, if one exists, in lifo order.
1314 */
1315 private ForkJoinTask<?> localPop() {
1316 ForkJoinTask<?> t = null;
1317 int s = top - 1, cap; long k; ForkJoinTask<?>[] a;
1318 if ((a = array) != null && (cap = a.length) > 0 &&
1319 U.getReference(a, k = slotOffset((cap - 1) & s)) != null &&
1320 (t = (ForkJoinTask<?>)U.getAndSetReference(a, k, null)) != null)
1321 updateTop(s);
1322 return t;
1323 }
1324
1325 /**
1326 * Takes next task, if one exists, in fifo order.
1327 */
1328 private ForkJoinTask<?> localPoll() {
1329 ForkJoinTask<?> t = null;
1330 int p = top, cap; ForkJoinTask<?>[] a;
1331 if ((a = array) != null && (cap = a.length) > 0) {
1332 for (int b = base; p - b > 0; ) {
1333 int nb = b + 1;
1334 long k = slotOffset((cap - 1) & b);
1335 if (U.getReference(a, k) == null) {
1336 if (nb == p)
1337 break; // else base is lagging
1338 while (b == (b = U.getIntAcquire(this, BASE)))
1339 Thread.onSpinWait(); // spin to reduce memory traffic
1340 }
1341 else if ((t = (ForkJoinTask<?>)
1342 U.getAndSetReference(a, k, null)) != null) {
1343 updateBase(nb);
1344 break;
1345 }
1346 else
1347 b = base;
1348 }
1349 }
1350 return t;
1351 }
1352
1353 /**
1354 * Takes next task, if one exists, using configured mode.
1355 */
1356 final ForkJoinTask<?> nextLocalTask() {
1357 return (config & FIFO) == 0 ? localPop() : localPoll();
1358 }
1359
1360 /**
1361 * Pops the given task only if it is at the current top.
1362 * @param task the task. Caller must ensure non-null.
1363 * @param internal if caller owns this queue
1364 */
1365 final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) {
1366 boolean taken = false;
1367 ForkJoinTask<?>[] a = array;
1368 int p = top, s = p - 1, cap; long k;
1369 if (a != null && (cap = a.length) > 0 &&
1370 U.getReference(a, k = slotOffset((cap - 1) & s)) == task &&
1371 (internal || tryLockPhase())) {
1372 if (top == p && U.compareAndSetReference(a, k, task, null)) {
1373 taken = true;
1374 updateTop(s);
1375 }
1376 if (!internal)
1377 unlockPhase();
1378 }
1379 return taken;
1380 }
1381
1382 /**
1383 * Returns next task, if one exists, in order specified by mode.
1384 */
1385 final ForkJoinTask<?> peek() {
1386 ForkJoinTask<?>[] a = array;
1387 int b = base, cfg = config, p = top, cap;
1388 if (p != b && a != null && (cap = a.length) > 0) {
1389 if ((cfg & FIFO) == 0)
1390 return a[(cap - 1) & (p - 1)];
1391 else { // skip over in-progress removals
1392 ForkJoinTask<?> t;
1393 for ( ; p - b > 0; ++b) {
1394 if ((t = a[(cap - 1) & b]) != null)
1395 return t;
1396 }
1397 }
1398 }
1399 return null;
1400 }
1401
1402 /**
1403 * Polls for a task. Used only by non-owners.
1404 */
1405 final ForkJoinTask<?> poll() {
1406 for (int pb = -1, b; ; pb = b) { // track progress
1407 ForkJoinTask<?> t; int cap, nb; long k; ForkJoinTask<?>[] a;
1408 if ((a = array) == null || (cap = a.length) <= 0)
1409 break;
1410 t = (ForkJoinTask<?>)U.getReferenceAcquire(
1411 a, k = slotOffset((cap - 1) & (b = base)));
1412 Object u = U.getReference( // next slot
1413 a, slotOffset((cap - 1) & (nb = b + 1)));
1414 if (base != b) // inconsistent
1415 ;
1416 else if (t == null) {
1417 if (u == null && top - b <= 0)
1418 break; // empty
1419 if (pb == b)
1420 Thread.onSpinWait(); // stalled
1421 }
1422 else if (U.compareAndSetReference(a, k, t, null)) {
1423 updateBase(nb);
1424 return t;
1425 }
1426 }
1427 return null;
1428 }
1429
1430 // specialized execution methods
1431
1432 /**
1433 * Runs the given task, as well as remaining local tasks, and
1434 * those from the given queue that can be polled without interference.
1435 */
1436 final void topLevelExec(ForkJoinTask<?> task, WorkQueue q, int fifo) {
1437 if (task != null && q != null) {
1438 int stolen = 1;
1439 for (;;) {
1440 task.doExec();
1441 task = null;
1442 int p = top, cap; ForkJoinTask<?>[] a;
1443 if ((a = array) == null || (cap = a.length) <= 0)
1444 break;
1445 if (fifo == 0) { // specialized localPop
1446 int s = p - 1;
1447 long k = slotOffset((cap - 1) & s);
1448 if (U.getReference(a, k) != null &&
1449 (task = (ForkJoinTask<?>)
1450 U.getAndSetReference(a, k, null)) != null)
1451 top = s;
1452 } else { // specialized localPoll
1453 for (int b = base; p - b > 0; ) {
1454 int nb = b + 1;
1455 long k = slotOffset((cap - 1) & b);
1456 if (U.getReference(a, k) != null &&
1457 (task = (ForkJoinTask<?>)
1458 U.getAndSetReference(a, k, null)) != null) {
1459 base = nb;
1460 break;
1461 }
1462 if (nb == p)
1463 break;
1464 while (b == (b = U.getIntAcquire(this, BASE)))
1465 Thread.onSpinWait();
1466 }
1467 }
1468 if (task == null) { // one-shot steal attempt
1469 int qb = q.base, qcap; ForkJoinTask<?>[] qa; long bp;
1470 if ((qa = q.array) != null && (qcap = qa.length) > 0 &&
1471 (task = (ForkJoinTask<?>)U.getReferenceAcquire(
1472 qa, bp = slotOffset((qcap - 1) & qb))) != null &&
1473 q.base == qb &&
1474 U.compareAndSetReference(qa, bp, task, null)) {
1475 q.base = qb + 1;
1476 ++stolen;
1477 }
1478 else
1479 break;
1480 }
1481 }
1482 nsteals += stolen;
1483 ForkJoinWorkerThread o;
1484 if ((config & CLEAR_TLS) != 0 && (o = owner) != null)
1485 o.resetThreadLocals();
1486 }
1487 }
1488
1489 /**
1490 * Deep form of tryUnpush: Traverses from top and removes and
1491 * runs task if present.
1492 */
1493 final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
1494 ForkJoinTask<?>[] a = array;
1495 int b = base, p = top, s = p - 1, d = p - b, cap;
1496 if (a != null && (cap = a.length) > 0) {
1497 for (int m = cap - 1, i = s; d > 0; --i, --d) {
1498 long k; boolean taken;
1499 ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference(
1500 a, k = slotOffset(i & m));
1501 if (t == null)
1502 break;
1503 if (t == task) {
1504 if (!internal && !tryLockPhase())
1505 break; // fail if locked
1506 if (taken =
1507 (top == p &&
1508 U.compareAndSetReference(a, k, task, null))) {
1509 if (i == s) // act as pop
1510 updateTop(s);
1511 else if (i == base) // act as poll
1512 updateBase(i + 1);
1513 else { // swap with top
1514 U.putReferenceVolatile(
1515 a, k, (ForkJoinTask<?>)
1516 U.getAndSetReference(
1517 a, slotOffset(s & m), null));
1518 updateTop(s);
1519 }
1520 }
1521 if (!internal)
1522 unlockPhase();
1523 if (taken)
1524 task.doExec();
1525 break;
1526 }
1527 }
1528 }
1529 }
1530
1531 /**
1532 * Tries to pop and run tasks within the target's computation
1533 * until done, not found, or limit exceeded.
1534 *
1535 * @param task root of computation
1536 * @param limit max runs, or zero for no limit
1537 * @return task status if known to be done
1538 */
1539 final int helpComplete(ForkJoinTask<?> task, boolean internal, int limit) {
1540 int status = 0;
1541 if (task != null) {
1542 outer: for (;;) {
1543 ForkJoinTask<?>[] a; boolean taken; Object o;
1544 int stat, p, s, cap;
1545 if ((stat = task.status) < 0) {
1546 status = stat;
1547 break;
1548 }
1549 if ((a = array) == null || (cap = a.length) <= 0)
1550 break;
1551 long k = slotOffset((cap - 1) & (s = (p = top) - 1));
1552 if (!((o = U.getReference(a, k)) instanceof CountedCompleter))
1553 break;
1554 CountedCompleter<?> t = (CountedCompleter<?>)o, f = t;
1555 for (int steps = cap;;) { // bound path
1556 if (f == task)
1557 break;
1558 if ((f = f.completer) == null || --steps == 0)
1559 break outer;
1560 }
1561 if (!internal && !tryLockPhase())
1562 break;
1563 if (taken =
1564 (top == p &&
1565 U.compareAndSetReference(a, k, t, null)))
1566 updateTop(s);
1567 if (!internal)
1568 unlockPhase();
1569 if (!taken)
1570 break;
1571 t.doExec();
1572 if (limit != 0 && --limit == 0)
1573 break;
1574 }
1575 }
1576 return status;
1577 }
1578
1579 /**
1580 * Tries to poll and run AsynchronousCompletionTasks until
1581 * none found or blocker is released
1582 *
1583 * @param blocker the blocker
1584 */
1585 final void helpAsyncBlocker(ManagedBlocker blocker) {
1586 for (;;) {
1587 ForkJoinTask<?> t; ForkJoinTask<?>[] a; int b, cap; long k;
1588 if ((a = array) == null || (cap = a.length) <= 0)
1589 break;
1590 t = (ForkJoinTask<?>)U.getReferenceAcquire(
1591 a, k = slotOffset((cap - 1) & (b = base)));
1592 if (t == null) {
1593 if (top - b <= 0)
1594 break;
1595 }
1596 else if (!(t instanceof CompletableFuture
1597 .AsynchronousCompletionTask))
1598 break;
1599 if (blocker != null && blocker.isReleasable())
1600 break;
1601 if (base == b && t != null &&
1602 U.compareAndSetReference(a, k, t, null)) {
1603 updateBase(b + 1);
1604 t.doExec();
1605 }
1606 }
1607 }
1608
1609 // misc
1610
1611 /**
1612 * Cancels all local tasks. Called only by owner.
1613 */
1614 final void cancelTasks() {
1615 for (ForkJoinTask<?> t; (t = localPop()) != null; ) {
1616 try {
1617 t.cancel(false);
1618 } catch (Throwable ignore) {
1619 }
1620 }
1621 }
1622
1623 /**
1624 * Returns true if internal and not known to be blocked.
1625 */
1626 final boolean isApparentlyUnblocked() {
1627 Thread wt; Thread.State s;
1628 return ((wt = owner) != null && (phase & IDLE) != 0 &&
1629 (s = wt.getState()) != Thread.State.BLOCKED &&
1630 s != Thread.State.WAITING &&
1631 s != Thread.State.TIMED_WAITING);
1632 }
1633
1634 static {
1635 U = Unsafe.getUnsafe();
1636 Class<WorkQueue> klass = WorkQueue.class;
1637 PHASE = U.objectFieldOffset(klass, "phase");
1638 BASE = U.objectFieldOffset(klass, "base");
1639 TOP = U.objectFieldOffset(klass, "top");
1640 ARRAY = U.objectFieldOffset(klass, "array");
1641 }
1642 }
1643
1644 // static fields (initialized in static initializer below)
1645
1646 /**
1647 * Creates a new ForkJoinWorkerThread. This factory is used unless
1648 * overridden in ForkJoinPool constructors.
1649 */
1650 public static final ForkJoinWorkerThreadFactory
1651 defaultForkJoinWorkerThreadFactory;
1652
1653 /**
1654 * Common (static) pool. Non-null for public use unless a static
1655 * construction exception, but internal usages null-check on use
1656 * to paranoically avoid potential initialization circularities
1657 * as well as to simplify generated code.
1658 */
1659 static final ForkJoinPool common;
1660
1661 /**
1662 * Sequence number for creating worker names
1663 */
1664 private static volatile int poolIds;
1665
1666 /**
1667 * For VirtualThread intrinsics
1668 */
1669 private static final JavaLangAccess JLA;
1670
1671 // fields declared in order of their likely layout on most VMs
1672 volatile CountDownLatch termination; // lazily constructed
1673 final Predicate<? super ForkJoinPool> saturate;
1674 final ForkJoinWorkerThreadFactory factory;
1675 final UncaughtExceptionHandler ueh; // per-worker UEH
1676 final SharedThreadContainer container;
1677 final String workerNamePrefix; // null for common pool
1678 final String poolName;
1679 volatile DelayScheduler delayScheduler; // lazily constructed
1680 WorkQueue[] queues; // main registry
1681 volatile long runState; // versioned, lockable
1682 final long keepAlive; // milliseconds before dropping if idle
1683 final long config; // static configuration bits
1684 volatile long stealCount; // collects worker nsteals
1685 volatile long threadIds; // for worker thread names
1686
1687 @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
1688 volatile long ctl; // main pool control
1689 @jdk.internal.vm.annotation.Contended("fjpctl") // colocate
1690 int parallelism; // target number of workers
1691
1692 // Support for atomic operations
1693 private static final Unsafe U;
1694 private static final long CTL;
1695 private static final long RUNSTATE;
1696 private static final long PARALLELISM;
1697 private static final long THREADIDS;
1698 private static final long TERMINATION;
1699 private static final Object POOLIDS_BASE;
1700 private static final long POOLIDS;
1701
1702 private boolean compareAndSetCtl(long c, long v) {
1703 return U.compareAndSetLong(this, CTL, c, v);
1704 }
1705 private long compareAndExchangeCtl(long c, long v) {
1706 return U.compareAndExchangeLong(this, CTL, c, v);
1707 }
1708 private long getAndAddCtl(long v) {
1709 return U.getAndAddLong(this, CTL, v);
1710 }
1711 private long incrementThreadIds() {
1712 return U.getAndAddLong(this, THREADIDS, 1L);
1713 }
1714 private static int getAndAddPoolIds(int x) {
1715 return U.getAndAddInt(POOLIDS_BASE, POOLIDS, x);
1716 }
1717 private int getAndSetParallelism(int v) {
1718 return U.getAndSetInt(this, PARALLELISM, v);
1719 }
1720 private int getParallelismOpaque() {
1721 return U.getIntOpaque(this, PARALLELISM);
1722 }
1723 private CountDownLatch cmpExTerminationSignal(CountDownLatch x) {
1724 return (CountDownLatch)
1725 U.compareAndExchangeReference(this, TERMINATION, null, x);
1726 }
1727
1728 // runState operations
1729
1730 private long getAndBitwiseOrRunState(long v) { // for status bits
1731 return U.getAndBitwiseOrLong(this, RUNSTATE, v);
1732 }
1733 private boolean casRunState(long c, long v) {
1734 return U.compareAndSetLong(this, RUNSTATE, c, v);
1735 }
1736 private void unlockRunState() { // increment lock bit
1737 U.getAndAddLong(this, RUNSTATE, RS_LOCK);
1738 }
1739 private long lockRunState() { // lock and return current state
1740 long s, u; // locked when RS_LOCK set
1741 if (((s = runState) & RS_LOCK) == 0L && casRunState(s, u = s + RS_LOCK))
1742 return u;
1743 else
1744 return spinLockRunState();
1745 }
1746 private long spinLockRunState() { // spin/sleep
1747 for (int waits = 0;;) {
1748 long s, u;
1749 if (((s = runState) & RS_LOCK) == 0L) {
1750 if (casRunState(s, u = s + RS_LOCK))
1751 return u;
1752 waits = 0;
1753 } else if (waits < SPIN_WAITS) {
1754 ++waits;
1755 Thread.onSpinWait();
1756 } else {
1757 if (waits < MIN_SLEEP)
1758 waits = MIN_SLEEP;
1759 LockSupport.parkNanos(this, (long)waits);
1760 if (waits < MAX_SLEEP)
1761 waits <<= 1;
1762 }
1763 }
1764 }
1765
1766 static boolean poolIsStopping(ForkJoinPool p) { // Used by ForkJoinTask
1767 return p != null && (p.runState & STOP) != 0L;
1768 }
1769
1770 // Creating, registering, and deregistering workers
1771
1772 /**
1773 * Tries to construct and start one worker. Assumes that total
1774 * count has already been incremented as a reservation. Invokes
1775 * deregisterWorker on any failure.
1776 *
1777 * @return true if successful
1778 */
1779 private boolean createWorker() {
1780 ForkJoinWorkerThreadFactory fac = factory;
1781 SharedThreadContainer ctr = container;
1782 Throwable ex = null;
1783 ForkJoinWorkerThread wt = null;
1784 try {
1785 if ((runState & STOP) == 0L && // avoid construction if terminating
1786 fac != null && (wt = fac.newThread(this)) != null) {
1787 if (ctr != null)
1788 ctr.start(wt);
1789 else
1790 wt.start();
1791 return true;
1792 }
1793 } catch (Throwable rex) {
1794 ex = rex;
1795 }
1796 deregisterWorker(wt, ex);
1797 return false;
1798 }
1799
1800 /**
1801 * Provides a name for ForkJoinWorkerThread constructor.
1802 */
1803 final String nextWorkerThreadName() {
1804 String prefix = workerNamePrefix;
1805 long tid = incrementThreadIds() + 1L;
1806 if (prefix == null) // commonPool has no prefix
1807 prefix = "ForkJoinPool.commonPool-worker-";
1808 return prefix.concat(Long.toString(tid));
1809 }
1810
1811 /**
1812 * Finishes initializing and records internal queue.
1813 *
1814 * @param w caller's WorkQueue
1815 */
1816 final void registerWorker(WorkQueue w) {
1817 if (w != null) {
1818 w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1819 ThreadLocalRandom.localInit();
1820 int seed = w.stackPred = ThreadLocalRandom.getProbe();
1821 int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1822 int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1823 long stop = lockRunState() & STOP;
1824 try {
1825 WorkQueue[] qs; int n;
1826 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1827 for (int k = n, m = n - 1; ; id += 2) {
1828 if (qs[id &= m] == null)
1829 break;
1830 if ((k -= 2) <= 0) {
1831 id |= n;
1832 break;
1833 }
1834 }
1835 w.phase = id | phaseSeq; // now publishable
1836 if (id < n)
1837 qs[id] = w;
1838 else { // expand
1839 int an = n << 1, am = an - 1;
1840 WorkQueue[] as = new WorkQueue[an];
1841 as[id & am] = w;
1842 for (int j = 1; j < n; j += 2)
1843 as[j] = qs[j];
1844 for (int j = 0; j < n; j += 2) {
1845 WorkQueue q; // shared queues may move
1846 if ((q = qs[j]) != null)
1847 as[q.phase & EXTERNAL_ID_MASK & am] = q;
1848 }
1849 U.storeFence(); // fill before publish
1850 queues = as;
1851 }
1852 }
1853 } finally {
1854 unlockRunState();
1855 }
1856 }
1857 }
1858
1859 /**
1860 * Final callback from terminating worker, as well as upon failure
1861 * to construct or start a worker. Removes record of worker from
1862 * array, and adjusts counts. If pool is shutting down, tries to
1863 * complete termination.
1864 *
1865 * @param wt the worker thread, or null if construction failed
1866 * @param ex the exception causing failure, or null if none
1867 */
1868 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1869 WorkQueue w = null; // null if not created
1870 int phase = 0; // 0 if not registered
1871 if (wt != null && (w = wt.workQueue) != null &&
1872 (phase = w.phase) != 0 && (phase & IDLE) != 0)
1873 releaseWaiters(); // ensure released
1874 if (w == null || w.source != DROPPED) {
1875 long c = ctl; // decrement counts
1876 do {} while (c != (c = compareAndExchangeCtl(
1877 c, ((RC_MASK & (c - RC_UNIT)) |
1878 (TC_MASK & (c - TC_UNIT)) |
1879 (LMASK & c)))));
1880 }
1881 if (phase != 0 && w != null) { // remove index unless terminating
1882 long ns = w.nsteals & 0xffffffffL;
1883 if ((runState & STOP) == 0L) {
1884 WorkQueue[] qs; int n, i;
1885 if ((lockRunState() & STOP) == 0L &&
1886 (qs = queues) != null && (n = qs.length) > 0 &&
1887 qs[i = phase & SMASK & (n - 1)] == w) {
1888 qs[i] = null;
1889 stealCount += ns; // accumulate steals
1890 }
1891 unlockRunState();
1892 }
1893 }
1894 if ((tryTerminate(false, false) & STOP) == 0L &&
1895 phase != 0 && w != null && w.source != DROPPED) {
1896 w.cancelTasks(); // clean queue
1897 signalWork(null, 0); // possibly replace
1898 }
1899 if (ex != null)
1900 ForkJoinTask.rethrow(ex);
1901 }
1902
1903 /**
1904 * Releases an idle worker, or creates one if not enough exist,
1905 * giving up if array a is nonnull and task at a[k] already taken.
1906 */
1907 final void signalWork(ForkJoinTask<?>[] a, int k) {
1908 int pc = parallelism;
1909 for (long c = ctl;;) {
1910 WorkQueue[] qs = queues;
1911 long ac = (c + RC_UNIT) & RC_MASK, nc;
1912 int sp = (int)c, i = sp & SMASK;
1913 if ((short)(c >>> RC_SHIFT) >= pc)
1914 break;
1915 if (qs == null)
1916 break;
1917 if (qs.length <= i)
1918 break;
1919 WorkQueue w = qs[i], v = null;
1920 if (sp == 0) {
1921 if ((short)(c >>> TC_SHIFT) >= pc)
1922 break;
1923 nc = ((c + TC_UNIT) & TC_MASK) | ac;
1924 }
1925 else if ((v = w) == null)
1926 break;
1927 else
1928 nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
1929 if (a != null && k < a.length && k >= 0 && a[k] == null)
1930 break;
1931 if (c == (c = ctl) && c == (c = compareAndExchangeCtl(c, nc))) {
1932 if (v == null)
1933 createWorker();
1934 else {
1935 v.phase = sp;
1936 if (v.parking != 0)
1937 U.unpark(v.owner);
1938 }
1939 break;
1940 }
1941 }
1942 }
1943
1944 /**
1945 * Releases all waiting workers. Called only during shutdown.
1946 */
1947 private void releaseWaiters() {
1948 for (long c = ctl;;) {
1949 WorkQueue[] qs; WorkQueue v; int sp, i;
1950 if ((sp = (int)c) == 0 || (qs = queues) == null ||
1951 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
1952 break;
1953 if (c == (c = compareAndExchangeCtl(
1954 c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
1955 (v.stackPred & LMASK))))) {
1956 v.phase = sp;
1957 if (v.parking != 0)
1958 U.unpark(v.owner);
1959 }
1960 }
1961 }
1962
1963 /**
1964 * Internal version of isQuiescent and related functionality.
1965 * @return positive if stopping, nonnegative if terminating or all
1966 * workers are inactive and submission queues are empty and
1967 * unlocked; if so, setting STOP if shutdown is enabled
1968 */
1969 private int quiescent() {
1970 for (;;) {
1971 long phaseSum = 0L;
1972 boolean swept = false;
1973 for (long e, prevRunState = 0L; ; prevRunState = e) {
1974 DelayScheduler ds;
1975 long c = ctl;
1976 if (((e = runState) & STOP) != 0L)
1977 return 1; // terminating
1978 else if ((c & RC_MASK) > 0L)
1979 return -1; // at least one active
1980 else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) {
1981 long sum = c;
1982 WorkQueue[] qs = queues;
1983 int n = (qs == null) ? 0 : qs.length;
1984 for (int i = 0; i < n; ++i) { // scan queues
1985 WorkQueue q;
1986 if ((q = qs[i]) != null) {
1987 int p = q.phase, s = q.top, b = q.base;
1988 sum += (p & 0xffffffffL) | ((long)b << 32);
1989 if ((p & IDLE) == 0 || s - b > 0)
1990 return -1;
1991 }
1992 }
1993 swept = (phaseSum == (phaseSum = sum));
1994 }
1995 else if ((e & SHUTDOWN) == 0)
1996 return 0;
1997 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1998 return 0;
1999 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
2000 return 1; // enable termination
2001 else
2002 break; // restart
2003 }
2004 }
2005 }
2006
2007 /**
2008 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
2009 * See above for explanation.
2010 *
2011 * @param w caller's WorkQueue (may be null on failed initialization)
2012 */
2013 final void runWorker(WorkQueue w) {
2014 if (w != null) {
2015 WorkQueue[] qs;
2016 int phase = w.phase, r = w.stackPred; // seed from registerWorker
2017 int fifo = (int)config & FIFO, rescans = 0, n;
2018 while ((runState & STOP) == 0L && (qs = queues) != null &&
2019 (n = qs.length) > 0) {
2020 int i = r, step = (r >>> 16) | 1;
2021 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
2022 scan: for (int j = n; j != 0; --j, i += step) {
2023 WorkQueue q; int qid;
2024 if ((q = qs[qid = i & (n - 1)]) != null) {
2025 ForkJoinTask<?>[] a; int cap; // poll queue
2026 while ((a = q.array) != null && (cap = a.length) > 0) {
2027 int b, nb, nk; long bp; ForkJoinTask<?> t;
2028 t = (ForkJoinTask<?>)U.getReferenceAcquire(
2029 a, bp = slotOffset((cap - 1) & (b = q.base)));
2030 if (q.base != b)
2031 continue; // inconsistent
2032 long np = slotOffset(nk = (nb = b + 1) & (cap - 1));
2033 if (t == null) {
2034 if (q.array != a) // resized
2035 continue;
2036 if (rescans > 0) // ran or stalled
2037 break scan;
2038 if (U.getReference(a, np) != null ||
2039 (rescans < 0 && q.top - b > 0)) {
2040 rescans = 1; // may be stalled
2041 continue;
2042 }
2043 if (U.getReference(a, bp) != null)
2044 continue; // stale
2045 break; // probably empty
2046 }
2047 if ((phase & IDLE) != 0 &&
2048 ((phase = tryReactivate(w, phase)) & IDLE) != 0) {
2049 rescans = 1; // can't take yet
2050 break scan;
2051 }
2052 if (U.getReference(a, bp) == t &&
2053 U.compareAndSetReference(a, bp, t, null)) {
2054 q.base = nb;
2055 Object nt = U.getReferenceAcquire(a, np);
2056 w.source = qid;
2057 rescans = 1;
2058 if (nt != null && // confirm a[nk]
2059 U.getReferenceAcquire(a, np) == nt)
2060 signalWork(a, nk); // propagate
2061 w.topLevelExec(t, q, fifo);
2062 }
2063 }
2064 }
2065 }
2066 int prev;
2067 if (rescans >= 0)
2068 --rescans;
2069 else if ((phase = deactivate(w, prev = phase)) == 0)
2070 break;
2071 else if (phase != prev)
2072 rescans = 0;
2073 }
2074 }
2075 }
2076
2077 /**
2078 * If active, tries to deactivate worker, keeping active on
2079 * contention; else awaits signal or termination
2080 *
2081 * @param w the work queue
2082 * @param phase w's currently known phase
2083 * @return current phase or 0 on exit
2084 */
2085 private int deactivate(WorkQueue w, int phase) {
2086 if (w != null) { // always true; hoist checks
2087 if ((phase & IDLE) == 0) {
2088 int idlePhase = phase | IDLE;
2089 long pc = ctl, e;
2090 long qc = ((phase + (IDLE << 1)) & LMASK) | ((pc - RC_UNIT) & UMASK);
2091 w.stackPred = (int)pc; // set ctl stack link
2092 w.phase = idlePhase; // try to enqueue
2093 if (!compareAndSetCtl(pc, qc))
2094 w.phase = phase; // back out on contention
2095 else {
2096 phase = idlePhase;
2097 if ((qc & RC_MASK) <= 0L && ((e = runState) & SHUTDOWN) != 0L &&
2098 (e & STOP) == 0L)
2099 quiescent(); // may trigger quiescent termination
2100 }
2101 }
2102 else if ((runState & STOP) != 0L)
2103 phase = 0;
2104 else { // spin before blocking
2105 int activePhase = phase + IDLE;
2106 int noise = activePhase | (activePhase >>> 16);
2107 int spins = (SPIN_WAITS << 1) | (noise & (SPIN_WAITS - 1));
2108 while ((phase = w.phase) != activePhase && --spins != 0)
2109 Thread.onSpinWait();
2110 if (spins == 0 && awaitWork(w, phase = activePhase) != 0)
2111 phase = 0;
2112 }
2113 }
2114 return phase;
2115 }
2116
2117 /**
2118 * Reactivates worker w if it is currently top of ctl stack
2119 *
2120 * @param w the work queue
2121 * @param phase w's currently known (idle) phase
2122 * @return currently known phase on exit
2123 */
2124 private int tryReactivate(WorkQueue w, int phase) {
2125 int activePhase = phase + IDLE; long c;
2126 if (w != null && (phase = w.phase) != activePhase &&
2127 (int)(c = ctl) == activePhase &&
2128 compareAndSetCtl(c, (w.stackPred & LMASK) | ((c + RC_UNIT) & UMASK)))
2129 phase = w.phase = activePhase;
2130 return phase;
2131 }
2132
2133 /**
2134 * Awaits signal or termination.
2135 *
2136 * @param w the work queue
2137 * @param activePhase w's next active phase
2138 * @return 0 if now active
2139 */
2140 private int awaitWork(WorkQueue w, int activePhase) {
2141 int idle = 1;
2142 if (w != null) { // always true; hoist checks
2143 long waitTime = (w.source == INVALID_ID) ? 0L : keepAlive;
2144 LockSupport.setCurrentBlocker(this);
2145 for (long deadline = 0L;;) {
2146 Thread.interrupted(); // clear status
2147 if ((runState & STOP) != 0L)
2148 break;
2149 if ((idle = w.phase - activePhase) == 0)
2150 break;
2151 boolean trimmable = false; // use timed wait if trimmable
2152 long d = 0L, c;
2153 if (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase) {
2154 long now = System.currentTimeMillis();
2155 if (deadline == 0L)
2156 deadline = waitTime + now;
2157 if (deadline - now <= TIMEOUT_SLOP) {
2158 if (tryTrim(w, c, activePhase))
2159 break;
2160 continue; // lost race to trim
2161 }
2162 d = deadline;
2163 trimmable = true;
2164 }
2165 w.parking = 1; // enable unpark and recheck
2166 if ((idle = w.phase - activePhase) != 0)
2167 U.park(trimmable, d);
2168 w.parking = 0; // close unpark window
2169 if (idle == 0 || (idle = w.phase - activePhase) == 0)
2170 break;
2171 }
2172 LockSupport.setCurrentBlocker(null);
2173 }
2174 return idle;
2175 }
2176
2177 /**
2178 * Tries to remove and deregister worker after timeout, and release
2179 * another to do the same unless new tasks are found.
2180 */
2181 private boolean tryTrim(WorkQueue w, long c, int activePhase) {
2182 if (w != null) {
2183 int vp, i; WorkQueue[] vs; WorkQueue v;
2184 long nc = ((w.stackPred & LMASK) |
2185 ((RC_MASK & c) | (TC_MASK & (c - TC_UNIT))));
2186 if (compareAndSetCtl(c, nc)) {
2187 w.source = DROPPED;
2188 w.phase = activePhase;
2189 if ((vp = (int)nc) != 0 && (vs = queues) != null &&
2190 vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
2191 compareAndSetCtl( // try to wake up next waiter
2192 nc, ((v.stackPred & LMASK) |
2193 ((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) {
2194 v.source = INVALID_ID; // enable cascaded timeouts
2195 v.phase = vp;
2196 U.unpark(v.owner);
2197 }
2198 return true;
2199 }
2200 }
2201 return false;
2202 }
2203
2204 /**
2205 * Scans for and returns a polled task, if available. Used only
2206 * for untracked polls. Begins scan at a random index to avoid
2207 * systematic unfairness.
2208 *
2209 * @param submissionsOnly if true, only scan submission queues
2210 */
2211 private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
2212 if ((runState & STOP) == 0L) {
2213 WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
2214 int r = ThreadLocalRandom.nextSecondarySeed();
2215 if (submissionsOnly) // even indices only
2216 r &= ~1;
2217 int step = (submissionsOnly) ? 2 : 1;
2218 if ((qs = queues) != null && (n = qs.length) > 0) {
2219 for (int i = n; i > 0; i -= step, r += step) {
2220 if ((q = qs[r & (n - 1)]) != null &&
2221 (t = q.poll()) != null)
2222 return t;
2223 }
2224 }
2225 }
2226 return null;
2227 }
2228
2229 /**
2230 * Tries to decrement counts (sometimes implicitly) and possibly
2231 * arrange for a compensating worker in preparation for
2232 * blocking. May fail due to interference, in which case -1 is
2233 * returned so caller may retry. A zero return value indicates
2234 * that the caller doesn't need to re-adjust counts when later
2235 * unblocked.
2236 *
2237 * @param c incoming ctl value
2238 * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
2239 */
private int tryCompensate(long c) {
    Predicate<? super ForkJoinPool> sat;
    long b = config;
    // The (short) casts keep the low 16 bits (sign-extended) of each
    // shifted word; sp is the low 32 bits of the incoming ctl value.
    int pc        = parallelism,                    // unpack fields
        minActive = (short)(b >>> RC_SHIFT),
        maxTotal  = (short)(b >>> TC_SHIFT) + pc,
        active    = (short)(c >>> RC_SHIFT),
        total     = (short)(c >>> TC_SHIFT),
        sp        = (int)c,
        stat      = -1;                             // default retry return
    // Exactly one of the branches below is attempted per call. Any branch
    // whose CAS on ctl fails leaves stat at -1 so the caller retries.
    if (sp != 0 && active <= pc) {                  // activate idle worker
        WorkQueue[] qs; WorkQueue v; int i;
        // The CAS keeps the UMASK bits of c and replaces the rest with
        // the LMASK bits of v.stackPred.
        if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
            (v = qs[i]) != null &&
            compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
            v.phase = sp;
            if (v.parking != 0)                     // unpark only if flagged as parking
                U.unpark(v.owner);
            stat = UNCOMPENSATE;
        }
    }
    else if (active > minActive && total >= pc) {   // reduce active workers
        // Subtract one RC_UNIT within the RC_MASK field; other bits unchanged.
        if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK)))
            stat = UNCOMPENSATE;
    }
    else if (total < maxTotal && total < MAX_CAP) { // try to expand pool
        // Add one TC_UNIT within the TC_MASK field; other bits unchanged.
        long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
        if ((runState & STOP) != 0L)                // terminating
            stat = 0;
        else if (compareAndSetCtl(c, nc))
            stat = createWorker() ? UNCOMPENSATE : 0;
    }
    // CAS of c to itself succeeds only if ctl still equals c; on failure
    // the empty statement falls through with stat == -1.
    else if (!compareAndSetCtl(c, c))               // validate
        ;
    else if ((sat = saturate) != null && sat.test(this))
        stat = 0;
    else
        throw new RejectedExecutionException(
            "Thread limit exceeded replacing blocked worker");
    return stat;
}
2281
2282 /**
2283 * Readjusts RC count; called from ForkJoinTask after blocking.
2284 */
2285 final void uncompensate() {
2286 getAndAddCtl(RC_UNIT);
2287 }
2288
2289 /**
2290 * Helps if possible until the given task is done. Processes
 * compatible local tasks and scans other queues for tasks produced
2292 * by w's stealers; returning compensated blocking sentinel if
2293 * none are found.
2294 *
2295 * @param task the task
2296 * @param w caller's WorkQueue
2297 * @param internal true if w is owned by a ForkJoinWorkerThread
2298 * @return task status on exit, or UNCOMPENSATE for compensated blocking
2299 */
final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) {
    // First give w a chance to remove and run the task itself.
    if (w != null)
        w.tryRemoveAndExec(task, internal);
    int s = 0;
    // The helping loop runs only for a non-null, not-yet-negative-status
    // task when internal is true and w is non-null; otherwise s is
    // returned as read (or 0 if task is null).
    if (task != null && (s = task.status) >= 0 && internal && w != null) {
        // wid: w's index bits; r: scan cursor starting two slots past wid;
        // wsrc: w.source value restored after each helped task.
        int wid = w.phase & SMASK, r = wid + 2, wsrc = w.source;
        long sctl = 0L;                             // track stability
        outer: for (boolean rescan = true;;) {
            if ((s = task.status) < 0)
                break;
            if (!rescan) {
                if ((runState & STOP) != 0L)
                    break;
                // Left operand is the previously recorded ctl, right side
                // re-reads it: tryCompensate is attempted only when ctl
                // was the same on two consecutive passes. A negative
                // result from tryCompensate means retry the scan.
                if (sctl == (sctl = ctl) && (s = tryCompensate(sctl)) >= 0)
                    break;
            }
            rescan = false;
            WorkQueue[] qs = queues;
            int n = (qs == null) ? 0 : qs.length;
            // Visits n/2 slots stepping by 2, i.e. indices with the same
            // parity as wid.
            scan: for (int l = n >>> 1; l > 0; --l, r += 2) {
                int j; WorkQueue q;
                if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
                    for (;;) {
                        ForkJoinTask<?> t; ForkJoinTask<?>[] a;
                        boolean eligible = false;
                        int sq = q.source, b, cap; long k;
                        if ((a = q.array) == null || (cap = a.length) <= 0)
                            break;
                        // Read the task at q's base slot.
                        t = (ForkJoinTask<?>)U.getReferenceAcquire(
                            a, k = slotOffset((cap - 1) & (b = q.base)));
                        if (t == task)
                            eligible = true;
                        else if (t != null) {       // check steal chain
                            // Follow source links starting from q's
                            // source, at most cap steps, looking for wid.
                            for (int v = sq, d = cap;;) {
                                WorkQueue p;
                                if (v == wid) {
                                    eligible = true;
                                    break;
                                }
                                if ((v & 1) == 0 || // external or none
                                    --d < 0 ||      // bound depth
                                    (p = qs[v & (n - 1)]) == null)
                                    break;
                                v = p.source;
                            }
                        }
                        if ((s = task.status) < 0)
                            break outer;            // validate
                        // Act only if source, base and slot contents are
                        // unchanged since they were read; otherwise loop
                        // and re-read this queue.
                        if (q.source == sq && q.base == b &&
                            U.getReference(a, k) == t) {
                            if (!eligible) {        // revisit if nonempty
                                if (!rescan && t == null && q.top - b > 0)
                                    rescan = true;
                                break;
                            }
                            // Claim the slot, advance base, and run the
                            // task with w.source temporarily set to j.
                            if (U.compareAndSetReference(a, k, t, null)) {
                                q.base = b + 1;
                                w.source = j;       // volatile write
                                t.doExec();
                                w.source = wsrc;
                                rescan = true;      // restart at index r
                                break scan;
                            }
                        }
                    }
                }
            }
        }
    }
    return s;
}
2371
2372 /**
2373 * Version of helpJoin for CountedCompleters.
2374 *
2375 * @param task root of computation (only called when a CountedCompleter)
2376 * @param w caller's WorkQueue
2377 * @param internal true if w is owned by a ForkJoinWorkerThread
2378 * @return task status on exit, or UNCOMPENSATE for compensated blocking
2379 */
2380 final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean internal) {
2381 int s = 0;
2382 if (task != null && (s = task.status) >= 0 && w != null) {
2383 int r = w.phase + 1; // for indexing
2384 long sctl = 0L; // track stability
2385 outer: for (boolean rescan = true, locals = true;;) {
2386 if (locals && (s = w.helpComplete(task, internal, 0)) < 0)
2387 break;
2388 if ((s = task.status) < 0)
2389 break;
2390 if (!rescan) {
2391 if ((runState & STOP) != 0L)
2392 break;
2393 if (sctl == (sctl = ctl) &&
2394 (!internal || (s = tryCompensate(sctl)) >= 0))
2395 break;
2396 }
2397 rescan = locals = false;
2398 WorkQueue[] qs = queues;
2399 int n = (qs == null) ? 0 : qs.length;
2400 scan: for (int l = n; l > 0; --l, ++r) {
2401 int j; WorkQueue q;
2402 if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
2403 for (;;) {
2404 ForkJoinTask<?> t; ForkJoinTask<?>[] a;
2405 int b, cap, nb; long k;
2406 boolean eligible = false;
2407 if ((a = q.array) == null || (cap = a.length) <= 0)
2408 break;
2409 t = (ForkJoinTask<?>)U.getReferenceAcquire(
2410 a, k = slotOffset((cap - 1) & (b = q.base)));
2411 if (t instanceof CountedCompleter) {
2412 CountedCompleter<?> f = (CountedCompleter<?>)t;
2413 for (int steps = cap; steps > 0; --steps) {
2414 if (f == task) {
2415 eligible = true;
2416 break;
2417 }
2418 if ((f = f.completer) == null)
2419 break;
2420 }
2421 }
2422 if ((s = task.status) < 0) // validate
2423 break outer;
2424 if (q.base == b) {
2425 if (eligible) {
2426 if (U.compareAndSetReference(
2427 a, k, t, null)) {
2428 q.updateBase(b + 1);
2429 t.doExec();
2430 locals = rescan = true;
2431 break scan;
2432 }
2433 }
2434 else if (U.getReference(a, k) == t) {
2435 if (!rescan && t == null && q.top - b > 0)
2436 rescan = true; // revisit
2437 break;
2438 }
2439 }
2440 }
2441 }
2442 }
2443 }
2444 }
2445 return s;
2446 }
2447
2448 /**
2449 * Runs tasks until all workers are inactive and no tasks are
2450 * found. Rather than blocking when tasks cannot be found, rescans
2451 * until all others cannot find tasks either.
2452 *
2453 * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
2454 * @param interruptible true if return on interrupt
2455 * @return positive if quiescent, negative if interrupted, else 0
2456 */
private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) {
    int phase; // w.phase inactive bit set when temporarily quiescent
    // A null queue, or one whose phase already has the IDLE bit, returns 0.
    if (w == null || ((phase = w.phase) & IDLE) != 0)
        return 0;
    int wsrc = w.source;                    // restored after each stolen task
    long startTime = System.nanoTime();
    long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP); // approx 1% nanos
    long prevSum = 0L;                      // phaseSum from the previous pass
    int activePhase = phase, inactivePhase = phase + IDLE;
    int r = phase + 1, waits = 0, returnStatus = 1;
    boolean locals = true;
    for (long e = runState;;) {
        if ((e & STOP) != 0L)
            break;                          // terminating
        if (interruptible && Thread.interrupted()) {
            returnStatus = -1;
            break;
        }
        if (locals) {                       // run local tasks before (re)polling
            locals = false;
            for (ForkJoinTask<?> u; (u = w.nextLocalTask()) != null;)
                u.doExec();
        }
        WorkQueue[] qs = queues;
        int n = (qs == null) ? 0 : qs.length;
        long phaseSum = 0L;                 // sum of phases of queues seen empty
        boolean rescan = false, busy = false;
        scan: for (int l = n; l > 0; --l, ++r) {
            int j; WorkQueue q;
            if ((q = qs[j = r & SMASK & (n - 1)]) != null && q != w) {
                for (;;) {
                    ForkJoinTask<?> t; ForkJoinTask<?>[] a;
                    int b, cap; long k;
                    if ((a = q.array) == null || (cap = a.length) <= 0)
                        break;
                    // Read the task at q's base slot.
                    t = (ForkJoinTask<?>)U.getReferenceAcquire(
                        a, k = slotOffset((cap - 1) & (b = q.base)));
                    if (t != null && phase == inactivePhase) // reactivate
                        w.phase = phase = activePhase;
                    // Act only if base and slot are unchanged since read;
                    // otherwise loop and re-read this queue.
                    if (q.base == b && U.getReference(a, k) == t) {
                        int nb = b + 1;
                        if (t == null) {
                            if (!rescan) {
                                // mq keeps the IDLE bit and bit 0 of q.phase:
                                // 0 (or a nonempty queue) forces a rescan,
                                // exactly 1 marks the pass as busy.
                                int qp = q.phase, mq = qp & (IDLE | 1);
                                phaseSum += qp;
                                if (mq == 0 || q.top - b > 0)
                                    rescan = true;
                                else if (mq == 1)
                                    busy = true;
                            }
                            break;
                        }
                        // Claim the slot, advance base, and run the task
                        // with w.source temporarily set to j.
                        if (U.compareAndSetReference(a, k, t, null)) {
                            q.base = nb;
                            w.source = j; // volatile write
                            t.doExec();
                            w.source = wsrc;
                            rescan = locals = true;
                            break scan;
                        }
                    }
                }
            }
        }
        // Loop again without waiting if runState or the phase sum changed
        // since the previous pass, a rescan was requested, or RS_LOCK is set.
        if (e != (e = runState) || prevSum != (prevSum = phaseSum) ||
            rescan || (e & RS_LOCK) != 0L)
            ;                   // inconsistent
        else if (!busy)
            break;              // stable pass with no busy queue: returnStatus stays 1
        else if (phase == activePhase) {
            waits = 0;          // recheck, then sleep
            w.phase = phase = inactivePhase;
        }
        else if (System.nanoTime() - startTime > nanos) {
            returnStatus = 0;   // timed out
            break;
        }
        else if (waits == 0)    // same as spinLockRunState except
            waits = MIN_SLEEP;  //   with rescan instead of onSpinWait
        else {
            // Park, doubling the wait each time while below maxSleep.
            LockSupport.parkNanos(this, (long)waits);
            if (waits < maxSleep)
                waits <<= 1;
        }
    }
    w.phase = activePhase;      // always leave w marked with its active phase
    return returnStatus;
}
2545
2546 /**
2547 * Helps quiesce from external caller until done, interrupted, or timeout
2548 *
2549 * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
2550 * @param interruptible true if return on interrupt
2551 * @return positive if quiescent, negative if interrupted, else 0
2552 */
2553 private int externalHelpQuiesce(long nanos, boolean interruptible) {
2554 if (quiescent() < 0) {
2555 long startTime = System.nanoTime();
2556 long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP);
2557 for (int waits = 0;;) {
2558 ForkJoinTask<?> t;
2559 if (interruptible && Thread.interrupted())
2560 return -1;
2561 else if ((t = pollScan(false)) != null) {
2562 waits = 0;
2563 t.doExec();
2564 }
2565 else if (quiescent() >= 0)
2566 break;
2567 else if (System.nanoTime() - startTime > nanos)
2568 return 0;
2569 else if (waits == 0)
2570 waits = MIN_SLEEP;
2571 else {
2572 LockSupport.parkNanos(this, (long)waits);
2573 if (waits < maxSleep)
2574 waits <<= 1;
2575 }
2576 }
2577 }
2578 return 1;
2579 }
2580
2581 /**
2582 * Helps quiesce from either internal or external caller
2583 *
2584 * @param pool the pool to use, or null if any
2585 * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
2586 * @param interruptible true if return on interrupt
2587 * @return positive if quiescent, negative if interrupted, else 0
2588 */
2589 static final int helpQuiescePool(ForkJoinPool pool, long nanos,
2590 boolean interruptible) {
2591 Thread t; ForkJoinPool p; ForkJoinWorkerThread wt;
2592 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
2593 (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2594 (p == pool || pool == null))
2595 return p.helpQuiesce(wt.workQueue, nanos, interruptible);
2596 else if ((p = pool) != null || (p = common) != null)
2597 return p.externalHelpQuiesce(nanos, interruptible);
2598 else
2599 return 0;
2600 }
2601
2602 /**
2603 * Gets and removes a local or stolen task for the given worker.
2604 *
2605 * @return a task, if available
2606 */
2607 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2608 ForkJoinTask<?> t;
2609 if (w == null || (t = w.nextLocalTask()) == null)
2610 t = pollScan(false);
2611 return t;
2612 }
2613
2614 // External operations
2615
2616 /**
2617 * Finds and locks a WorkQueue for an external submitter, or
2618 * throws RejectedExecutionException if shutdown
2619 * @param rejectOnShutdown true if RejectedExecutionException
2620 * should be thrown when shutdown
2621 */
final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
    int r;
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();   // initialize caller's probe
        r = ThreadLocalRandom.getProbe();
    }
    // Each iteration either returns a queue with its phase lock held,
    // breaks out to throw, or advances the probe and tries another slot.
    for (;;) {
        WorkQueue q; WorkQueue[] qs; int n, id, i;
        if ((qs = queues) == null || (n = qs.length) <= 0)
            break;                        // no queues array: reject
        if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
            // Build the queue before taking the runState lock, then
            // install it only if the slot is still empty and the queues
            // array has not been replaced in the meantime.
            WorkQueue newq = new WorkQueue(null, id, 0, false);
            lockRunState();
            if (qs[i] == null && queues == qs)
                q = qs[i] = newq;         // else lost race to install
            unlockRunState();
        }
        if (q != null && q.tryLockPhase()) {
            if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
                q.unlockPhase();          // check while q lock held
                break;
            }
            return q;                     // returned with phase lock still held
        }
        r = ThreadLocalRandom.advanceProbe(r); // move
    }
    throw new RejectedExecutionException();
}
2650
2651 private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2652 Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2653 if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2654 (wt = (ForkJoinWorkerThread)t).pool == this) {
2655 internal = true;
2656 q = wt.workQueue;
2657 }
2658 else { // find and lock queue
2659 internal = false;
2660 q = externalSubmissionQueue(true);
2661 }
2662 q.push(task, signalIfEmpty ? this : null, internal);
2663 return task;
2664 }
2665
2666 /**
2667 * Returns queue for an external thread, if one exists that has
2668 * possibly ever submitted to the given pool (nonzero probe), or
2669 * null if none.
2670 */
2671 static WorkQueue externalQueue(ForkJoinPool p) {
2672 WorkQueue[] qs; int n;
2673 int r = ThreadLocalRandom.getProbe();
2674 return (p != null && (qs = p.queues) != null &&
2675 (n = qs.length) > 0 && r != 0) ?
2676 qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2677 }
2678
2679 /**
2680 * Returns external queue for common pool.
2681 */
2682 static WorkQueue commonQueue() {
2683 return externalQueue(common);
2684 }
2685
2686 /**
2687 * If the given executor is a ForkJoinPool, poll and execute
2688 * AsynchronousCompletionTasks from worker's queue until none are
2689 * available or blocker is released.
2690 */
2691 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
2692 WorkQueue w = null; Thread t; ForkJoinWorkerThread wt;
2693 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2694 (wt = (ForkJoinWorkerThread)t).pool == e)
2695 w = wt.workQueue;
2696 else if (e instanceof ForkJoinPool)
2697 w = externalQueue((ForkJoinPool)e);
2698 if (w != null)
2699 w.helpAsyncBlocker(blocker);
2700 }
2701
2702 /**
2703 * Returns a cheap heuristic guide for task partitioning when
2704 * programmers, frameworks, tools, or languages have little or no
2705 * idea about task granularity. In essence, by offering this
2706 * method, we ask users only about tradeoffs in overhead vs
2707 * expected throughput and its variance, rather than how finely to
2708 * partition tasks.
2709 *
2710 * In a steady state strict (tree-structured) computation, each
2711 * thread makes available for stealing enough tasks for other
2712 * threads to remain active. Inductively, if all threads play by
2713 * the same rules, each thread should make available only a
2714 * constant number of tasks.
2715 *
2716 * The minimum useful constant is just 1. But using a value of 1
2717 * would require immediate replenishment upon each steal to
2718 * maintain enough tasks, which is infeasible. Further,
2719 * partitionings/granularities of offered tasks should minimize
2720 * steal rates, which in general means that threads nearer the top
2721 * of computation tree should generate more than those nearer the
2722 * bottom. In perfect steady state, each thread is at
2723 * approximately the same level of computation tree. However,
2724 * producing extra tasks amortizes the uncertainty of progress and
2725 * diffusion assumptions.
2726 *
2727 * So, users will want to use values larger (but not much larger)
2728 * than 1 to both smooth over transient shortages and hedge
2729 * against uneven progress; as traded off against the cost of
2730 * extra task overhead. We leave the user to pick a threshold
2731 * value to compare with the results of this call to guide
2732 * decisions, but recommend values such as 3.
2733 *
2734 * When all threads are active, it is on average OK to estimate
2735 * surplus strictly locally. In steady-state, if one thread is
2736 * maintaining say 2 surplus tasks, then so are others. So we can
2737 * just use estimated queue length. However, this strategy alone
2738 * leads to serious mis-estimates in some non-steady-state
2739 * conditions (ramp-up, ramp-down, other stalls). We can detect
2740 * many of these by further considering the number of "idle"
2741 * threads, that are known to have zero queued tasks, so
2742 * compensate by a factor of (#idle/#active) threads.
2743 */
2744 static int getSurplusQueuedTaskCount() {
2745 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2746 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2747 (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2748 (q = wt.workQueue) != null) {
2749 int n = q.top - q.base;
2750 int p = pool.parallelism;
2751 int a = (short)(pool.ctl >>> RC_SHIFT);
2752 return n - (a > (p >>>= 1) ? 0 :
2753 a > (p >>>= 1) ? 1 :
2754 a > (p >>>= 1) ? 2 :
2755 a > (p >>>= 1) ? 4 :
2756 8);
2757 }
2758 return 0;
2759 }
2760
2761 // Termination
2762
2763 /**
2764 * Possibly initiates and/or completes pool termination.
2765 *
2766 * @param now if true, unconditionally terminate, else only
2767 * if no work and no active workers
2768 * @param enable if true, terminate when next possible
2769 * @return runState on exit
2770 */
2771 private long tryTerminate(boolean now, boolean enable) {
2772 long e, isShutdown, ps;
2773 if (((e = runState) & TERMINATED) != 0L)
2774 now = false;
2775 else if ((e & STOP) != 0L)
2776 now = true;
2777 else if (now) {
2778 if (((ps = getAndBitwiseOrRunState(SHUTDOWN|STOP) & STOP)) == 0L) {
2779 if ((ps & RS_LOCK) != 0L) {
2780 spinLockRunState(); // ensure queues array stable after stop
2781 unlockRunState();
2782 }
2783 interruptAll();
2784 }
2785 }
2786 else if ((isShutdown = (e & SHUTDOWN)) != 0L || enable) {
2787 long quiet; DelayScheduler ds;
2788 if (isShutdown == 0L)
2789 getAndBitwiseOrRunState(SHUTDOWN);
2790 if ((quiet = quiescent()) > 0)
2791 now = true;
2792 else if (quiet == 0 && (ds = delayScheduler) != null)
2793 ds.signal();
2794 }
2795
2796 if (now) {
2797 DelayScheduler ds;
2798 releaseWaiters();
2799 if ((ds = delayScheduler) != null)
2800 ds.signal();
2801 for (;;) {
2802 if (((e = runState) & CLEANED) == 0L) {
2803 boolean clean = cleanQueues();
2804 if (((e = runState) & CLEANED) == 0L && clean)
2805 e = getAndBitwiseOrRunState(CLEANED) | CLEANED;
2806 }
2807 if ((e & TERMINATED) != 0L)
2808 break;
2809 if (ctl != 0L) // else loop if didn't finish cleaning
2810 break;
2811 if ((ds = delayScheduler) != null && ds.signal() >= 0)
2812 break;
2813 if ((e & CLEANED) != 0L) {
2814 e |= TERMINATED;
2815 if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) {
2816 CountDownLatch done; SharedThreadContainer ctr;
2817 if ((done = termination) != null)
2818 done.countDown();
2819 if ((ctr = container) != null)
2820 ctr.close();
2821 }
2822 break;
2823 }
2824 }
2825 }
2826 return e;
2827 }
2828
2829 /**
 * Scans queues in a pseudorandom order based on thread id,
2831 * cancelling tasks until empty, or returning early upon
2832 * interference or still-active external queues, in which case
2833 * other calls will finish cancellation.
2834 *
2835 * @return true if all queues empty
2836 */
private boolean cleanQueues() {
    // Seed the traversal from the current thread's id, scrambled by xorshift.
    int r = (int)Thread.currentThread().threadId();
    r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
    int step = (r >>> 16) | 1;                // randomize traversals
    WorkQueue[] qs = queues;
    int n = (qs == null) ? 0 : qs.length;
    for (int l = n; l > 0; --l, r += step) {
        WorkQueue q; ForkJoinTask<?>[] a; int cap;
        if ((q = qs[r & (n - 1)]) != null &&
            (a = q.array) != null && (cap = a.length) > 0) {
            for (;;) {
                ForkJoinTask<?> t; int b; long k;
                // Read the task at q's base slot.
                t = (ForkJoinTask<?>)U.getReferenceAcquire(
                    a, k = slotOffset((cap - 1) & (b = q.base)));
                // If base is unchanged and the slot was claimed, advance
                // base and cancel the task; anything thrown by cancel is
                // deliberately ignored.
                if (q.base == b && t != null &&
                    U.compareAndSetReference(a, k, t, null)) {
                    q.updateBase(b + 1);
                    try {
                        t.cancel(false);
                    } catch (Throwable ignore) {
                    }
                }
                else if ((q.phase & (IDLE|1)) == 0 || // externally locked
                         q.top - q.base > 0)
                    return false;             // incomplete
                else
                    break;                    // this queue is done; move on
            }
        }
    }
    return true;
}
2869
2870 /**
2871 * Interrupts all workers
2872 */
2873 private void interruptAll() {
2874 Thread current = Thread.currentThread();
2875 WorkQueue[] qs = queues;
2876 int n = (qs == null) ? 0 : qs.length;
2877 for (int i = 1; i < n; i += 2) {
2878 WorkQueue q; Thread o;
2879 if ((q = qs[i]) != null && (o = q.owner) != null && o != current) {
2880 try {
2881 o.interrupt();
2882 } catch (Throwable ignore) {
2883 }
2884 }
2885 }
2886 }
2887
2888 /**
2889 * Returns termination signal, constructing if necessary
2890 */
2891 private CountDownLatch terminationSignal() {
2892 CountDownLatch signal, s, u;
2893 if ((signal = termination) == null)
2894 signal = ((u = cmpExTerminationSignal(
2895 s = new CountDownLatch(1))) == null) ? s : u;
2896 return signal;
2897 }
2898
2899 // Exported methods
2900
2901 // Constructors
2902
2903 /**
2904 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2905 * java.lang.Runtime#availableProcessors}, using defaults for all
2906 * other parameters (see {@link #ForkJoinPool(int,
2907 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2908 * int, int, int, Predicate, long, TimeUnit)}).
2909 */
public ForkJoinPool() {
    // Delegates to the ten-argument constructor: parallelism is the
    // available processor count capped at MAX_CAP, with the default
    // thread factory, no exception handler, asyncMode false,
    // corePoolSize 0, maximumPoolSize MAX_CAP, minimumRunnable 1, no
    // saturate predicate, and DEFAULT_KEEPALIVE in milliseconds.
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false,
         0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
}
2915
2916 /**
2917 * Creates a {@code ForkJoinPool} with the indicated parallelism
2918 * level, using defaults for all other parameters (see {@link
2919 * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
2920 * UncaughtExceptionHandler, boolean, int, int, int, Predicate,
2921 * long, TimeUnit)}).
2922 *
2923 * @param parallelism the parallelism level
2924 * @throws IllegalArgumentException if parallelism less than or
2925 * equal to zero, or greater than implementation limit
2926 */
public ForkJoinPool(int parallelism) {
    // Delegates to the ten-argument constructor with the default thread
    // factory, no exception handler, asyncMode false, corePoolSize 0,
    // maximumPoolSize MAX_CAP, minimumRunnable 1, no saturate predicate,
    // and DEFAULT_KEEPALIVE in milliseconds.
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
         0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
}
2931
2932 /**
2933 * Creates a {@code ForkJoinPool} with the given parameters (using
2934 * defaults for others -- see {@link #ForkJoinPool(int,
2935 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2936 * int, int, int, Predicate, long, TimeUnit)}).
2937 *
2938 * @param parallelism the parallelism level. For default value,
2939 * use {@link java.lang.Runtime#availableProcessors}.
2940 * @param factory the factory for creating new threads. For default value,
2941 * use {@link #defaultForkJoinWorkerThreadFactory}.
2942 * @param handler the handler for internal worker threads that
2943 * terminate due to unrecoverable errors encountered while executing
2944 * tasks. For default value, use {@code null}.
2945 * @param asyncMode if true,
2946 * establishes local first-in-first-out scheduling mode for forked
2947 * tasks that are never joined. This mode may be more appropriate
2948 * than default locally stack-based mode in applications in which
2949 * worker threads only process event-style asynchronous tasks.
2950 * For default value, use {@code false}.
2951 * @throws IllegalArgumentException if parallelism less than or
2952 * equal to zero, or greater than implementation limit
2953 * @throws NullPointerException if the factory is null
2954 */
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    // Delegates to the ten-argument constructor, passing the four given
    // arguments through and using corePoolSize 0, maximumPoolSize
    // MAX_CAP, minimumRunnable 1, no saturate predicate, and
    // DEFAULT_KEEPALIVE in milliseconds for the rest.
    this(parallelism, factory, handler, asyncMode,
         0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
}
2962
2963 /**
2964 * Creates a {@code ForkJoinPool} with the given parameters.
2965 *
2966 * @param parallelism the parallelism level. For default value,
2967 * use {@link java.lang.Runtime#availableProcessors}.
2968 *
2969 * @param factory the factory for creating new threads. For
2970 * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2971 *
2972 * @param handler the handler for internal worker threads that
2973 * terminate due to unrecoverable errors encountered while
2974 * executing tasks. For default value, use {@code null}.
2975 *
2976 * @param asyncMode if true, establishes local first-in-first-out
2977 * scheduling mode for forked tasks that are never joined. This
2978 * mode may be more appropriate than default locally stack-based
2979 * mode in applications in which worker threads only process
2980 * event-style asynchronous tasks. For default value, use {@code
2981 * false}.
2982 *
2983 * @param corePoolSize ignored: used in previous releases of this
2984 * class but no longer applicable. Using {@code 0} maintains
2985 * compatibility across releases.
2986 *
2987 * @param maximumPoolSize the maximum number of threads allowed.
2988 * When the maximum is reached, attempts to replace blocked
2989 * threads fail. (However, because creation and termination of
2990 * different threads may overlap, and may be managed by the given
2991 * thread factory, this value may be transiently exceeded.) To
2992 * arrange the same value as is used by default for the common
2993 * pool, use {@code 256} plus the {@code parallelism} level. (By
2994 * default, the common pool allows a maximum of 256 spare
2995 * threads.) Using a value (for example {@code
2996 * Integer.MAX_VALUE}) larger than the implementation's total
2997 * thread limit has the same effect as using this limit (which is
2998 * the default).
2999 *
3000 * @param minimumRunnable the minimum allowed number of core
3001 * threads not blocked by a join or {@link ManagedBlocker}. To
3002 * ensure progress, when too few unblocked threads exist and
3003 * unexecuted tasks may exist, new threads are constructed, up to
3004 * the given maximumPoolSize. For the default value, use {@code
3005 * 1}, that ensures liveness. A larger value might improve
3006 * throughput in the presence of blocked activities, but might
3007 * not, due to increased overhead. A value of zero may be
3008 * acceptable when submitted tasks cannot have dependencies
3009 * requiring additional threads.
3010 *
3011 * @param saturate if non-null, a predicate invoked upon attempts
3012 * to create more than the maximum total allowed threads. By
3013 * default, when a thread is about to block on a join or {@link
3014 * ManagedBlocker}, but cannot be replaced because the
3015 * maximumPoolSize would be exceeded, a {@link
3016 * RejectedExecutionException} is thrown. But if this predicate
3017 * returns {@code true}, then no exception is thrown, so the pool
3018 * continues to operate with fewer than the target number of
3019 * runnable threads, which might not ensure progress.
3020 *
3021 * @param keepAliveTime the elapsed time since last use before
3022 * a thread is terminated (and then later replaced if needed).
3023 * For the default value, use {@code 60, TimeUnit.SECONDS}.
3024 *
3025 * @param unit the time unit for the {@code keepAliveTime} argument
3026 *
3027 * @throws IllegalArgumentException if parallelism is less than or
3028 * equal to zero, or is greater than implementation limit,
3029 * or if maximumPoolSize is less than parallelism,
 *         or if the keepAliveTime is less than or equal to zero.
3031 * @throws NullPointerException if the factory is null
3032 * @since 9
3033 */
3034 public ForkJoinPool(int parallelism,
3035 ForkJoinWorkerThreadFactory factory,
3036 UncaughtExceptionHandler handler,
3037 boolean asyncMode,
3038 int corePoolSize,
3039 int maximumPoolSize,
3040 int minimumRunnable,
3041 Predicate<? super ForkJoinPool> saturate,
3042 long keepAliveTime,
3043 TimeUnit unit) {
3044 int p = parallelism;
3045 if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
3046 throw new IllegalArgumentException();
3047 if (factory == null || unit == null)
3048 throw new NullPointerException();
3049 int size = Math.max(MIN_QUEUES_SIZE,
3050 1 << (33 - Integer.numberOfLeadingZeros(p - 1)));
3051 this.parallelism = p;
3052 this.factory = factory;
3053 this.ueh = handler;
3054 this.saturate = saturate;
3055 this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
3056 int maxSpares = Math.clamp(maximumPoolSize - p, 0, MAX_CAP);
3057 int minAvail = Math.clamp(minimumRunnable, 0, MAX_CAP);
3058 this.config = (((asyncMode ? FIFO : 0) & LMASK) |
3059 (((long)maxSpares) << TC_SHIFT) |
3060 (((long)minAvail) << RC_SHIFT));
3061 this.queues = new WorkQueue[size];
3062 String pid = Integer.toString(getAndAddPoolIds(1) + 1);
3063 String name = "ForkJoinPool-" + pid;
3064 this.poolName = name;
3065 this.workerNamePrefix = name + "-worker-";
3066 this.container = SharedThreadContainer.create(name);
3067 }
3068
    /**
     * Constructor for the common pool, using default settings that
     * may be overridden by these system properties:
     * <ul>
     * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
     * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
     * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
     * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
     * </ul>
     * Any exception encountered while reading, parsing, or
     * instantiating a property value is ignored: settings assigned
     * before the failure are kept, and all later ones retain their
     * defaults.
     *
     * @param forCommonPoolOnly not referenced in the body; presumably
     * present only to give this constructor a distinct signature
     */
    private ForkJoinPool(byte forCommonPoolOnly) {
        String name = "ForkJoinPool.commonPool";
        ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory;
        UncaughtExceptionHandler handler = null;
        int maxSpares = DEFAULT_COMMON_MAX_SPARES;
        int pc = 0, preset = 0; // preset is nonzero if size set as property
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            if (pp != null) {
                // negative values are treated as zero; a parse failure
                // throws before preset is set, so the default applies
                pc = Math.max(0, Integer.parseInt(pp));
                preset = PRESET_SIZE;
            }
            String ms = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
            if (ms != null)
                maxSpares = Math.clamp(Integer.parseInt(ms), 0, MAX_CAP);
            String sf = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String sh = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (sf != null || sh != null) {
                // both classes are resolved by the system class loader
                // and instantiated through their no-arg constructors
                ClassLoader ldr = ClassLoader.getSystemClassLoader();
                if (sf != null)
                    fac = (ForkJoinWorkerThreadFactory)
                        ldr.loadClass(sf).getConstructor().newInstance();
                if (sh != null)
                    handler = (UncaughtExceptionHandler)
                        ldr.loadClass(sh).getConstructor().newInstance();
            }
        } catch (Exception ignore) {
            // deliberate best-effort: fall through with whatever was set
        }
        if (preset == 0) // no usable property: one less than #processors, min 1
            pc = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
        int p = Math.min(pc, MAX_CAP);
        // queue array length: twice the smallest power of two >= p
        // (1 if p is zero), but never below MIN_QUEUES_SIZE
        int size = Math.max(MIN_QUEUES_SIZE,
                            (p == 0) ? 1 :
                            1 << (33 - Integer.numberOfLeadingZeros(p-1)));
        this.parallelism = p;
        // pack the preset flag (masked by LMASK), maxSpares (shifted by
        // TC_SHIFT), and a single unit shifted by RC_SHIFT
        this.config = ((preset & LMASK) | (((long)maxSpares) << TC_SHIFT) |
                       (1L << RC_SHIFT));
        this.factory = fac;
        this.ueh = handler;
        this.keepAlive = DEFAULT_KEEPALIVE;
        this.saturate = null;
        this.workerNamePrefix = null;
        this.poolName = name;
        this.queues = new WorkQueue[size];
        this.container = SharedThreadContainer.create(name);
    }
3123
3124 /**
3125 * Returns the common pool instance. This pool is statically
3126 * constructed; its run state is unaffected by attempts to {@link
3127 * #shutdown} or {@link #shutdownNow}. However this pool and any
3128 * ongoing processing are automatically terminated upon program
3129 * {@link System#exit}. Any program that relies on asynchronous
3130 * task processing to complete before program termination should
3131 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
3132 * before exit.
3133 *
3134 * @return the common pool instance
3135 * @since 1.8
3136 */
3137 public static ForkJoinPool commonPool() {
3138 // assert common != null : "static init error";
3139 return common;
3140 }
3141
3142 /**
3143 * Package-private access to commonPool overriding zero parallelism
3144 */
3145 static ForkJoinPool asyncCommonPool() {
3146 ForkJoinPool cp; int p;
3147 if ((p = (cp = common).parallelism) == 0)
3148 U.compareAndSetInt(cp, PARALLELISM, 0, 2);
3149 return cp;
3150 }
3151
3152 // Execution methods
3153
3154 /**
3155 * Performs the given task, returning its result upon completion.
3156 * If the computation encounters an unchecked Exception or Error,
3157 * it is rethrown as the outcome of this invocation. Rethrown
3158 * exceptions behave in the same way as regular exceptions, but,
3159 * when possible, contain stack traces (as displayed for example
3160 * using {@code ex.printStackTrace()}) of both the current thread
3161 * as well as the thread actually encountering the exception;
3162 * minimally only the latter.
3163 *
3164 * @param task the task
3165 * @param <T> the type of the task's result
3166 * @return the task's result
3167 * @throws NullPointerException if the task is null
3168 * @throws RejectedExecutionException if the task cannot be
3169 * scheduled for execution
3170 */
3171 public <T> T invoke(ForkJoinTask<T> task) {
3172 poolSubmit(true, Objects.requireNonNull(task));
3173 try {
3174 return task.join();
3175 } catch (RuntimeException | Error unchecked) {
3176 throw unchecked;
3177 } catch (Exception checked) {
3178 throw new RuntimeException(checked);
3179 }
3180 }
3181
3182 /**
3183 * Arranges for (asynchronous) execution of the given task.
3184 *
3185 * @param task the task
3186 * @throws NullPointerException if the task is null
3187 * @throws RejectedExecutionException if the task cannot be
3188 * scheduled for execution
3189 */
3190 public void execute(ForkJoinTask<?> task) {
3191 poolSubmit(true, Objects.requireNonNull(task));
3192 }
3193
3194 // AbstractExecutorService methods
3195
3196 /**
3197 * @throws NullPointerException if the task is null
3198 * @throws RejectedExecutionException if the task cannot be
3199 * scheduled for execution
3200 */
3201 @Override
3202 @SuppressWarnings("unchecked")
3203 public void execute(Runnable task) {
3204 poolSubmit(true, (Objects.requireNonNull(task) instanceof ForkJoinTask<?>)
3205 ? (ForkJoinTask<Void>) task // avoid re-wrap
3206 : new ForkJoinTask.RunnableExecuteAction(task));
3207 }
3208
3209 /**
3210 * Submits a ForkJoinTask for execution.
3211 *
3212 * @implSpec
3213 * This method is equivalent to {@link #externalSubmit(ForkJoinTask)}
3214 * when called from a thread that is not in this pool.
3215 *
3216 * @param task the task to submit
3217 * @param <T> the type of the task's result
3218 * @return the task
3219 * @throws NullPointerException if the task is null
3220 * @throws RejectedExecutionException if the task cannot be
3221 * scheduled for execution
3222 */
3223 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
3224 return poolSubmit(true, Objects.requireNonNull(task));
3225 }
3226
3227 /**
3228 * @throws NullPointerException if the task is null
3229 * @throws RejectedExecutionException if the task cannot be
3230 * scheduled for execution
3231 */
3232 @Override
3233 public <T> ForkJoinTask<T> submit(Callable<T> task) {
3234 Objects.requireNonNull(task);
3235 return poolSubmit(
3236 true,
3237 (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
3238 new ForkJoinTask.AdaptedCallable<T>(task) :
3239 new ForkJoinTask.AdaptedInterruptibleCallable<T>(task));
3240 }
3241
3242 /**
3243 * @throws NullPointerException if the task is null
3244 * @throws RejectedExecutionException if the task cannot be
3245 * scheduled for execution
3246 */
3247 @Override
3248 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
3249 Objects.requireNonNull(task);
3250 return poolSubmit(
3251 true,
3252 (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
3253 new ForkJoinTask.AdaptedRunnable<T>(task, result) :
3254 new ForkJoinTask.AdaptedInterruptibleRunnable<T>(task, result));
3255 }
3256
3257 /**
3258 * @throws NullPointerException if the task is null
3259 * @throws RejectedExecutionException if the task cannot be
3260 * scheduled for execution
3261 */
3262 @Override
3263 @SuppressWarnings("unchecked")
3264 public ForkJoinTask<?> submit(Runnable task) {
3265 Objects.requireNonNull(task);
3266 return poolSubmit(
3267 true,
3268 (task instanceof ForkJoinTask<?>) ?
3269 (ForkJoinTask<Void>) task : // avoid re-wrap
3270 ((Thread.currentThread() instanceof ForkJoinWorkerThread) ?
3271 new ForkJoinTask.AdaptedRunnable<Void>(task, null) :
3272 new ForkJoinTask.AdaptedInterruptibleRunnable<Void>(task, null)));
3273 }
3274
3275 /**
3276 * Submits the given task as if submitted from a non-{@code ForkJoinTask}
3277 * client. The task is added to a scheduling queue for submissions to the
3278 * pool even when called from a thread in the pool.
3279 *
3280 * @implSpec
3281 * This method is equivalent to {@link #submit(ForkJoinTask)} when called
3282 * from a thread that is not in this pool.
3283 *
3284 * @return the task
3285 * @param task the task to submit
3286 * @param <T> the type of the task's result
3287 * @throws NullPointerException if the task is null
3288 * @throws RejectedExecutionException if the task cannot be
3289 * scheduled for execution
3290 * @since 20
3291 */
3292 public <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
3293 Objects.requireNonNull(task);
3294 externalSubmissionQueue(true).push(task, this, false);
3295 return task;
3296 }
3297
3298 /**
3299 * Submits the given task without guaranteeing that it will
3300 * eventually execute in the absence of available active threads.
3301 * In some contexts, this method may reduce contention and
3302 * overhead by relying on context-specific knowledge that existing
3303 * threads (possibly including the calling thread if operating in
3304 * this pool) will eventually be available to execute the task.
3305 *
3306 * @param task the task
3307 * @param <T> the type of the task's result
3308 * @return the task
3309 * @throws NullPointerException if the task is null
3310 * @throws RejectedExecutionException if the task cannot be
3311 * scheduled for execution
3312 * @since 19
3313 */
3314 public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) {
3315 return poolSubmit(false, Objects.requireNonNull(task));
3316 }
3317
3318 /**
3319 * Changes the target parallelism of this pool, controlling the
3320 * future creation, use, and termination of worker threads.
3321 * Applications include contexts in which the number of available
3322 * processors changes over time.
3323 *
3324 * @implNote This implementation restricts the maximum number of
3325 * running threads to 32767
3326 *
3327 * @param size the target parallelism level
3328 * @return the previous parallelism level.
3329 * @throws IllegalArgumentException if size is less than 1 or
3330 * greater than the maximum supported by this pool.
3331 * @throws UnsupportedOperationException this is the{@link
3332 * #commonPool()} and parallelism level was set by System
3333 * property {@systemProperty
3334 * java.util.concurrent.ForkJoinPool.common.parallelism}.
3335 * @since 19
3336 */
3337 public int setParallelism(int size) {
3338 if (size < 1 || size > MAX_CAP)
3339 throw new IllegalArgumentException();
3340 if ((config & PRESET_SIZE) != 0)
3341 throw new UnsupportedOperationException("Cannot override System property");
3342 return getAndSetParallelism(size);
3343 }
3344
3345 /**
3346 * Uninterrupible version of {@code invokeAll}. Executes the given
3347 * tasks, returning a list of Futures holding their status and
3348 * results when all complete, ignoring interrupts. {@link
3349 * Future#isDone} is {@code true} for each element of the returned
3350 * list. Note that a <em>completed</em> task could have
3351 * terminated either normally or by throwing an exception. The
3352 * results of this method are undefined if the given collection is
3353 * modified while this operation is in progress.
3354 *
3355 * @apiNote This method supports usages that previously relied on an
3356 * incompatible override of
3357 * {@link ExecutorService#invokeAll(java.util.Collection)}.
3358 *
3359 * @param tasks the collection of tasks
3360 * @param <T> the type of the values returned from the tasks
3361 * @return a list of Futures representing the tasks, in the same
3362 * sequential order as produced by the iterator for the
3363 * given task list, each of which has completed
3364 * @throws NullPointerException if tasks or any of its elements are {@code null}
3365 * @throws RejectedExecutionException if any task cannot be
3366 * scheduled for execution
3367 * @since 22
3368 */
3369 public <T> List<Future<T>> invokeAllUninterruptibly(Collection<? extends Callable<T>> tasks) {
3370 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
3371 try {
3372 for (Callable<T> t : tasks) {
3373 ForkJoinTask<T> f = ForkJoinTask.adapt(t);
3374 futures.add(f);
3375 poolSubmit(true, f);
3376 }
3377 for (int i = futures.size() - 1; i >= 0; --i)
3378 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
3379 return futures;
3380 } catch (Throwable t) {
3381 for (Future<T> e : futures)
3382 e.cancel(true);
3383 throw t;
3384 }
3385 }
3386
3387 /**
3388 * Common support for timed and untimed invokeAll
3389 */
3390 private <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
3391 long deadline)
3392 throws InterruptedException {
3393 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
3394 try {
3395 for (Callable<T> t : tasks) {
3396 ForkJoinTask<T> f = ForkJoinTask.adaptInterruptible(t);
3397 futures.add(f);
3398 poolSubmit(true, f);
3399 }
3400 for (int i = futures.size() - 1; i >= 0; --i)
3401 ((ForkJoinTask<?>)futures.get(i))
3402 .quietlyJoinPoolInvokeAllTask(deadline);
3403 return futures;
3404 } catch (Throwable t) {
3405 for (Future<T> e : futures)
3406 e.cancel(true);
3407 throw t;
3408 }
3409 }
3410
3411 @Override
3412 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
3413 throws InterruptedException {
3414 return invokeAll(tasks, 0L);
3415 }
3416 // for jdk version < 22, replace with
3417 // /**
3418 // * @throws NullPointerException {@inheritDoc}
3419 // * @throws RejectedExecutionException {@inheritDoc}
3420 // */
3421 // @Override
3422 // public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
3423 // return invokeAllUninterruptibly(tasks);
3424 // }
3425
3426 @Override
3427 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
3428 long timeout, TimeUnit unit)
3429 throws InterruptedException {
3430 return invokeAll(tasks, (System.nanoTime() + unit.toNanos(timeout)) | 1L);
3431 }
3432
3433 @Override
3434 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
3435 throws InterruptedException, ExecutionException {
3436 try {
3437 return new ForkJoinTask.InvokeAnyRoot<T>()
3438 .invokeAny(tasks, this, false, 0L);
3439 } catch (TimeoutException cannotHappen) {
3440 assert false;
3441 return null;
3442 }
3443 }
3444
3445 @Override
3446 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
3447 long timeout, TimeUnit unit)
3448 throws InterruptedException, ExecutionException, TimeoutException {
3449 return new ForkJoinTask.InvokeAnyRoot<T>()
3450 .invokeAny(tasks, this, true, unit.toNanos(timeout));
3451 }
3452
3453 // Support for delayed tasks
3454
3455 /**
3456 * Returns STOP and SHUTDOWN status (zero if neither), masking or
3457 * truncating out other bits.
3458 */
3459 final int shutdownStatus(DelayScheduler ds) {
3460 return (int)(runState & (SHUTDOWN | STOP));
3461 }
3462
3463 /**
3464 * Tries to stop and possibly terminate if already enabled, return success.
3465 */
3466 final boolean tryStopIfShutdown(DelayScheduler ds) {
3467 return (tryTerminate(false, false) & STOP) != 0L;
3468 }
3469
    /**
     * Creates and starts the DelayScheduler if none exists yet.
     * Creation happens while holding the run state lock and only if
     * the SHUTDOWN bit is clear; the new scheduler is started after
     * the lock is released. If starting throws, the field is reset
     * to null and termination is attempted; an Error is rethrown,
     * while a RuntimeException is absorbed.
     *
     * @return the existing or newly started scheduler, or null if
     * the pool was shut down or the scheduler could not be started
     */
    private DelayScheduler startDelayScheduler() {
        DelayScheduler ds;
        if ((ds = delayScheduler) == null) {
            boolean start = false;
            String name = poolName + "-delayScheduler";
            if (workerNamePrefix == null)
                asyncCommonPool();  // override common parallelism zero
            long isShutdown = lockRunState() & SHUTDOWN;
            try {
                // recheck under the lock: another thread may have
                // installed a scheduler since the unlocked read above
                if (isShutdown == 0L && (ds = delayScheduler) == null) {
                    ds = delayScheduler = new DelayScheduler(this, name);
                    start = true;
                }
            } finally {
                unlockRunState();
            }
            if (start) { // start outside of lock
                SharedThreadContainer ctr;
                try {
                    // start through the thread container when there is one
                    if ((ctr = container) != null)
                        ctr.start(ds);
                    else
                        ds.start();
                } catch (RuntimeException | Error ex) { // back out
                    lockRunState();
                    ds = delayScheduler = null;
                    unlockRunState();
                    tryTerminate(false, false);
                    // only Errors propagate; after a RuntimeException
                    // the method returns null
                    if (ex instanceof Error)
                        throw ex;
                }
            }
        }
        return ds;
    }
3508
3509 /**
3510 * Arranges execution of a ScheduledForkJoinTask whose delay has
3511 * elapsed
3512 */
3513 final void executeEnabledScheduledTask(ScheduledForkJoinTask<?> task) {
3514 externalSubmissionQueue(false).push(task, this, false);
3515 }
3516
3517 /**
3518 * Arranges delayed execution of a ScheduledForkJoinTask via the
3519 * DelayScheduler, creating and starting it if necessary.
3520 * @return the task
3521 */
3522 final <T> ScheduledForkJoinTask<T> scheduleDelayedTask(ScheduledForkJoinTask<T> task) {
3523 DelayScheduler ds;
3524 if (((ds = delayScheduler) == null &&
3525 (ds = startDelayScheduler()) == null) ||
3526 (runState & SHUTDOWN) != 0L)
3527 throw new RejectedExecutionException();
3528 ds.pend(task);
3529 return task;
3530 }
3531
3532 /**
3533 * Submits a one-shot task that becomes enabled for execution after the given
3534 * delay. At that point it will execute unless explicitly
3535 * cancelled, or fail to execute (eventually reporting
3536 * cancellation) when encountering resource exhaustion, or the
3537 * pool is {@link #shutdownNow}, or is {@link #shutdown} when
3538 * otherwise quiescent and {@link #cancelDelayedTasksOnShutdown}
3539 * is in effect.
3540 *
3541 * @param command the task to execute
3542 * @param delay the time from now to delay execution
3543 * @param unit the time unit of the delay parameter
3544 * @return a ForkJoinTask implementing the ScheduledFuture
3545 * interface, whose {@code get()} method will return
3546 * {@code null} upon normal completion.
3547 * @throws RejectedExecutionException if the pool is shutdown or
3548 * submission encounters resource exhaustion.
3549 * @throws NullPointerException if command or unit is null
3550 * @since 25
3551 */
3552 public ScheduledFuture<?> schedule(Runnable command,
3553 long delay, TimeUnit unit) {
3554 return scheduleDelayedTask(
3555 new ScheduledForkJoinTask<Void>(
3556 unit.toNanos(delay), 0L, false, // implicit null check of unit
3557 Objects.requireNonNull(command), null, this));
3558 }
3559
3560 /**
3561 * Submits a value-returning one-shot task that becomes enabled for execution
3562 * after the given delay. At that point it will execute unless
3563 * explicitly cancelled, or fail to execute (eventually reporting
3564 * cancellation) when encountering resource exhaustion, or the
3565 * pool is {@link #shutdownNow}, or is {@link #shutdown} when
3566 * otherwise quiescent and {@link #cancelDelayedTasksOnShutdown}
3567 * is in effect.
3568 *
3569 * @param callable the function to execute
3570 * @param delay the time from now to delay execution
3571 * @param unit the time unit of the delay parameter
3572 * @param <V> the type of the callable's result
3573 * @return a ForkJoinTask implementing the ScheduledFuture
3574 * interface, whose {@code get()} method will return the
3575 * value from the callable upon normal completion.
3576 * @throws RejectedExecutionException if the pool is shutdown or
3577 * submission encounters resource exhaustion.
3578 * @throws NullPointerException if command or unit is null
3579 * @since 25
3580 */
3581 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
3582 long delay, TimeUnit unit) {
3583 return scheduleDelayedTask(
3584 new ScheduledForkJoinTask<V>(
3585 unit.toNanos(delay), 0L, false, null, // implicit null check of unit
3586 Objects.requireNonNull(callable), this));
3587 }
3588
3589 /**
3590 * Submits a periodic action that becomes enabled for execution first after the
3591 * given initial delay, and subsequently with the given period;
3592 * that is, executions will commence after
3593 * {@code initialDelay}, then {@code initialDelay + period}, then
3594 * {@code initialDelay + 2 * period}, and so on.
3595 *
3596 * <p>The sequence of task executions continues indefinitely until
3597 * one of the following exceptional completions occur:
3598 * <ul>
3599 * <li>The task is {@linkplain Future#cancel explicitly cancelled}
3600 * <li>Method {@link #shutdownNow} is called
3601 * <li>Method {@link #shutdown} is called and the pool is
3602 * otherwise quiescent, in which case existing executions continue
3603 * but subsequent executions do not.
3604 * <li>An execution or the task encounters resource exhaustion.
3605 * <li>An execution of the task throws an exception. In this case
3606 * calling {@link Future#get() get} on the returned future will throw
3607 * {@link ExecutionException}, holding the exception as its cause.
3608 * </ul>
3609 * Subsequent executions are suppressed. Subsequent calls to
3610 * {@link Future#isDone isDone()} on the returned future will
3611 * return {@code true}.
3612 *
3613 * <p>If any execution of this task takes longer than its period, then
3614 * subsequent executions may start late, but will not concurrently
3615 * execute.
3616 * @param command the task to execute
3617 * @param initialDelay the time to delay first execution
3618 * @param period the period between successive executions
3619 * @param unit the time unit of the initialDelay and period parameters
3620 * @return a ForkJoinTask implementing the ScheduledFuture
3621 * interface. The future's {@link Future#get() get()}
3622 * method will never return normally, and will throw an
3623 * exception upon task cancellation or abnormal
3624 * termination of a task execution.
3625 * @throws RejectedExecutionException if the pool is shutdown or
3626 * submission encounters resource exhaustion.
3627 * @throws NullPointerException if command or unit is null
3628 * @throws IllegalArgumentException if period less than or equal to zero
3629 * @since 25
3630 */
3631 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
3632 long initialDelay,
3633 long period, TimeUnit unit) {
3634 if (period <= 0L)
3635 throw new IllegalArgumentException();
3636 return scheduleDelayedTask(
3637 new ScheduledForkJoinTask<Void>(
3638 unit.toNanos(initialDelay), // implicit null check of unit
3639 unit.toNanos(period), false,
3640 Objects.requireNonNull(command), null, this));
3641 }
3642
3643 /**
3644 * Submits a periodic action that becomes enabled for execution first after the
3645 * given initial delay, and subsequently with the given delay
3646 * between the termination of one execution and the commencement of
3647 * the next.
3648 * <p>The sequence of task executions continues indefinitely until
3649 * one of the following exceptional completions occur:
3650 * <ul>
3651 * <li>The task is {@linkplain Future#cancel explicitly cancelled}
3652 * <li>Method {@link #shutdownNow} is called
3653 * <li>Method {@link #shutdown} is called and the pool is
3654 * otherwise quiescent, in which case existing executions continue
3655 * but subsequent executions do not.
3656 * <li>An execution or the task encounters resource exhaustion.
3657 * <li>An execution of the task throws an exception. In this case
3658 * calling {@link Future#get() get} on the returned future will throw
3659 * {@link ExecutionException}, holding the exception as its cause.
3660 * </ul>
3661 * Subsequent executions are suppressed. Subsequent calls to
3662 * {@link Future#isDone isDone()} on the returned future will
3663 * return {@code true}.
3664 * @param command the task to execute
3665 * @param initialDelay the time to delay first execution
3666 * @param delay the delay between the termination of one
3667 * execution and the commencement of the next
3668 * @param unit the time unit of the initialDelay and delay parameters
3669 * @return a ForkJoinTask implementing the ScheduledFuture
3670 * interface. The future's {@link Future#get() get()}
3671 * method will never return normally, and will throw an
3672 * exception upon task cancellation or abnormal
3673 * termination of a task execution.
3674 * @throws RejectedExecutionException if the pool is shutdown or
3675 * submission encounters resource exhaustion.
3676 * @throws NullPointerException if command or unit is null
3677 * @throws IllegalArgumentException if delay less than or equal to zero
3678 * @since 25
3679 */
3680 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
3681 long initialDelay,
3682 long delay, TimeUnit unit) {
3683 if (delay <= 0L)
3684 throw new IllegalArgumentException();
3685 return scheduleDelayedTask(
3686 new ScheduledForkJoinTask<Void>(
3687 unit.toNanos(initialDelay), // implicit null check of unit
3688 -unit.toNanos(delay), false, // negative for fixed delay
3689 Objects.requireNonNull(command), null, this));
3690 }
3691
3692 /**
3693 * Body of a task performed on timeout of another task
3694 */
3695 static final class TimeoutAction<V> implements Runnable {
3696 // set after construction, nulled after use
3697 ForkJoinTask.CallableWithTimeout<V> task;
3698 Consumer<? super ForkJoinTask<V>> action;
3699 TimeoutAction(Consumer<? super ForkJoinTask<V>> action) {
3700 this.action = action;
3701 }
3702 public void run() {
3703 ForkJoinTask.CallableWithTimeout<V> t = task;
3704 Consumer<? super ForkJoinTask<V>> a = action;
3705 task = null;
3706 action = null;
3707 if (t != null && t.status >= 0) {
3708 if (a == null)
3709 t.cancel(true);
3710 else {
3711 a.accept(t);
3712 t.interruptIfRunning(true);
3713 }
3714 }
3715 }
3716 }
3717
3718 /**
3719 * Submits a task executing the given function, cancelling the
3720 * task or performing a given timeoutAction if not completed
3721 * within the given timeout period. If the optional {@code
3722 * timeoutAction} is null, the task is cancelled (via {@code
3723 * cancel(true)}. Otherwise, the action is applied and the task
3724 * may be interrupted if running. Actions may include {@link
3725 * ForkJoinTask#complete} to set a replacement value or {@link
3726 * ForkJoinTask#completeExceptionally} to throw an appropriate
3727 * exception. Note that these can succeed only if the task has
3728 * not already completed when the timeoutAction executes.
3729 *
3730 * @param callable the function to execute
3731 * @param <V> the type of the callable's result
3732 * @param timeout the time to wait before cancelling if not completed
3733 * @param timeoutAction if nonnull, an action to perform on
3734 * timeout, otherwise the default action is to cancel using
3735 * {@code cancel(true)}.
3736 * @param unit the time unit of the timeout parameter
3737 * @return a Future that can be used to extract result or cancel
3738 * @throws RejectedExecutionException if the task cannot be
3739 * scheduled for execution
3740 * @throws NullPointerException if callable or unit is null
3741 * @since 25
3742 */
3743 public <V> ForkJoinTask<V> submitWithTimeout(Callable<V> callable,
3744 long timeout, TimeUnit unit,
3745 Consumer<? super ForkJoinTask<V>> timeoutAction) {
3746 ForkJoinTask.CallableWithTimeout<V> task; TimeoutAction<V> onTimeout;
3747 Objects.requireNonNull(callable);
3748 ScheduledForkJoinTask<Void> timeoutTask =
3749 new ScheduledForkJoinTask<Void>(
3750 unit.toNanos(timeout), 0L, true,
3751 onTimeout = new TimeoutAction<V>(timeoutAction), null, this);
3752 onTimeout.task = task =
3753 new ForkJoinTask.CallableWithTimeout<V>(callable, timeoutTask);
3754 scheduleDelayedTask(timeoutTask);
3755 return poolSubmit(true, task);
3756 }
3757
3758 /**
3759 * Arranges that scheduled tasks that are not executing and have
3760 * not already been enabled for execution will not be executed and
3761 * will be cancelled upon {@link #shutdown} (unless this pool is
3762 * the {@link #commonPool()} which never shuts down). This method
3763 * may be invoked either before {@link #shutdown} to take effect
3764 * upon the next call, or afterwards to cancel such tasks, which
3765 * may then allow termination. Note that subsequent executions of
3766 * periodic tasks are always disabled upon shutdown, so this
3767 * method applies meaningfully only to non-periodic tasks.
3768 * @since 25
3769 */
3770 public void cancelDelayedTasksOnShutdown() {
3771 DelayScheduler ds;
3772 if ((ds = delayScheduler) != null ||
3773 (ds = startDelayScheduler()) != null)
3774 ds.cancelDelayedTasksOnShutdown();
3775 }
3776
3777 /**
3778 * Returns the factory used for constructing new workers.
3779 *
3780 * @return the factory used for constructing new workers
3781 */
3782 public ForkJoinWorkerThreadFactory getFactory() {
3783 return factory;
3784 }
3785
3786 /**
3787 * Returns the handler for internal worker threads that terminate
3788 * due to unrecoverable errors encountered while executing tasks.
3789 *
3790 * @return the handler, or {@code null} if none
3791 */
3792 public UncaughtExceptionHandler getUncaughtExceptionHandler() {
3793 return ueh;
3794 }
3795
3796 /**
3797 * Returns the targeted parallelism level of this pool.
3798 *
3799 * @return the targeted parallelism level of this pool
3800 */
3801 public int getParallelism() {
3802 return Math.max(getParallelismOpaque(), 1);
3803 }
3804
3805 /**
3806 * Returns the targeted parallelism level of the common pool.
3807 *
3808 * @return the targeted parallelism level of the common pool
3809 * @since 1.8
3810 */
3811 public static int getCommonPoolParallelism() {
3812 return common.getParallelism();
3813 }
3814
3815 /**
3816 * Returns the number of worker threads that have started but not
3817 * yet terminated. The result returned by this method may differ
3818 * from {@link #getParallelism} when threads are created to
3819 * maintain parallelism when others are cooperatively blocked.
3820 *
3821 * @return the number of worker threads
3822 */
3823 public int getPoolSize() {
3824 return (short)(ctl >>> TC_SHIFT);
3825 }
3826
3827 /**
3828 * Returns {@code true} if this pool uses local first-in-first-out
3829 * scheduling mode for forked tasks that are never joined.
3830 *
3831 * @return {@code true} if this pool uses async mode
3832 */
3833 public boolean getAsyncMode() {
3834 return (config & FIFO) != 0;
3835 }
3836
3837 /**
3838 * Returns an estimate of the number of worker threads that are
3839 * not blocked waiting to join tasks or for other managed
3840 * synchronization. This method may overestimate the
3841 * number of running threads.
3842 *
3843 * @return the number of worker threads
3844 */
3845 public int getRunningThreadCount() {
3846 WorkQueue[] qs; WorkQueue q;
3847 int rc = 0;
3848 if ((runState & TERMINATED) == 0L && (qs = queues) != null) {
3849 for (int i = 1; i < qs.length; i += 2) {
3850 if ((q = qs[i]) != null && q.isApparentlyUnblocked())
3851 ++rc;
3852 }
3853 }
3854 return rc;
3855 }
3856
3857 /**
3858 * Returns an estimate of the number of threads that are currently
3859 * stealing or executing tasks. This method may overestimate the
3860 * number of active threads.
3861 *
3862 * @return the number of active threads
3863 */
3864 public int getActiveThreadCount() {
3865 return Math.max((short)(ctl >>> RC_SHIFT), 0);
3866 }
3867
3868 /**
3869 * Returns {@code true} if all worker threads are currently idle.
3870 * An idle worker is one that cannot obtain a task to execute
3871 * because none are available to steal from other threads, and
3872 * there are no pending submissions to the pool. This method is
3873 * conservative; it might not return {@code true} immediately upon
3874 * idleness of all threads, but will eventually become true if
3875 * threads remain inactive.
3876 *
3877 * @return {@code true} if all threads are currently idle
3878 */
3879 public boolean isQuiescent() {
3880 return quiescent() >= 0;
3881 }
3882
3883 /**
3884 * Returns an estimate of the total number of completed tasks that
3885 * were executed by a thread other than their submitter. The
3886 * reported value underestimates the actual total number of steals
3887 * when the pool is not quiescent. This value may be useful for
3888 * monitoring and tuning fork/join programs: in general, steal
3889 * counts should be high enough to keep threads busy, but low
3890 * enough to avoid overhead and contention across threads.
3891 *
3892 * @return the number of steals
3893 */
3894 public long getStealCount() {
3895 long count = stealCount;
3896 WorkQueue[] qs; WorkQueue q;
3897 if ((qs = queues) != null) {
3898 for (int i = 1; i < qs.length; i += 2) {
3899 if ((q = qs[i]) != null)
3900 count += (long)q.nsteals & 0xffffffffL;
3901 }
3902 }
3903 return count;
3904 }
3905
3906 /**
3907 * Returns an estimate of the total number of tasks currently held
3908 * in queues by worker threads (but not including tasks submitted
3909 * to the pool that have not begun executing). This value is only
3910 * an approximation, obtained by iterating across all threads in
3911 * the pool. This method may be useful for tuning task
3912 * granularities.The returned count does not include scheduled
3913 * tasks that are not yet ready to execute, which are reported
3914 * separately by method {@link getDelayedTaskCount}.
3915 *
3916 * @return the number of queued tasks
3917 * @see ForkJoinWorkerThread#getQueuedTaskCount()
3918 */
3919 public long getQueuedTaskCount() {
3920 WorkQueue[] qs; WorkQueue q;
3921 long count = 0;
3922 if ((runState & TERMINATED) == 0L && (qs = queues) != null) {
3923 for (int i = 1; i < qs.length; i += 2) {
3924 if ((q = qs[i]) != null)
3925 count += q.queueSize();
3926 }
3927 }
3928 return count;
3929 }
3930
3931 /**
3932 * Returns an estimate of the number of tasks submitted to this
3933 * pool that have not yet begun executing. This method may take
3934 * time proportional to the number of submissions.
3935 *
3936 * @return the number of queued submissions
3937 */
3938 public int getQueuedSubmissionCount() {
3939 WorkQueue[] qs; WorkQueue q;
3940 int count = 0;
3941 if ((runState & TERMINATED) == 0L && (qs = queues) != null) {
3942 for (int i = 0; i < qs.length; i += 2) {
3943 if ((q = qs[i]) != null)
3944 count += q.queueSize();
3945 }
3946 }
3947 return count;
3948 }
3949
3950 /**
3951 * Returns an estimate of the number of delayed (including
3952 * periodic) tasks scheduled in this pool that are not yet ready
3953 * to submit for execution. The returned value is inaccurate while
3954 * delayed tasks are being processed.
3955 *
3956 * @return an estimate of the number of delayed tasks
3957 * @since 25
3958 */
3959 public long getDelayedTaskCount() {
3960 DelayScheduler ds;
3961 return ((ds = delayScheduler) == null ? 0 : ds.lastStableSize());
3962 }
3963
3964 /**
3965 * Returns {@code true} if there are any tasks submitted to this
3966 * pool that have not yet begun executing.
3967 *
3968 * @return {@code true} if there are any queued submissions
3969 */
3970 public boolean hasQueuedSubmissions() {
3971 WorkQueue[] qs; WorkQueue q;
3972 if ((runState & STOP) == 0L && (qs = queues) != null) {
3973 for (int i = 0; i < qs.length; i += 2) {
3974 if ((q = qs[i]) != null && q.queueSize() > 0)
3975 return true;
3976 }
3977 }
3978 return false;
3979 }
3980
3981 /**
3982 * Removes and returns the next unexecuted submission if one is
3983 * available. This method may be useful in extensions to this
3984 * class that re-assign work in systems with multiple pools.
3985 *
3986 * @return the next submission, or {@code null} if none
3987 */
3988 protected ForkJoinTask<?> pollSubmission() {
3989 return pollScan(true);
3990 }
3991
3992 /**
3993 * Removes all available unexecuted submitted and forked tasks
3994 * from scheduling queues and adds them to the given collection,
3995 * without altering their execution status. These may include
3996 * artificially generated or wrapped tasks. This method is
3997 * designed to be invoked only when the pool is known to be
3998 * quiescent. Invocations at other times may not remove all
3999 * tasks. A failure encountered while attempting to add elements
4000 * to collection {@code c} may result in elements being in
4001 * neither, either or both collections when the associated
4002 * exception is thrown. The behavior of this operation is
4003 * undefined if the specified collection is modified while the
4004 * operation is in progress.
4005 *
4006 * @param c the collection to transfer elements into
4007 * @return the number of elements transferred
4008 */
4009 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
4010 int count = 0;
4011 for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) {
4012 c.add(t);
4013 ++count;
4014 }
4015 return count;
4016 }
4017
4018 /**
4019 * Returns a string identifying this pool, as well as its state,
4020 * including indications of run state, parallelism level, and
4021 * worker and task counts.
4022 *
4023 * @return a string identifying this pool, as well as its state
4024 */
4025 public String toString() {
4026 // Use a single pass through queues to collect counts
4027 DelayScheduler ds;
4028 long e = runState;
4029 long st = stealCount;
4030 long qt = 0L, ss = 0L; int rc = 0;
4031 WorkQueue[] qs; WorkQueue q;
4032 if ((qs = queues) != null) {
4033 for (int i = 0; i < qs.length; ++i) {
4034 if ((q = qs[i]) != null) {
4035 int size = q.queueSize();
4036 if ((i & 1) == 0)
4037 ss += size;
4038 else {
4039 qt += size;
4040 st += (long)q.nsteals & 0xffffffffL;
4041 if (q.isApparentlyUnblocked())
4042 ++rc;
4043 }
4044 }
4045 }
4046 }
4047 String delayed = ((ds = delayScheduler) == null ? "" :
4048 ", delayed = " + ds.lastStableSize());
4049 int pc = parallelism;
4050 long c = ctl;
4051 int tc = (short)(c >>> TC_SHIFT);
4052 int ac = (short)(c >>> RC_SHIFT);
4053 if (ac < 0) // ignore transient negative
4054 ac = 0;
4055 String level = ((e & TERMINATED) != 0L ? "Terminated" :
4056 (e & STOP) != 0L ? "Terminating" :
4057 (e & SHUTDOWN) != 0L ? "Shutting down" :
4058 "Running");
4059 return super.toString() +
4060 "[" + level +
4061 ", parallelism = " + pc +
4062 ", size = " + tc +
4063 ", active = " + ac +
4064 ", running = " + rc +
4065 ", steals = " + st +
4066 ", tasks = " + qt +
4067 ", submissions = " + ss +
4068 delayed +
4069 "]";
4070 }
4071
4072 /**
4073 * Possibly initiates an orderly shutdown in which previously
4074 * submitted tasks are executed, but no new tasks will be
4075 * accepted. Invocation has no effect on execution state if this
4076 * is the {@link #commonPool()}, and no additional effect if
4077 * already shut down. Tasks that are in the process of being
4078 * submitted concurrently during the course of this method may or
4079 * may not be rejected.
4080 */
4081 public void shutdown() {
4082 if (workerNamePrefix != null) // not common pool
4083 tryTerminate(false, true);
4084 }
4085
4086 /**
4087 * Possibly attempts to cancel and/or stop all tasks, and reject
4088 * all subsequently submitted tasks. Invocation has no effect on
4089 * execution state if this is the {@link #commonPool()}, and no
4090 * additional effect if already shut down. Otherwise, tasks that
4091 * are in the process of being submitted or executed concurrently
4092 * during the course of this method may or may not be
4093 * rejected. This method cancels both existing and unexecuted
4094 * tasks, in order to permit termination in the presence of task
4095 * dependencies. So the method always returns an empty list
4096 * (unlike the case for some other Executors).
4097 *
4098 * @return an empty list
4099 */
4100 public List<Runnable> shutdownNow() {
4101 if (workerNamePrefix != null) // not common pool
4102 tryTerminate(true, true);
4103 return Collections.emptyList();
4104 }
4105
4106 /**
4107 * Returns {@code true} if all tasks have completed following shut down.
4108 *
4109 * @return {@code true} if all tasks have completed following shut down
4110 */
4111 public boolean isTerminated() {
4112 return (tryTerminate(false, false) & TERMINATED) != 0;
4113 }
4114
4115 /**
4116 * Returns {@code true} if the process of termination has
4117 * commenced but not yet completed. This method may be useful for
4118 * debugging. A return of {@code true} reported a sufficient
4119 * period after shutdown may indicate that submitted tasks have
4120 * ignored or suppressed interruption, or are waiting for I/O,
4121 * causing this executor not to properly terminate. (See the
4122 * advisory notes for class {@link ForkJoinTask} stating that
4123 * tasks should not normally entail blocking operations. But if
4124 * they do, they must abort them on interrupt.)
4125 *
4126 * @return {@code true} if terminating but not yet terminated
4127 */
4128 public boolean isTerminating() {
4129 return (tryTerminate(false, false) & (STOP | TERMINATED)) == STOP;
4130 }
4131
4132 /**
4133 * Returns {@code true} if this pool has been shut down.
4134 *
4135 * @return {@code true} if this pool has been shut down
4136 */
4137 public boolean isShutdown() {
4138 return (runState & SHUTDOWN) != 0L;
4139 }
4140
4141 /**
4142 * Blocks until all tasks have completed execution after a
4143 * shutdown request, or the timeout occurs, or the current thread
4144 * is interrupted, whichever happens first. Because the {@link
4145 * #commonPool()} never terminates until program shutdown, when
4146 * applied to the common pool, this method is equivalent to {@link
4147 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
4148 *
4149 * @param timeout the maximum time to wait
4150 * @param unit the time unit of the timeout argument
4151 * @return {@code true} if this executor terminated and
4152 * {@code false} if the timeout elapsed before termination
4153 * @throws InterruptedException if interrupted while waiting
4154 */
4155 public boolean awaitTermination(long timeout, TimeUnit unit)
4156 throws InterruptedException {
4157 long nanos = unit.toNanos(timeout);
4158 CountDownLatch done;
4159 if (workerNamePrefix == null) { // is common pool
4160 if (helpQuiescePool(this, nanos, true) < 0)
4161 throw new InterruptedException();
4162 return false;
4163 }
4164 else if ((tryTerminate(false, false) & TERMINATED) != 0 ||
4165 (done = terminationSignal()) == null ||
4166 (runState & TERMINATED) != 0L)
4167 return true;
4168 else
4169 return done.await(nanos, TimeUnit.NANOSECONDS);
4170 }
4171
4172 /**
4173 * If called by a ForkJoinTask operating in this pool, equivalent
4174 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
4175 * waits and/or attempts to assist performing tasks until this
4176 * pool {@link #isQuiescent} or the indicated timeout elapses.
4177 *
4178 * @param timeout the maximum time to wait
4179 * @param unit the time unit of the timeout argument
4180 * @return {@code true} if quiescent; {@code false} if the
4181 * timeout elapsed.
4182 */
4183 public boolean awaitQuiescence(long timeout, TimeUnit unit) {
4184 return (helpQuiescePool(this, unit.toNanos(timeout), false) > 0);
4185 }
4186
4187 /**
4188 * Unless this is the {@link #commonPool()}, initiates an orderly
4189 * shutdown in which previously submitted tasks are executed, but
4190 * no new tasks will be accepted, and waits until all tasks have
4191 * completed execution and the executor has terminated.
4192 *
4193 * <p> If already terminated, or this is the {@link
4194 * #commonPool()}, this method has no effect on execution, and
4195 * does not wait. Otherwise, if interrupted while waiting, this
4196 * method stops all executing tasks as if by invoking {@link
4197 * #shutdownNow()}. It then continues to wait until all actively
4198 * executing tasks have completed. Tasks that were awaiting
4199 * execution are not executed. The interrupted status will be
4200 * re-asserted before this method returns.
4201 *
4202 * @since 19
4203 */
4204 @Override
4205 public void close() {
4206 if (workerNamePrefix != null) {
4207 CountDownLatch done = null;
4208 boolean interrupted = false;
4209 while ((tryTerminate(interrupted, true) & TERMINATED) == 0) {
4210 if (done == null)
4211 done = terminationSignal();
4212 else {
4213 try {
4214 done.await();
4215 break;
4216 } catch (InterruptedException ex) {
4217 interrupted = true;
4218 }
4219 }
4220 }
4221 if (interrupted)
4222 Thread.currentThread().interrupt();
4223 }
4224 }
4225
4226 /**
4227 * Interface for extending managed parallelism for tasks running
4228 * in {@link ForkJoinPool}s.
4229 *
4230 * <p>A {@code ManagedBlocker} provides two methods. Method
4231 * {@link #isReleasable} must return {@code true} if blocking is
4232 * not necessary. Method {@link #block} blocks the current thread
4233 * if necessary (perhaps internally invoking {@code isReleasable}
4234 * before actually blocking). These actions are performed by any
4235 * thread invoking {@link
4236 * ForkJoinPool#managedBlock(ManagedBlocker)}. The unusual
4237 * methods in this API accommodate synchronizers that may, but
4238 * don't usually, block for long periods. Similarly, they allow
4239 * more efficient internal handling of cases in which additional
4240 * workers may be, but usually are not, needed to ensure
4241 * sufficient parallelism. Toward this end, implementations of
4242 * method {@code isReleasable} must be amenable to repeated
4243 * invocation. Neither method is invoked after a prior invocation
4244 * of {@code isReleasable} or {@code block} returns {@code true}.
4245 *
4246 * <p>For example, here is a ManagedBlocker based on a
4247 * ReentrantLock:
4248 * <pre> {@code
4249 * class ManagedLocker implements ManagedBlocker {
4250 * final ReentrantLock lock;
4251 * boolean hasLock = false;
4252 * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
4253 * public boolean block() {
4254 * if (!hasLock)
4255 * lock.lock();
4256 * return true;
4257 * }
4258 * public boolean isReleasable() {
4259 * return hasLock || (hasLock = lock.tryLock());
4260 * }
4261 * }}</pre>
4262 *
4263 * <p>Here is a class that possibly blocks waiting for an
4264 * item on a given queue:
4265 * <pre> {@code
4266 * class QueueTaker<E> implements ManagedBlocker {
4267 * final BlockingQueue<E> queue;
4268 * volatile E item = null;
4269 * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
4270 * public boolean block() throws InterruptedException {
4271 * if (item == null)
4272 * item = queue.take();
4273 * return true;
4274 * }
4275 * public boolean isReleasable() {
4276 * return item != null || (item = queue.poll()) != null;
4277 * }
4278 * public E getItem() { // call after pool.managedBlock completes
4279 * return item;
4280 * }
4281 * }}</pre>
4282 */
public interface ManagedBlocker {
    /**
     * Possibly blocks the current thread, for example waiting for
     * a lock or condition.
     *
     * @return {@code true} if no additional blocking is necessary
     * (i.e., if {@link #isReleasable} would return {@code true})
     * @throws InterruptedException if interrupted while waiting
     * (the method is not required to do so, but is allowed to)
     */
    boolean block() throws InterruptedException;

    /**
     * Returns {@code true} if blocking is unnecessary.
     *
     * @return {@code true} if blocking is unnecessary
     */
    boolean isReleasable();
}
4301
4302 /**
4303 * Runs the given possibly blocking task. When {@linkplain
4304 * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
4305 * method possibly arranges for a spare thread to be activated if
4306 * necessary to ensure sufficient parallelism while the current
4307 * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
4308 *
4309 * <p>This method repeatedly calls {@code blocker.isReleasable()} and
4310 * {@code blocker.block()} until either method returns {@code true}.
4311 * Every call to {@code blocker.block()} is preceded by a call to
4312 * {@code blocker.isReleasable()} that returned {@code false}.
4313 *
4314 * <p>If not running in a ForkJoinPool, this method is
4315 * behaviorally equivalent to
4316 * <pre> {@code
4317 * while (!blocker.isReleasable())
4318 * if (blocker.block())
4319 * break;}</pre>
4320 *
4321 * If running in a ForkJoinPool, the pool may first be expanded to
4322 * ensure sufficient parallelism available during the call to
4323 * {@code blocker.block()}.
4324 *
4325 * @param blocker the blocker task
4326 * @throws InterruptedException if {@code blocker.block()} did so
4327 */
4328 public static void managedBlock(ManagedBlocker blocker)
4329 throws InterruptedException {
4330 Thread t; ForkJoinPool p;
4331 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
4332 (p = ((ForkJoinWorkerThread)t).pool) != null)
4333 p.compensatedBlock(blocker);
4334 else
4335 unmanagedBlock(blocker);
4336 }
4337
4338 /** ManagedBlock for ForkJoinWorkerThreads */
4339 private void compensatedBlock(ManagedBlocker blocker)
4340 throws InterruptedException {
4341 Objects.requireNonNull(blocker);
4342 for (;;) {
4343 int comp; boolean done;
4344 long c = ctl;
4345 if (blocker.isReleasable())
4346 break;
4347 if ((runState & STOP) != 0L)
4348 throw new InterruptedException();
4349 if ((comp = tryCompensate(c)) >= 0) {
4350 try {
4351 done = blocker.block();
4352 } finally {
4353 if (comp > 0)
4354 getAndAddCtl(RC_UNIT);
4355 }
4356 if (done)
4357 break;
4358 }
4359 }
4360 }
4361
4362 /**
4363 * Invokes tryCompensate to create or re-activate a spare thread to
4364 * compensate for a thread that performs a blocking operation. When the
4365 * blocking operation is done then endCompensatedBlock must be invoked
4366 * with the value returned by this method to re-adjust the parallelism.
4367 * @return value to use in endCompensatedBlock
4368 */
4369 final long beginCompensatedBlock() {
4370 int c;
4371 do {} while ((c = tryCompensate(ctl)) < 0);
4372 return (c == 0) ? 0L : RC_UNIT;
4373 }
4374
4375 /**
4376 * Re-adjusts parallelism after a blocking operation completes.
4377 * @param post value from beginCompensatedBlock
4378 */
4379 void endCompensatedBlock(long post) {
4380 if (post > 0L) {
4381 getAndAddCtl(post);
4382 }
4383 }
4384
4385 /** ManagedBlock for external threads */
4386 private static void unmanagedBlock(ManagedBlocker blocker)
4387 throws InterruptedException {
4388 Objects.requireNonNull(blocker);
4389 do {} while (!blocker.isReleasable() && !blocker.block());
4390 }
4391
4392 @Override
4393 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
4394 Objects.requireNonNull(runnable);
4395 return (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
4396 new ForkJoinTask.AdaptedRunnable<T>(runnable, value) :
4397 new ForkJoinTask.AdaptedInterruptibleRunnable<T>(runnable, value);
4398 }
4399
4400 @Override
4401 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
4402 Objects.requireNonNull(callable);
4403 return (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
4404 new ForkJoinTask.AdaptedCallable<T>(callable) :
4405 new ForkJoinTask.AdaptedInterruptibleCallable<T>(callable);
4406 }
4407
static {
    // Statement order in this initializer is significant; it is kept
    // exactly as written.
    U = Unsafe.getUnsafe();
    Class<ForkJoinPool> klass = ForkJoinPool.class;
    // The static field poolIds needs both a base object and an offset,
    // which are obtained from its reflective Field.
    try {
        Field poolIdsField = klass.getDeclaredField("poolIds");
        POOLIDS_BASE = U.staticFieldBase(poolIdsField);
        POOLIDS = U.staticFieldOffset(poolIdsField);
    } catch (NoSuchFieldException e) {
        throw new ExceptionInInitializerError(e);
    }
    // Offsets of instance fields, looked up by name.
    CTL = U.objectFieldOffset(klass, "ctl");
    RUNSTATE = U.objectFieldOffset(klass, "runState");
    PARALLELISM = U.objectFieldOffset(klass, "parallelism");
    THREADIDS = U.objectFieldOffset(klass, "threadIds");
    TERMINATION = U.objectFieldOffset(klass, "termination");
    // Base offset and element shift for ForkJoinTask[] access. ASHIFT
    // is log2 of the index scale, which is only exact for a power of
    // two -- hence the check that follows.
    Class<ForkJoinTask[]> aklass = ForkJoinTask[].class;
    ABASE = U.arrayBaseOffset(aklass);
    int scale = U.arrayIndexScale(aklass);
    ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    if ((scale & (scale - 1)) != 0)
        throw new Error("array index scale not a power of two");

    Class<?> dep = LockSupport.class; // ensure loaded
    // allow access to non-public methods
    JLA = SharedSecrets.getJavaLangAccess();
    SharedSecrets.setJavaUtilConcurrentFJPAccess(
        new JavaUtilConcurrentFJPAccess() {
            @Override
            public long beginCompensatedBlock(ForkJoinPool pool) {
                return pool.beginCompensatedBlock();
            }
            @Override
            public void endCompensatedBlock(ForkJoinPool pool, long post) {
                pool.endCompensatedBlock(post);
            }
        });
    defaultForkJoinWorkerThreadFactory =
        new DefaultForkJoinWorkerThreadFactory();
    // NOTE(review): presumably the (byte) constructor is the one
    // reserved for building the common pool -- confirm.
    common = new ForkJoinPool((byte)0);
}
4447 }