src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
* @author Doug Lea
*/
public class ForkJoinPool extends AbstractExecutorService {
/*
- * Implementation Overview
- *
- * This class and its nested classes provide the main
- * functionality and control for a set of worker threads. Because
- * most internal methods and nested classes are interrelated,
- * their main rationale and descriptions are presented here;
- * individual methods and nested classes contain only brief
- * comments about details. Broadly: submissions from non-FJ
- * threads enter into submission queues. Workers take these tasks
- * and typically split them into subtasks that may be stolen by
- * other workers. Work-stealing based on randomized scans
- * generally leads to better throughput than "work dealing" in
- * which producers assign tasks to idle threads, in part because
- * a thread that has finished its other tasks can take a newly
- * produced task before the signalled thread wakes up (which
- * can take a long time). Preference rules give first priority
- * to processing
- * tasks from their own queues (LIFO or FIFO, depending on mode),
- * then to randomized FIFO steals of tasks in other queues. This
- * framework began as a vehicle for supporting tree-structured
- * parallelism using work-stealing. Over time, its scalability
- * advantages led to extensions and changes to better support more
- * diverse usage contexts. Here's a brief history of major
- * revisions, each also with other minor features and changes.
- *
- * 1. Only handle recursively structured computational tasks
- * 2. Async (FIFO) mode and striped submission queues
- * 3. Completion-based tasks (mainly CountedCompleters)
- * 4. CommonPool and parallelStream support
- * 5. InterruptibleTasks for externally submitted tasks
- *
- * Most changes involve adaptations of base algorithms using
- * combinations of static and dynamic bitwise mode settings (both
- * here and in ForkJoinTask), and subclassing of ForkJoinTask.
- * There are a fair number of odd code constructions and design
- * decisions for components that reside at the edge of Java vs JVM
- * functionality.
- *
- * WorkQueues
- * ==========
- *
- * Most operations occur within work-stealing queues (in nested
- * class WorkQueue). These are special forms of Deques that
- * support only three of the four possible end-operations -- push,
- * pop, and poll (aka steal), under the further constraints that
- * push and pop are called only from the owning thread (or, as
- * extended here, under a lock), while poll may be called from
- * other threads. (If you are unfamiliar with them, you probably
- * want to read Herlihy and Shavit's book "The Art of
- * Multiprocessor Programming", chapter 16 of which describes
- * these in more detail, before proceeding.) The main
- * work-stealing queue
- * design is roughly similar to those in the papers "Dynamic
- * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
- * (http://research.sun.com/scalable/pubs/index.html) and
- * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
- * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
- * The main differences ultimately stem from GC requirements that
- * we null out taken slots as soon as we can, to maintain as small
- * a footprint as possible even in programs generating huge
- * numbers of tasks. To accomplish this, we shift the CAS
- * arbitrating pop vs poll (steal) from being on the indices
- * ("base" and "top") to the slots themselves. These provide the
- * primary required memory ordering -- see "Correct and Efficient
- * Work-Stealing for Weak Memory Models" by Le, Pop, Cohen, and
- * Nardelli, PPoPP 2013
- * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
- * analysis of memory ordering requirements in work-stealing
- * algorithms similar to the one used here. We use per-operation
- * ordered writes of various kinds for updates, but usually use
- * explicit load fences for reads, to cover access of several
- * fields of possibly several objects without further constraining
- * read-by-read ordering.
- *
- * We also support a user mode in which local task processing is
- * in FIFO, not LIFO order, simply by using a local version of
- * poll rather than pop. This can be useful in message-passing
- * frameworks in which tasks are never joined, although with
- * increased contention among task producers and consumers. Also,
- * the same data structure (and class) is used for "submission
- * queues" (described below) holding externally submitted tasks,
- * that differ only in that a lock (using field "phase"; see below) is
- * required by external callers to push and pop tasks.
- *
- * Adding tasks then takes the form of a classic array push(task)
- * in a circular buffer:
- * q.array[q.top++ % length] = task;
- *
- * The actual code needs to null-check and size-check the array,
- * uses masking, not mod, for indexing a power-of-two-sized array,
- * enforces memory ordering, supports resizing, and possibly
- * signals waiting workers to start scanning (described below),
- * which requires stronger forms of order accesses.
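- *
- * As a hedged sketch only (simplified names; the real push also
- * handles resizing and uses explicit memory-ordered accesses):
- *
- *     final void push(ForkJoinTask<?> task, ForkJoinPool pool) {
- *         ForkJoinTask<?>[] a = array;
- *         int s = top, m = a.length - 1; // length is a power of two
- *         a[m & s] = task;               // ordered write in real code
- *         top = s + 1;
- *         if (pool != null)              // signal if it may have been empty
- *             pool.signalWork(a, m & s);
- *     }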
- *
- * The pop operation (always performed by owner) is of the form:
- * if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null)
- * decrement top and return task;
- * If this fails, the queue is empty. This operation is one part
- * of the nextLocalTask method, which instead does a local-poll
- * in FIFO mode.
- *
- * The poll operation is, basically:
- * if (CAS nonnull task t = q.array[k = q.base % length] to null)
- * increment base and return task;
- *
- * However, there are several more cases that must be dealt with.
- * Some of them are just due to asynchrony; others reflect
- * contention and stealing policies. Stepping through them
- * illustrates some of the implementation decisions in this class.
- *
- * * Slot k must be read with an acquiring read, which it must
- * anyway to dereference and run the task if the (acquiring)
- * CAS succeeds, but uses an explicit acquire fence to support
- * the following rechecks even if the CAS is not attempted. To
- * more easily distinguish among kinds of CAS failures, we use
- * the compareAndExchange version, and usually handle null
- * returns (indicating contention) separately from others.
- *
- * * q.base may change between reading and using its value to
- * index the slot. To avoid trying to use the wrong t, the
- * index and slot must be reread (not necessarily immediately)
- * until consistent, unless this is a local poll by owner, in
- * which case this form of inconsistency can only appear as t
- * being null, below.
- *
- * * Similarly, q.array may change (due to a resize), unless this
- * is a local poll by owner. Otherwise, when t is present, this
- * only needs consideration on CAS failure (since a CAS
- * confirms the non-resized case.)
- *
- * * t may appear null because a previous poll operation has not
- * yet incremented q.base, so the read is from an already-taken
- * index. This form of stall reflects the non-lock-freedom of
- * the poll operation. Stalls can be detected by observing that
- * q.base doesn't change on repeated reads of null t and when
- * no other alternatives apply, spin-wait for it to settle. To
- * reduce the production of these kinds of stalls by other
- * stealers, we encourage timely writes to indices using
- * otherwise unnecessarily strong writes.
- *
- * * The CAS may fail, in which case we may want to retry unless
- * there is too much contention. One goal is to balance and
- * spread out the many forms of contention that may be
- * encountered across polling and other operations to avoid
- * sustained performance degradations. Across all cases where
- * alternatives exist, a bounded number of CAS misses or stalls
- * are tolerated (for slots, ctl, and elsewhere described
- * below) before taking alternative action. These may move
- * contention or retries elsewhere, which is still preferable
- * to single-point bottlenecks.
- *
- * * Even though the check "top == base" is quiescently accurate
- * to determine whether a queue is empty, it is not of much use
- * when deciding whether to try to poll or repoll after a
- * failure. Both top and base may move independently, and both
- * lag updates to the underlying array. To reduce memory
- * contention, non-owners avoid reading the "top" when
- * possible, by using one-ahead reads to check whether to
- * repoll, relying on the fact that a non-empty queue does not
- * have two null slots in a row, except in cases (resizes and
- * shifts) that can be detected with a secondary recheck that
- * is less likely to conflict with owner writes.
- *
- * The poll operations in q.poll(), scan(), helpJoin(), and
- * elsewhere differ with respect to whether other queues are
- * available to try, and the presence or nature of screening steps
- * when only some kinds of tasks can be taken. When trying
- * alternatives (or simply failing) is an option, they uniformly
- * give up after
- * bounded numbers of stalls and/or CAS failures, which reduces
- * contention when too many workers are polling too few tasks.
- * Overall, in the aggregate, we ensure probabilistic
- * non-blockingness of work-stealing at least until checking
- * quiescence (which is intrinsically blocking): If an attempted
- * steal fails in these ways, a scanning thief chooses a different
- * target to try next. In contexts where alternatives aren't
- * available, and when progress conditions can be isolated to
- * values of a single variable, simple spinloops (using
- * Thread.onSpinWait) are used to reduce memory traffic.
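- *
- * Combining the cases above, a hedged sketch of a non-owner poll
- * (one-ahead rechecks, retry bounds, and signalling elided;
- * compare WorkQueue.poll below):
- *
- *     final ForkJoinTask<?> pollSketch() {   // hypothetical name
- *         for (;;) {
- *             ForkJoinTask<?>[] a = array; int cap, b, k;
- *             if (a == null || (cap = a.length) <= 0)
- *                 return null;
- *             ForkJoinTask<?> t = a[k = (b = base) & (cap - 1)];
- *             U.loadFence();              // cover rechecks below
- *             if (b != base)
- *                 continue;               // inconsistent; reread
- *             Object o = (t == null) ? a[k] :
- *                 U.compareAndExchangeReference(a, slotOffset(k), t, null);
- *             if (t != null && o == t) {
- *                 updateBase(b + 1);      // taken; publish new base
- *                 return t;
- *             }
- *             if (o == null && top - b <= 0)
- *                 return null;            // appears empty
- *         }                               // else stall or CAS miss; retry
- *     }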
- *
- * WorkQueues are also used in a similar way for tasks submitted
- * to the pool. We cannot mix these tasks in the same queues used
- * by workers. Instead, we randomly associate submission queues
- * with submitting threads, using a form of hashing. The
- * ThreadLocalRandom probe value serves as a hash code for
- * choosing existing queues, and may be randomly repositioned upon
- * contention with other submitters. In essence, submitters act
- * like workers except that they are restricted to executing local
- * tasks that they submitted (or when known, subtasks thereof).
- * Insertion of tasks in shared mode requires a lock. We use only
- * a simple spinlock (as one role of field "phase") because
- * submitters encountering a busy queue move to a different
- * position to use or create other queues. They (spin) block when
- * registering new queues, or indirectly elsewhere, by revisiting
- * later.
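- *
- * A hedged sketch of the index choice (hypothetical helper; the
- * real submission code also creates queues and repositions the
- * probe on contention):
- *
- *     WorkQueue submissionQueueFor(WorkQueue[] qs) {
- *         int r = ThreadLocalRandom.getProbe(); // per-thread hash
- *         return qs[r & (qs.length - 1) & ~1];  // even index
- *     }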
- *
- * Management
- * ==========
- *
- * The main throughput advantages of work-stealing stem from
- * decentralized control -- workers mostly take tasks from
- * themselves or each other, at rates that can exceed a billion
- * per second. Most non-atomic control is performed by some form
- * of scanning across or within queues. The pool itself creates,
- * activates (enables scanning for and running tasks),
- * deactivates, blocks, and terminates threads, all with minimal
- * central information. There are only a few properties that we
- * can globally track or maintain, so we pack them into a small
- * number of variables, often maintaining atomicity without
- * blocking or locking. Nearly all essentially atomic control
- * state is held in a few variables that are by far most often
- * read (not written) as status and consistency checks. We pack as
- * much information into them as we can.
- *
- * Field "ctl" contains 64 bits holding information needed to
- * atomically decide to add, enqueue (on an event queue), and
- * dequeue and release workers. To enable this packing, we
- * restrict maximum parallelism to (1<<15)-1 (which is far in
- * excess of normal operating range) to allow ids, counts, and
- * their negations (used for thresholding) to fit into 16bit
- * subfields.
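- *
- * Schematically, using the shift and mask names defined later in
- * this file:
- *
- *     bits 48-63 (>>> RC_SHIFT): count of released workers
- *     bits 32-47 (>>> TC_SHIFT): total worker count
- *     bits  0-31 (LMASK): Treiber stack top; the phase of the
- *                         top waiting worker, or zero if none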
- *
- * Field "runState" and per-WorkQueue field "phase" play similar
- * roles, as lockable, versioned counters. Field runState also
- * includes monotonic event bits (SHUTDOWN, STOP, and TERMINATED).
- * The version tags enable detection of state changes (by
- * comparing two reads) modulo bit wraparound. The bit range in
- * each case suffices for purposes of determining quiescence,
- * termination, avoiding ABA-like errors, and signal control, most
- * of which are ultimately based on at most 15bit ranges (due to
- * 32767 max total workers). RunState updates do not need to be
- * atomic with respect to ctl updates, but because they are not,
- * some care is required to avoid stalls. The seqLock properties
- * detect changes and conditionally upgrade to coordinate with
- * updates. It is typically held for less than a dozen
- * instructions unless the queue array is being resized, during
- * which contention is rare. To be conservative, lockRunState is
- * implemented as a spin/sleep loop. Here and elsewhere spin
- * constants are short enough to apply even on systems with few
- * available processors. In addition to checking pool status,
- * reads of runState sometimes serve as acquire fences before
- * reading other fields.
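- *
- * A sketch of the resulting seqLock-style read idiom (the
- * surrounding retry loop is elided):
- *
- *     int e = runState;          // also acts as acquire fence
- *     WorkQueue[] qs = queues;   // ... read dependent state ...
- *     if (e == runState && (e & RS_LOCK) == 0) {
- *         // reads above were mutually consistent
- *     }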
- *
- * Field "parallelism" holds the target parallelism (normally
- * corresponding to pool size). Users can dynamically reset target
- * parallelism, but the field is only accessed when signalling or
- * awaiting work, so changes take effect only slowly in creating
- * threads or letting them time out and terminate when idle.
- *
- * Array "queues" holds references to WorkQueues. It is updated
- * (only during worker creation and termination) under the
- * runState lock. It is otherwise concurrently readable but reads
- * for use in scans (see below) are always prefaced by a volatile
- * read of runState (or equivalent constructions), ensuring that
- * its state is current at the point it is used (which is all we
- * require). To simplify index-based operations, the array size is
- * always a power of two, and all readers must tolerate null
- * slots. Worker queues are at odd indices. Worker phase ids
- * masked with SMASK match their index. Shared (submission) queues
- * are at even indices. Grouping them together in this way aids in
- * task scanning: At top-level, both kinds of queues should be
- * sampled with approximately the same probability, which is
- * simpler if they are all in the same array. But we also need to
- * identify what kind they are without looking at them, leading to
- * this odd/even scheme. One disadvantage is that there are
- * usually many fewer submission queues, so there can be many
- * wasted probes (null slots). But this is still cheaper than
- * alternatives. Other loops over the queues array vary in origin
- * and stride depending on whether they cover only submission
- * (even) or worker (odd) queues or both, and whether they require
- * randomness (in which case cyclically exhaustive strides may be
- * used).
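- *
- * For example, a scan restricted to one kind of queue can use an
- * origin and stride of matching parity (illustrative only):
- *
- *     for (int i = 1; i < qs.length; i += 2) { // odd: workers
- *         WorkQueue q = qs[i];
- *         if (q != null)
- *             probe(q);                        // hypothetical probe
- *     }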
- *
- * All worker thread creation is on-demand, triggered by task
- * submissions, replacement of terminated workers, and/or
- * compensation for blocked workers. However, all other support
- * code is set up to work with other policies. To ensure that we
- * do not hold on to worker or task references that would prevent
- * GC, all accesses to workQueues in waiting, signalling, and
- * control methods are via indices into the queues array (which is
- * one source of some of the messy code constructions here). In
- * essence, the queues array serves as a weak reference
- * mechanism. In particular, the stack top subfield of ctl stores
- * indices, not references. Operations on queues obtained from
- * these indices remain valid (with at most some unnecessary extra
- * work) even if an underlying worker failed and was replaced by
- * another at the same index. During termination, worker queue
- * array updates are disabled.
- *
- * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
- * cannot let workers spin indefinitely scanning for tasks when
- * none can be found immediately, and we cannot start/resume
- * workers unless there appear to be tasks available. On the
- * other hand, we must quickly prod them into action when new
- * tasks are submitted or generated. These latencies are mainly a
- * function of JVM park/unpark (and underlying OS) performance,
- * which can be slow and variable (even though usages are
- * streamlined as much as possible). In many usages, ramp-up time
- * is the main limiting factor in overall performance, which is
- * compounded at program start-up by JIT compilation and
- * allocation. On the other hand, throughput degrades when too
- * many threads poll for too few tasks. (See below.)
- *
- * The "ctl" field atomically maintains total and "released"
- * worker counts, plus the head of the available worker queue
- * (actually stack, represented by the lower 32bit subfield of
- * ctl). Released workers are those known to be scanning for
- * and/or running tasks (we cannot accurately determine
- * which). Unreleased ("available") workers are recorded in the
- * ctl stack. These workers are made eligible for signalling by
- * enqueuing in ctl (see method runWorker). This "queue" is a
- * form of Treiber stack. This is ideal for activating threads in
- * most-recently used order, and improves performance and
- * locality, outweighing the disadvantages of being prone to
- * contention and inability to release a worker unless it is
- * topmost on stack. The top stack state holds the value of the
- * "phase" field of the worker: its index and status, plus a
- * version counter that, in addition to the count subfields (which
- * also serve as version stamps), provides protection against
- * Treiber stack ABA effects.
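- *
- * Releasing the top waiting worker is then a Treiber-stack pop
- * over ctl (compare method signalWork below):
- *
- *     long c = ctl;
- *     int sp = (int)c;                     // top phase/index
- *     WorkQueue v = qs[sp & SMASK];
- *     long nc = ((c + RC_UNIT) & RC_MASK) | (c & TC_MASK) |
- *               (v.stackPred & LMASK);     // relink to next
- *     if (compareAndSetCtl(c, nc)) {
- *         v.phase = sp;                    // mark released
- *         if (v.parking != 0)
- *             U.unpark(v.owner);
- *     }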
- *
- * Creating workers. To create a worker, we pre-increment counts
- * (serving as a reservation), and attempt to construct a
- * ForkJoinWorkerThread via its factory. On starting, the new
- * thread first invokes registerWorker, where it constructs a
- * WorkQueue and is assigned an index in the queues array
- * (expanding the array if necessary). Upon any exception across
- * these steps, or null return from factory, deregisterWorker
- * adjusts counts and records accordingly. On a null return, the
- * pool continues running with fewer than the target number of
- * workers. If exceptional, the exception is propagated, generally
- * to some external caller.
- *
- * WorkQueue field "phase" encodes the queue array id in lower
- * bits, and otherwise acts similarly to the pool runState field:
- * The "IDLE" bit is clear while active (either a released worker
- * or a locked external queue), with other bits serving as a
- * version counter to distinguish changes across multiple reads.
- * Note that phase field updates lag queue CAS releases; seeing a
- * non-idle phase does not guarantee that the worker is available
- * (and so is never checked in this way).
- *
- * The ctl field also serves as the basis for memory
- * synchronization surrounding activation. This uses a more
- * efficient version of a Dekker-like rule that task producers and
- * consumers sync with each other by both writing/CASing ctl (even
- * if to its current value). However, rather than CASing ctl to
- * its current value in the common case where no action is
- * required, we reduce write contention by ensuring that
- * signalWork invocations are prefaced with a fully fenced memory
- * access (which is usually needed anyway).
- *
- * Signalling. Signals (in signalWork) cause new or reactivated
- * workers to scan for tasks. Method signalWork and its callers
- * try to approximate the unattainable goal of having the right
- * number of workers activated for the tasks at hand, but must err
- * on the side of too many workers vs too few to avoid stalls:
- *
- * * If computations are purely tree structured, it suffices for
- * every worker to activate another when it pushes a task into
- * an empty queue, resulting in O(log(#threads)) steps to full
- * activation. Emptiness must be conservatively approximated,
- * sometimes resulting in unnecessary signals. Also, to reduce
- * resource usages in some cases, at the expense of slower
- * startup in others, activation of an idle thread is preferred
- * over creating a new one, here and elsewhere.
- *
- * * If instead, tasks come in serially from only a single
- * producer, each worker taking its first (since the last
- * activation) task from a queue should propagate a signal if
- * there are more tasks in that queue. This is equivalent to,
- * but generally faster than, arranging that the stealer take
- * multiple tasks, re-push one or more on its own queue, and
- * signal (because its queue was empty), which also results in
- * logarithmic full activation time.
- *
- * * Because we don't know about usage patterns (or most commonly,
- * mixtures), we use both approaches, which present even more
- * opportunities to over-signal. Note that in either of these
- * contexts, signals may be (and often are) unnecessary because
- * active workers continue scanning after running tasks without
- * the need to be signalled (which is one reason work stealing
- * is often faster than alternatives), so additional workers
- * aren't needed. We filter out some of these cases by exiting
- * retry loops in signalWork if the task responsible for the
- * signal has already been taken.
- *
- * * For rapidly branching tasks that require full pool resources,
- * oversignalling is OK, because signalWork will soon have no
- * more workers to create or reactivate. But for others (mainly
- * externally submitted tasks), overprovisioning may cause very
- * noticeable slowdowns due to contention and resource
- * wastage. All remedies are intrinsically heuristic. We use a
- * strategy that works well in most cases: We track "sources"
- * (queue ids) of non-empty (usually polled) queues while
- * scanning. These are maintained in the "source" field of
- * WorkQueues for use in method helpJoin and elsewhere (see
- * below). We also maintain them as arguments/results of
- * top-level polls (argument "window" in method scan, with setup
- * in method runWorker) as an encoded sliding window of current
- * and previous two sources (or INVALID_ID if none), and stop
- * signalling when all were from the same source. These
- * mechanisms may result in transiently too few workers, but
- * once workers poll from a new source, they rapidly reactivate
- * others.
- *
- * * Despite these, signal contention and overhead effects still
- * occur during ramp-up and ramp-down of small computations.
- *
- * Scanning. Method scan performs top-level scanning for (and
- * execution of) tasks by polling a pseudo-random permutation of
- * the array (by starting at a given index, and using a constant
- * cyclically exhaustive stride.) It uses the same basic polling
- * method as WorkQueue.poll(), but restarts with a different
- * permutation on each invocation. The pseudorandom generator
- * need not have high-quality statistical properties in the long
- * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
- * from ThreadLocalRandom probes, which are cheap and
- * suffice. Scans do not otherwise explicitly take into account
- * core affinities, loads, cache localities, etc. However, they do
- * exploit temporal locality (which usually approximates these) by
- * preferring to re-poll from the same queue (either in method
- * tryPoll() or scan) after a successful poll before trying others
- * (see method topLevelExec), which also reduces bookkeeping,
- * cache traffic, and scanning overhead. But it also reduces
- * fairness, which is partially counteracted by giving up on
- * contention.
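- *
- * The xorshift step and cyclically exhaustive stride, mirroring
- * method scan below:
- *
- *     r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
- *     int i = r, step = (r >>> 16) | 1;         // odd stride
- *     for (int l = n; l > 0; --l, i += step)
- *         probe(qs[i & SMASK & (n - 1)]);       // hypothetical probe
- *
- * Because n is a power of two and the stride is odd, the n probes
- * visit each index exactly once.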
- *
- * Deactivation. When method scan indicates that no tasks are
- * found by a worker, it tries to deactivate (in awaitWork),
- * giving up (and rescanning) on ctl contention. To avoid missed
- * signals, the method rescans and reactivates if a signal may
- * have arrived during deactivation, filtering out most cases in
- * which this is unnecessary. Because
- * idle workers are often not yet blocked (parked), we use the
- * WorkQueue parking field to advertise that a waiter actually
- * needs unparking upon signal.
- *
- * Quiescence. Workers scan looking for work, giving up when they
- * don't find any, without being sure that none are available.
- * However, some required functionality relies on consensus about
- * quiescence (also termination, discussed below). The count
- * fields in ctl allow accurate discovery of states in which all
- * workers are idle. However, because external (asynchronous)
- * submitters are not part of this vote, these mechanisms
- * themselves do not guarantee that the pool is in a quiescent
- * state with respect to methods isQuiescent, shutdown (which
- * begins termination when quiescent), helpQuiesce, and indirectly
- * others including tryCompensate. Method quiescent() is
- * used in all of these contexts. It provides checks that all
- * workers are idle and there are no submissions that they could
- * poll if they were not idle, retrying on inconsistent reads of
- * queues and using the runState seqLock to retry on queue array
- * updates. (It also reports quiescence if the pool is
- * terminating.) A true report means only that there was a moment
- * at which quiescence held. False negatives are inevitable (for
- * example when queue indices lag updates, as described above),
- * which is accommodated when (tentatively) idle by scanning for
- * work etc, and then re-invoking. This includes cases in which
- * the final unparked thread (in awaitWork) uses quiescent()
- * to check for tasks that could have been added during a race
- * window without an accompanying signal, in which case it
- * re-activates itself (or some other worker) to rescan. Method
- * helpQuiesce acts similarly but cannot rely on ctl counts to
- * determine that all workers are inactive because the caller and
- * any others executing helpQuiesce are not included in counts.
- *
- * Termination. A call to shutdownNow invokes tryTerminate to
- * atomically set a runState mode bit. However, the process of
- * termination is intrinsically non-atomic. The calling thread, as
- * well as other workers thereafter terminating help cancel queued
- * tasks and interrupt other workers. These actions race with
- * unterminated workers. By default, workers check for
- * termination only when accessing pool state. This may take a
- * while, which suffices for structured computational tasks but
- * not necessarily for others. Class InterruptibleTask (see below)
- * further arranges runState checks before executing task bodies,
- * and ensures interrupts while terminating. Even so, there are no
- * guarantees after an abrupt shutdown that remaining tasks
- * complete normally or exceptionally or are cancelled.
- * Termination may fail to complete if running tasks ignore both
- * task status and interrupts and/or produce more tasks after
- * others that could cancel them have exited.
- *
- * Trimming workers. To release resources after periods of lack of
- * use, a worker starting to wait when the pool is quiescent will
- * time out and terminate if the pool has remained quiescent for
- * the period given by field keepAlive (default 60sec), which applies
- * to the first timeout of a fully populated pool. Subsequent (or
- * other) cases use delays such that, if still quiescent, all will
- * be released before one additional keepAlive unit elapses.
- *
- * Joining Tasks
- * =============
- *
- * The "Join" part of ForkJoinPools consists of a set of
- * mechanisms that sometimes or always (depending on the kind of
- * task) avoid context switching or adding worker threads when one
- * task would otherwise be blocked waiting for completion of
- * another, basically, just by running that task or one of its
- * subtasks if not already taken. These mechanics are disabled for
- * InterruptibleTasks, which guarantee that callers do not execute
- * submitted tasks.
- *
- * The basic structure of joining is an extended spin/block scheme
- * in which workers check for task completion status between
- * steps taken to find other work, until relevant pool state
- * stabilizes enough to believe that no such tasks are available,
- * at which point they block. This usually approximates a good
- * point at which to block, which would otherwise be hard to
- * determine.
- *
- * These forms of helping may increase stack space usage, but that
- * space is bounded in tree/dag structured procedurally parallel
- * designs to be no more than it would be if the task were
- * executed only by the joining thread. This is arranged by
- * associated task
- * subclasses that also help detect and control the ways in which
- * this may occur.
- *
- * Normally, the first option when joining a task that is not done
- * is to try to take it from the local queue and run it. Method
- * tryRemoveAndExec tries to do so. For tasks with any form of
- * subtasks that must be completed first, we try to locate these
- * subtasks and run them as well. This is easy when local, but
- * when stolen, steal-backs are restricted to the same rules as
- * stealing (polling), which requires additional bookkeeping and
- * scanning. This cost is still very much worthwhile because of
- * its impact on task scheduling and resource control.
- *
- * The two methods for finding and executing subtasks vary in
- * details. The algorithm in helpJoin entails a form of "linear
- * helping". Each worker records (in field "source") the index of
- * the internal queue from which it last stole a task. (Note:
- * because chains cannot include even-numbered external queues,
- * they are ignored, and 0 is an OK default.) The scan in method
- * helpJoin uses these markers to try to find a worker to help
- * (i.e., steal back a task from and execute it) that could make
- * progress toward completion of the actively joined task. Thus,
- * the joiner executes a task that would be on its own local deque
- * if the to-be-joined task had not been stolen. This is a
- * conservative variant of the approach described in Wagner &
- * Calder "Leapfrogging: a portable technique for implementing
- * efficient futures" SIGPLAN Notices, 1993
- * (http://portal.acm.org/citation.cfm?id=155354). It differs
- * mainly in that we only record queues, not full dependency
- * links. This requires a linear scan of the queues to locate
- * stealers, but isolates cost to when it is needed, rather than
- * adding to per-task overhead. For CountedCompleters, the
- * analogous method helpComplete doesn't need stealer-tracking,
- * but requires a similar (but simpler) check of completion
- * chains.
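- *
- * A conceptual sketch of the linear-helping scan in helpJoin (the
- * real code also snapshots ctl, tracks task status, and retries):
- *
- *     int j = w.phase & SMASK;        // joiner's queue index
- *     for (int i = 1; i < qs.length; i += 2) { // workers only
- *         WorkQueue q = qs[i];
- *         if (q != null && (q.source & SMASK) == j) {
- *             // q last stole from w: steal back a task from q and
- *             // run it, then recheck whether the join completed
- *         }
- *     }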
- *
- * In either case, searches can fail to locate stealers when
- * stalls delay recording sources or issuing subtasks. We avoid
- * some of these cases by using snapshotted values of ctl as a
- * check that the numbers of workers are not changing, along with
- * rescans to deal with contention and stalls. But even when
- * accurately identified, stealers might not ever produce a task
- * that the joiner can in turn help with.
- *
- * Related method helpAsyncBlocker does not directly rely on
- * subtask structure, but instead avoids or postpones blocking of
- * tagged tasks (CompletableFuture.AsynchronousCompletionTask) by
- * executing other asyncs that can be processed in any order.
- * This is currently invoked only in non-join-based blocking
- * contexts from classes CompletableFuture and
- * SubmissionPublisher, which could be further generalized.
- *
- * When any of the above fail to avoid blocking, we rely on
- * "compensation" -- an indirect form of context switching that
- * either activates an existing worker to take the place of the
- * blocked one, or expands the number of workers.
- *
- * Compensation does not by default aim to keep exactly the target
- * parallelism number of unblocked threads running at any given
- * time. Some previous versions of this class employed immediate
- * compensations for any blocked join. However, in practice, the
- * vast majority of blockages are transient byproducts of GC and
- * other JVM or OS activities that replacement makes worse by
- * causing longer-term oversubscription. These are inevitable
- * without (unobtainably) perfect information about whether worker
- * creation is actually necessary. False alarms are common enough
- * to negatively impact performance, so compensation is by default
- * attempted only when it appears possible that the pool could
- * stall due to lack of any unblocked workers. However, we allow
- * users to override defaults using the long form of the
- * ForkJoinPool constructor. The compensation mechanism may also
- * be bounded. Bounds for the commonPool better enable JVMs to
- * cope with programming errors and abuse before running out of
- * resources to do so.
- *
- * The ManagedBlocker extension API can't use helping so relies
- * only on compensation in method awaitBlocker. This API was
- * designed to highlight the uncertainty of compensation decisions
- * by requiring implementation of method isReleasable to abort
- * compensation during attempts to obtain a stable snapshot. But
- * users now rely upon the fact that if isReleasable always
- * returns false, the API can be used to obtain precautionary
- * compensation, which is sometimes the only reasonable option
- * when running unknown code in tasks; this is now supported more
- * simply (see method beginCompensatedBlock).
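- *
- * For illustration, a precautionary-compensation wrapper in the
- * style users now rely on (hypothetical helper name):
- *
- *     static void runBlocking(Runnable action)
- *             throws InterruptedException {
- *         ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
- *             public boolean block() { action.run(); return true; }
- *             public boolean isReleasable() { return false; }
- *         });
- *     }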
- *
- * Common Pool
- * ===========
- *
- * The static common pool always exists after static
- * initialization. Since it (or any other created pool) need
- * never be used, we minimize initial construction overhead and
- * footprint to the setup of about a dozen fields, although with
- * some System property parsing and security processing that takes
- * far longer than the actual construction when SecurityManagers
- * are used or properties are set. The common pool is
- * distinguished by having a null workerNamePrefix (which is an
- * odd convention, but avoids the need to decode status in factory
- * classes). It also has PRESET_SIZE config set if parallelism
- * was configured by system property.
- *
- * When external threads use the common pool, they can perform
- * subtask processing (see helpComplete and related methods) upon
- * joins, unless they are submitted using ExecutorService
- * submission methods, which implicitly disallow this. This
- * caller-helps policy makes it sensible to set common pool
- * parallelism level to one (or more) less than the total number
- * of available cores, or even zero for pure caller-runs. External
- * threads waiting for joins first check the common pool for their
- * task, which fails quickly if the caller did not fork to common
- * pool.
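- *
- * For example, a pure caller-runs common pool can be requested by
- * presetting its parallelism to zero before first use:
- *
- *     -Djava.util.concurrent.ForkJoinPool.common.parallelism=0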
- *
- * Guarantees for common pool parallelism zero are limited to
- * tasks that are joined by their callers in a tree-structured
- * fashion or use CountedCompleters (as is true for jdk
- * parallelStreams). Support infiltrates several methods,
- * including those that retry helping steps until we are sure that
- * none apply if there are no workers.
- *
- * As a more appropriate default in managed environments, unless
- * overridden by system properties, we use workers of subclass
- * InnocuousForkJoinWorkerThread when there is a SecurityManager
- * present. These workers have no permissions set, do not belong
- * to any user-defined ThreadGroup, and clear all ThreadLocals
- * after executing any top-level task. The associated mechanics
- * may be JVM-dependent and must access particular Thread class
- * fields to achieve this effect.
- *
- * InterruptibleTasks
- * ==================
- *
- * Regular ForkJoinTasks manage task cancellation (method cancel)
- * independently from the interrupt status of threads running
- * tasks. Interrupts are issued internally only while
- * terminating, to wake up workers and cancel queued tasks. By
- * default, interrupts are cleared only when necessary to ensure
- * that calls to LockSupport.park do not loop indefinitely (park
- * returns immediately if the current thread is interrupted).
- *
- * To comply with ExecutorService specs, we use subclasses of
- * abstract class InterruptibleTask for tasks that require
- * stronger interruption and cancellation guarantees. External
- * submitters never run these tasks, even if in the common pool.
- * InterruptibleTasks include a "runner" field (implemented
- * similarly to FutureTask) to support cancel(true). Upon pool
- * shutdown, runners are interrupted so they can cancel. Since
- * external joining callers never run these tasks, they must await
- * cancellation by others, which can occur along several different
- * paths.
- *
- * Across these APIs, rules for reporting exceptions for tasks
- * with results accessed via join() differ from those via get(),
- * which differ from those invoked using pool submit methods by
- * non-workers (which comply with Future.get() specs). Internal
- * usages of ForkJoinTasks ignore interrupt status when executing
- * or awaiting completion. Otherwise, reporting task results or
- * exceptions is preferred to throwing InterruptedExceptions,
- * which are in turn preferred to timeouts. Similarly, completion
- * status is preferred to reporting cancellation. Cancellation is
- * reported as an unchecked exception by join(), and by worker
- * calls to get(), but is otherwise wrapped in a (checked)
- * ExecutionException.
- *
- * Worker Threads cannot be VirtualThreads, as enforced by
- * requiring ForkJoinWorkerThreads in factories. There are
- * several constructions relying on this. However as of this
- * writing, virtual thread bodies are by default run as some form
- * of InterruptibleTask.
- *
- * Memory placement
- * ================
- *
- * Performance is very sensitive to placement of instances of
- * ForkJoinPool and WorkQueues and their queue arrays, as well as
- * the placement of their fields. Cache misses and contention due
- * to false-sharing have been observed to slow down some programs
- * by more than a factor of four. Effects may vary across initial
- * memory configurations, applications, and different garbage
- * collectors and GC settings, so there is no perfect solution.
- * Too much isolation may generate more cache misses in common
- * cases (because some fields and slots are usually read at the
- * same time). The @Contended annotation provides only rough
- * control (for good reason). Similarly for relying on fields
- * being placed in size-sorted declaration order.
- *
- * We isolate the ForkJoinPool.ctl field that otherwise causes the
- * most false-sharing misses with respect to other fields. Also,
- * ForkJoinPool fields are ordered such that fields less prone to
- * contention effects are first, offsetting those that otherwise
- * would be, while also reducing total footprint vs using
- * multiple @Contended regions, which tends to slow down
- * less-contended applications. For class WorkQueue, an
- * embedded @Contended region segregates fields most heavily
- * updated by owners from those most commonly read by stealers or
- * other management. Initial sizing and resizing of WorkQueue
- * arrays is an even more delicate tradeoff because the best
- * strategy systematically varies across garbage collectors. Small
- * arrays are better for locality and reduce GC scan time, but
- * large arrays reduce both direct false-sharing and indirect
- * cases due to GC bookkeeping (cardmarks etc), and reduce the
- * number of resizes, which are not especially fast because they
- * require atomic transfers. Currently, arrays are initialized to
- * be fairly small. (Maintenance note: any changes in fields,
- * queues, or their uses, or JVM layout policies, must be
- * accompanied by re-evaluation of these placement and sizing
- * decisions.)
- *
- * Style notes
- * ===========
- *
- * Memory ordering relies mainly on atomic operations (CAS,
- * getAndSet, getAndAdd) along with moded accesses. These use
- * jdk-internal Unsafe for atomics and special memory modes,
- * rather than VarHandles, to avoid initialization dependencies in
- * other jdk components that require early parallelism. This can
- * be awkward and ugly, but also reflects the need to control
- * outcomes across the unusual cases that arise in very racy code
- * with very few invariants. All atomic task slot updates use
- * Unsafe operations requiring offset positions, not indices, as
- * computed by method slotOffset. All fields are read into locals
- * before use, and null-checked if they are references, even if
- * they can never be null under current usages. Usually,
- * computations (held in local variables) are defined as soon as
- * logically enabled, sometimes to convince compilers that they
- * may be performed despite memory ordering constraints. Array
- * accesses using masked indices include checks (that are always
- * true) that the array length is non-zero to avoid compilers
- * inserting more expensive traps. This is usually done in a
- * "C"-like style of listing declarations at the heads of methods
- * or blocks, and using inline assignments on first encounter.
- * Nearly all explicit checks lead to bypass/return, not exception
- * throws, because they may legitimately arise during shutdown. A
- * few unusual loop constructions hint (with varying
- * effectiveness) to JVMs where (not) to place safepoints.
- *
- * There is a lot of representation-level coupling among classes
- * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
- * fields of WorkQueue maintain data structures managed by
- * ForkJoinPool, so are directly accessed. There is little point
- * trying to reduce this, since any associated future changes in
- * representations will need to be accompanied by algorithmic
- * changes anyway. Several methods intrinsically sprawl because
- * they must accumulate sets of consistent reads of fields held in
- * local variables. Some others are artificially broken up to
- * reduce producer/consumer imbalances due to dynamic compilation.
- * There are also other coding oddities (including several
- * unnecessary-looking hoisted null checks) that help some methods
- * perform reasonably even when interpreted (not compiled).
- *
- * The order of declarations in this file is (with a few exceptions):
- * (1) Static configuration constants
- * (2) Static utility functions
- * (3) Nested (static) classes
- * (4) Fields, along with constants used when unpacking some of them
- * (5) Internal control methods
- * (6) Callbacks and other support for ForkJoinTask methods
- * (7) Exported methods
- * (8) Static block initializing statics in minimally dependent order
- *
- * Revision notes
- * ==============
- *
- * The main sources of differences from previous version are:
- *
- * * New abstract class ForkJoinTask.InterruptibleTask ensures
- * that handling of tasks submitted under the ExecutorService
- * API is consistent with specs.
- * * Method quiescent() replaces previous quiescence-related
- * checks, relying on versioning and sequence locking instead
- * of ReentrantLock.
- * * Termination processing now ensures that internal data
- * structures are maintained consistently enough while stopping
- * to interrupt all workers and cancel all tasks. It also uses a
- * CountDownLatch instead of a Condition for termination because
- * of the lock change.
- * * Many other changes to avoid performance regressions due
- * to the above.
+ * Implementation Overview -- omitted until stable
+ *
*/
// static configuration constants
// {pool, workQueue} config bits
static final int FIFO = 1 << 0; // fifo queue or access mode
static final int CLEAR_TLS = 1 << 1; // set for Innocuous workers
static final int PRESET_SIZE = 1 << 2; // size was set by property
- // source history window packing used in scan() and runWorker()
- static final long RESCAN = 1L << 63; // must be negative
- static final long HMASK = ((((long)SMASK) << 32) |
- (((long)SMASK) << 16)); // history bits
- static final long NO_HISTORY = ((((long)INVALID_ID) << 32) | // no 3rd
- (((long)INVALID_ID) << 16)); // no 2nd
-
// others
static final int DEREGISTERED = 1 << 31; // worker terminating
static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
static final int IDLE = 1 << 16; // phase seqlock/version count
U.putIntVolatile(this, BASE, v);
}
final void updateTop(int v) {
U.putIntOpaque(this, TOP, v);
}
- final void forgetSource() {
- U.putIntOpaque(this, SOURCE, 0);
+ final void setSource(int v) {
+ U.getAndSetInt(this, SOURCE, v);
}
final void updateArray(ForkJoinTask<?>[] a) {
U.getAndSetReference(this, ARRAY, a);
}
final void unlockPhase() {
newArray = new ForkJoinTask<?>[newCap];
} catch (OutOfMemoryError ex) {
}
if (newArray != null) { // else throw on next push
int newMask = newCap - 1; // poll old, push to new
+ p = newMask & s;
for (int k = s, j = cap; j > 0; --j, --k) {
ForkJoinTask<?> u;
if ((u = (ForkJoinTask<?>)U.getAndSetReference(
a, slotOffset(k & m), null)) == null)
break; // lost to pollers
newArray[k & newMask] = u;
}
- updateArray(newArray); // fully fenced
+ updateArray(a = newArray); // fully fenced
}
- a = null; // always signal
}
if (!internal)
unlockPhase();
- if ((a == null || a[m & (s - 1)] == null) && pool != null)
+ if ((room == 0 || a[m & (s - 1)] == null) && pool != null)
pool.signalWork(a, p);
}
/**
* Takes next task, if one exists, in order specified by mode.
*/
private ForkJoinTask<?> nextLocalTask(int fifo) {
ForkJoinTask<?> t = null;
ForkJoinTask<?>[] a = array;
int b = base, p = top, cap;
- if (a != null && (cap = a.length) > 0) {
- for (int m = cap - 1, s, nb; p - b > 0; ) {
+ if (p - b > 0 && a != null && (cap = a.length) > 0) {
+ for (int m = cap - 1, s, nb;;) {
if (fifo == 0 || (nb = b + 1) == p) {
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
a, slotOffset(m & (s = p - 1)), null)) != null)
updateTop(s); // else lost race for only task
break;
}
while (b == (b = base)) {
U.loadFence();
Thread.onSpinWait(); // spin to reduce memory traffic
}
+ if (p - b <= 0)
+ break;
}
}
return t;
}
}
/**
* Polls for a task. Used only by non-owners.
*
- * @param pool if nonnull, pool to signal if more tasks exist
*/
- final ForkJoinTask<?> poll(ForkJoinPool pool) {
+ final ForkJoinTask<?> poll() {
for (;;) {
ForkJoinTask<?>[] a = array;
int b = base, cap, k;
if (a == null || (cap = a.length) <= 0)
break;
if (t == null)
o = a[k];
else if (t == (o = U.compareAndExchangeReference(
a, slotOffset(k), t, null))) {
updateBase(nb);
- if (a[nk] != null && pool != null)
- pool.signalWork(a, nk); // propagate
return t;
}
if (o == null && a[nk] == null && array == a &&
(phase & (IDLE | 1)) != 0 && top - base <= 0)
break; // empty
}
}
return null;
}
+ // specialized execution methods
+
/**
- * Tries to poll next task in FIFO order, failing without
- * retries on contention or stalls. Used only by topLevelExec
- * to repoll from the queue obtained from pool.scan.
+ * Runs the given task, as well as remaining local tasks, plus
+ * those from src queue that can be taken without interference.
*/
- private ForkJoinTask<?> tryPoll() {
- ForkJoinTask<?>[] a; int cap;
- if ((a = array) != null && (cap = a.length) > 0) {
- for (int b = base, k;;) { // loop only if inconsistent
- ForkJoinTask<?> t = a[k = b & (cap - 1)];
- U.loadFence();
- if (b == (b = base)) {
- Object o;
- if (t == null)
- o = a[k];
- else if (t == (o = U.compareAndExchangeReference(
- a, slotOffset(k), t, null))) {
- updateBase(b + 1);
- return t;
- }
- if (o == null)
+ final void topLevelExec(ForkJoinTask<?> task, WorkQueue src,
+ int srcBase, int cfg) {
+ if (task != null && src != null) {
+ int fifo = cfg & FIFO, nstolen = 1;
+ for (;;) {
+ task.doExec();
+ if ((task = nextLocalTask(fifo)) == null) {
+ int k, cap; ForkJoinTask<?>[] a;
+ if (src.base != srcBase ||
+ (a = src.array) == null || (cap = a.length) <= 0 ||
+ (task = a[k = srcBase & (cap - 1)]) == null)
+ break;
+ U.loadFence();
+ if (src.base != srcBase || !U.compareAndSetReference(
+ a, slotOffset(k), task, null))
break;
+ src.updateBase(++srcBase);
+ ++nstolen;
}
}
+ nsteals += nstolen;
+ if ((cfg & CLEAR_TLS) != 0)
+ ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
}
- return null;
- }
-
- // specialized execution methods
-
- /**
- * Runs the given (stolen) task if nonnull, as well as
- * remaining local tasks and/or others available from the
- * given queue, if any.
- */
- final void topLevelExec(ForkJoinTask<?> task, WorkQueue src, int srcId) {
- int cfg = config, fifo = cfg & FIFO, nstolen = nsteals + 1;
- if ((srcId & 1) != 0) // don't record external sources
- source = srcId;
- if ((cfg & CLEAR_TLS) != 0)
- ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
- while (task != null) {
- task.doExec();
- if ((task = nextLocalTask(fifo)) == null && src != null &&
- (task = src.tryPoll()) != null)
- ++nstolen;
- }
- nsteals = nstolen;
- forgetSource();
}
/**
* Deep form of tryUnpush: Traverses from top and removes and
* runs task if present.
if ((src = w.source) != DEREGISTERED) { // else trimmed on timeout
w.source = DEREGISTERED;
if (phase != 0) { // else failed to start
replaceable = true;
if ((phase & IDLE) != 0)
- reactivate(w); // pool stopped before released
+ releaseAll(); // pool stopped before released
}
}
}
- long c = ctl;
- if (src != DEREGISTERED) // decrement counts
+ if (src != DEREGISTERED) { // decrement counts
+ long c = ctl;
do {} while (c != (c = compareAndExchangeCtl(
c, ((RC_MASK & (c - RC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(LMASK & c)))));
- else if ((int)c != 0)
- replaceable = true; // signal below to cascade timeouts
- if (w != null) { // cancel remaining tasks
- for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
- try {
- t.cancel(false);
- } catch (Throwable ignore) {
+ if (w != null) { // cancel remaining tasks
+ for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
+ try {
+ t.cancel(false);
+ } catch (Throwable ignore) {
+ }
}
}
}
if ((tryTerminate(false, false) & STOP) == 0 && w != null) {
WorkQueue[] qs; int n, i; // remove index unless terminating
long ns = w.nsteals & 0xffffffffL;
- int stop = lockRunState() & STOP;
- if (stop == 0 && (qs = queues) != null && (n = qs.length) > 0 &&
- qs[i = phase & SMASK & (n - 1)] == w) {
+ if ((lockRunState() & STOP) != 0)
+ replaceable = false;
+ else if ((qs = queues) != null && (n = qs.length) > 0 &&
+ qs[i = phase & SMASK & (n - 1)] == w) {
qs[i] = null;
stealCount += ns; // accumulate steals
}
unlockRunState();
+ if (replaceable)
+ signalWork(null, 0);
}
- if ((runState & STOP) == 0 && replaceable)
- signalWork(null, 0); // may replace unless trimmed or uninitialized
if (ex != null)
ForkJoinTask.rethrow(ex);
}
/**
*/
final void signalWork(ForkJoinTask<?>[] a, int k) {
int pc = parallelism;
for (long c = ctl;;) {
WorkQueue[] qs = queues;
- long ac = (c + RC_UNIT) & RC_MASK, nc;
- int sp = (int)c, i = sp & SMASK;
- if (qs == null || qs.length <= i)
+ if (a != null && a.length > k && k >= 0 && a[k] == null)
break;
- WorkQueue w = qs[i], v = null;
- if (sp == 0) {
- if ((short)(c >>> TC_SHIFT) >= pc)
- break;
- nc = ((c + TC_UNIT) & TC_MASK);
+ boolean done = false;
+ WorkQueue v = null;
+ long nc = 0L, ac = (c + RC_UNIT) & RC_MASK;
+ int sp = (int)c, i = sp & SMASK;
+ if ((short)(c >>> RC_SHIFT) >= pc || qs == null || qs.length <= i)
+ done = true;
+ else {
+ WorkQueue w = qs[i];
+ if (sp == 0) {
+ if ((short)(c >>> TC_SHIFT) >= pc)
+ done = true;
+ else
+ nc = ac | ((c + TC_UNIT) & TC_MASK);
+ }
+ else if ((v = w) == null)
+ done = true;
+ else
+ nc = ac | (c & TC_MASK) | (v.stackPred & LMASK);
}
- else if ((short)(c >>> RC_SHIFT) >= pc || (v = w) == null)
- break;
- else
- nc = (v.stackPred & LMASK) | (c & TC_MASK);
- if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
- if (v == null)
- createWorker();
- else {
- v.phase = sp;
- if (v.parking != 0)
- U.unpark(v.owner);
+ if (c == (c = ctl)) { // confirm
+ if (done)
+ break;
+ else if (c == (c = compareAndExchangeCtl(c, nc))) {
+ if (v == null)
+ createWorker();
+ else {
+ v.phase = sp;
+ if (v.parking != 0)
+ U.unpark(v.owner);
+ }
+ break;
}
- break;
}
- if (a != null && k >= 0 && k < a.length && a[k] == null)
- break;
}
}
/**
* Reactivates the given worker, and possibly others if not top of
* ctl stack. Called only during shutdown to ensure release on
* termination.
*/
- private void reactivate(WorkQueue w) {
+ private void releaseAll() {
for (long c = ctl;;) {
WorkQueue[] qs; WorkQueue v; int sp, i;
- if ((qs = queues) == null || (sp = (int)c) == 0 ||
- qs.length <= (i = sp & SMASK) || (v = qs[i]) == null ||
- (v != w && w != null && (w.phase & IDLE) == 0))
+ if ((sp = (int)c) == 0 || (qs = queues) == null ||
+ qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
break;
if (c == (c = compareAndExchangeCtl(
c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
(v.stackPred & LMASK))))) {
v.phase = sp;
- if (v == w)
- break;
if (v.parking != 0)
U.unpark(v.owner);
}
}
}
/**
* Internal version of isQuiescent and related functionality.
- * @return true if terminating or all workers are inactive and
- * submission queues are empty and unlocked; if so, setting STOP
- * if shutdown is enabled
+ * @return positive if stopping, nonnegative if terminating or all
+ * workers are inactive and submission queues are empty and
+ * unlocked; if so, setting STOP if shutdown is enabled
*/
- private boolean quiescent() {
+ private int quiescent() {
outer: for (;;) {
long phaseSum = 0L;
boolean swept = false;
for (int e, prevRunState = 0; ; prevRunState = e) {
long c = ctl;
if (((e = runState) & STOP) != 0)
- return true; // terminating
+ return 1; // terminating
else if ((c & RC_MASK) > 0L)
- return false; // at least one active
+ return -1; // at least one active
else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) {
long sum = c;
WorkQueue[] qs = queues; WorkQueue q;
int n = (qs == null) ? 0 : qs.length;
for (int i = 0; i < n; ++i) { // scan queues
if ((q = qs[i]) != null) {
int p = q.phase, s = q.top, b = q.base;
sum += (p & 0xffffffffL) | ((long)b << 32);
if ((p & IDLE) == 0 || s - b > 0) {
if ((i & 1) == 0 && compareAndSetCtl(c, c))
- signalWork(null, 0); // ensure live
- return false;
+ signalWork(q.array, q.base);
+ return -1;
}
}
}
swept = (phaseSum == (phaseSum = sum));
}
else if ((e & SHUTDOWN) == 0)
- return true;
+ return 0;
else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) {
- interruptAll(); // confirmed
- return true; // enable termination
+ releaseAll(); // confirmed
+ return 1; // enable termination
}
else
break; // restart
}
}
*
* @param w caller's WorkQueue (may be null on failed initialization)
*/
final void runWorker(WorkQueue w) {
if (w != null) {
- int phase = w.phase, r = w.stackPred; // seed from registerWorker
- long window = (long)((r >>> 16) & SMASK) | NO_HISTORY;
+ int cfg = w.config & (FIFO|CLEAR_TLS), r = w.stackPred;
+ long stat;
do {
- r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
- } while ((runState & STOP) == 0 &&
- (((window = scan(w, window, r)) < 0L ||
- ((phase = awaitWork(w, phase)) & IDLE) == 0)));
+ r = (int)(stat = scan(w, r, cfg));
+ } while ((int)(stat >>> 32) == 0 ||
+ (quiescent() <= 0 && awaitWork(w) == 0));
}
}
/**
- * Scans for and if found executes top-level tasks: Tries to poll
- * each queue starting at initial index with random stride,
- * returning next scan window and retry indicator.
+ * Scans for, and if found executes, a top-level task.
*
* @param w caller's WorkQueue
- * @param window up to three queue indices
- * @param r random seed
- * @return the next window to use, with RESCAN set for rescan
+ * @param r random seed
+ * @param cfg config bits
+ * @return retry status and seed for next use
*/
- private long scan(WorkQueue w, long window, int r) {
- WorkQueue[] qs = queues;
- int n = (qs == null) ? 0 : qs.length, step = (r << 1) | 1;
- long next = window & ~RESCAN;
- outer: for (int i = (short)window, l = n; l > 0; --l, i += step) {
- int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
- if ((q = qs[j = i & SMASK & (n - 1)]) != null &&
- (a = q.array) != null && (cap = a.length) > 0) {
- for (int b = q.base;;) {
- int nb = b + 1, nk = nb & (cap - 1), k;
- ForkJoinTask<?> t = a[k = b & (cap - 1)];
- U.loadFence(); // re-read b and t
- if (b == (b = q.base)) { // else inconsistent; retry
- Object o;
- if (t == null)
- o = a[k];
- else if (t == (o = U.compareAndExchangeReference(
- a, slotOffset(k), t, null))) {
- q.updateBase(nb);
- next = RESCAN | ((window << 16) & HMASK) | j;
- if (window != next && a[nk] != null)
- signalWork(a, nk); // limit propagation
- if (w != null) // always true
- w.topLevelExec(t, q, j);
- break outer;
- }
- if (o == null) {
- if (next >= 0L && a[nk] != null)
- next |= RESCAN;
+ private long scan(WorkQueue w, int r, int cfg) {
+ int spinScans = 0; // to rescan after deactivate
+ while (w != null && (runState & STOP) == 0) {
+ WorkQueue[] qs = queues;
+ int n = (qs == null) ? 0 : qs.length;
+ long prevCtl = ctl; // for signal check
+ int phase = w.phase; // IDLE set when deactivated
+ r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // advance xorshift
+ int i = r, step = (r >>> 16) | 1; // scan random permutation
+ int stalls = 0; // move and restart if stuck
+ for (int l = n; l > 0; --l, i += step) {
+ int j; WorkQueue q;
+ if ((q = qs[j = i & SMASK & (n - 1)]) != null) {
+ for (;;) {
+ int cap, b, k; long kp; ForkJoinTask<?>[] a;
+ if ((a = q.array) == null || (cap = a.length) <= 0)
break;
+ ForkJoinTask<?> t = a[k = (cap - 1) & (b = q.base)];
+ U.loadFence();
+ if (q.base != b)
+ continue;
+ int nb = b + 1, nk = nb & (cap - 1);
+ if (U.getReference(a, kp = slotOffset(k)) != t)
+ ; // screen CAS
+ else if (t == null) { // check if empty
+ if (a[nk] == null &&
+ a[(nb + 1) & (cap - 1)] == null) {
+ if (q.top - b <= 0)
+ break; // probe slots as filter
+ }
+ else if (++stalls > n)
+ return r & LMASK; // restart to randomly move
+ else if (stalls != 1)
+ Thread.onSpinWait();
+ }
+ else if ((phase & IDLE) != 0) { // recheck or reactivate
+ long sp = w.stackPred & LMASK, sc; int np;
+ if (((phase = w.phase) & IDLE) != 0) {
+ if ((np = phase + 1) != (int)(sc = ctl))
+ break; // ineligible
+ if (compareAndSetCtl(
+ sc, sp | ((sc + RC_UNIT) & UMASK)))
+ w.phase = np;
+ }
+ return r & LMASK; // restart
+ }
+ else if (U.compareAndSetReference(a, kp, t, null)) {
+ q.base = nb;
+ w.setSource(j); // fully fenced
+ signalWork(a, nk); // signal if a[nk] nonnull
+ w.topLevelExec(t, q, nb, cfg);
+ return r & LMASK;
+ }
+ }
+ }
+ }
+ if (w.phase == phase) {
+ int ac; long c; // avoid missed signals
+ if (((ac = (short)((c = ctl) >>> RC_SHIFT)) <= 0 ||
+ c == prevCtl || ac < (short)(prevCtl >>> RC_SHIFT))) {
+ if ((phase & IDLE) == 0) { // try to deactivate
+ long ap = (phase + (IDLE << 1)) & LMASK;
+ spinScans = 0;
+ w.stackPred = (int)c; // set ctl stack link
+ w.phase = phase | IDLE;
+ while (c != (c = compareAndExchangeCtl(
+ c, ap | ((c - RC_UNIT) & UMASK)))) {
+ if (ac <= (ac = (short)(c >>> RC_SHIFT))) {
+ w.phase = phase; // nondecreasing; back out
+ break;
+ }
+ w.stackPred = (int)c; // retry
}
}
+ else if (ac <= 0 || (spinScans += ac) >= SPIN_WAITS)
+ break;
}
}
}
- return next;
+ return (1L << 32) | (r & LMASK);
}
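/*
 * A minimal standalone sketch of the traversal scheme above: one
 * xorshift step yields the start index, and an odd stride visits
 * every slot of a power-of-two table exactly once (odd numbers are
 * coprime to any power of two). n and visit are hypothetical:
 *
 *   r ^= r << 13; r ^= r >>> 17; r ^= r << 5;  // xorshift
 *   int i = r, step = (r >>> 16) | 1;          // odd stride
 *   for (int l = n; l > 0; --l, i += step)
 *       visit(i & (n - 1));                    // covers all n slots
 */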
/**
- * Tries to inactivate, and if successful, awaits signal or termination.
+ * Awaits signal or termination.
*
- * @param w the worker (may be null if already terminated)
- * @param phase current phase
- * @return current phase, with IDLE set if worker should exit
+ * @param w the WorkQueue (may be null if already terminated)
+ * @return nonzero if the worker should exit
*/
- private int awaitWork(WorkQueue w, int phase) {
- boolean quiet; // true if possibly quiescent
- int active = phase + (IDLE << 1), p = phase | IDLE, e;
- if (w != null) {
- w.phase = p; // deactivate
- long np = active & LMASK, pc = ctl; // try to enqueue
- long qc = np | ((pc - RC_UNIT) & UMASK);
- w.stackPred = (int)pc; // set ctl stack link
- if (pc != (pc = compareAndExchangeCtl(pc, qc))) {
- qc = np | ((pc - RC_UNIT) & UMASK);
- w.stackPred = (int)pc; // retry once
- if (pc != (pc = compareAndExchangeCtl(pc, qc)))
- p = w.phase = phase; // back out
+ private int awaitWork(WorkQueue w) {
+ int p = IDLE, phase;
+ if (w != null && (p = (phase = w.phase) & IDLE) != 0) {
+ int nextPhase = phase + IDLE;
+ long deadline = 0L, c; // set if all idle and w is ctl top
+ if (((c = ctl) & RC_MASK) <= 0L && (int)c == nextPhase) {
+ int np = parallelism, nt = (short)(c >>> TC_SHIFT);
+ long delay = keepAlive; // scale if not fully populated
+ if (nt != (nt = Math.max(nt, np)) && nt > 0)
+ delay = Math.max(TIMEOUT_SLOP, delay / nt);
+ long d = delay + System.currentTimeMillis();
+ deadline = (d == 0L) ? 1L : d;
}
- if (p != phase && ((e = runState) & STOP) == 0 &&
- (!(quiet = (qc & RC_MASK) <= 0L) || (e & SHUTDOWN) == 0 ||
- !(quiet = quiescent()) || (runState & STOP) == 0)) {
- long deadline = 0L; // not terminating
- if (quiet) { // use timeout if trimmable
- int nt = (short)(qc >>> TC_SHIFT);
- long delay = keepAlive; // scale if not at target
- if (nt != (nt = Math.max(nt, parallelism)) && nt > 0)
- delay = Math.max(TIMEOUT_SLOP, delay / nt);
- if ((deadline = delay + System.currentTimeMillis()) == 0L)
- deadline = 1L; // avoid zero
- }
- boolean release = quiet;
- WorkQueue[] qs = queues; // recheck queues
- int n = (qs == null) ? 0 : qs.length;
- for (int l = -n, j = active; l < n; ++l, ++j) {
- WorkQueue q; ForkJoinTask<?>[] a; int cap;
- if ((p = w.phase) == active) // interleave signal checks
- break;
- if ((q = qs[j & (n - 1)]) != null &&
- (a = q.array) != null && (cap = a.length) > 0 &&
- a[q.base & (cap - 1)] != null) {
- if (release && qc == ctl && compareAndSetCtl(qc, pc)) {
- p = w.phase = active;
- break; // possible missed signal
- }
- release = true; // track multiple or reencounter
- }
- Thread.onSpinWait(); // reduce memory traffic
- }
- if (p != active) { // emulate LockSupport.park
- LockSupport.setCurrentBlocker(this);
- w.parking = 1;
- for (;;) {
- if ((runState & STOP) != 0 || (p = w.phase) == active)
- break;
- U.park(deadline != 0L, deadline);
- if ((p = w.phase) == active || (runState & STOP) != 0)
- break;
- Thread.interrupted(); // clear for next park
- if (deadline != 0L && TIMEOUT_SLOP >
- deadline - System.currentTimeMillis()) {
- long sp = w.stackPred & LMASK, c = ctl;
- long nc = sp | (UMASK & (c - TC_UNIT));
- if (((int)c & SMASK) == (active & SMASK) &&
- compareAndSetCtl(c, nc)) {
- w.source = DEREGISTERED;
- w.phase = active;
- break; // trimmed on timeout
- }
- deadline = 0L; // no longer trimmable
+ LockSupport.setCurrentBlocker(this);
+ w.parking = 1; // enable unpark
+ for (;;) { // emulate LockSupport.park
+ if ((runState & STOP) != 0)
+ break;
+ if ((p = w.phase & IDLE) == 0)
+ break;
+ U.park(deadline != 0L, deadline);
+ if ((p = w.phase & IDLE) == 0)
+ break;
+ if ((runState & STOP) != 0)
+ break;
+ Thread.interrupted(); // clear for next park
+ if (deadline != 0L && // try to trim
+ deadline - System.currentTimeMillis() < TIMEOUT_SLOP) {
+ long sp = w.stackPred & LMASK, dc = ctl;
+ long nc = sp | (UMASK & (dc - TC_UNIT));
+ if ((int)dc == nextPhase && compareAndSetCtl(dc, nc)) {
+ WorkQueue[] qs; WorkQueue v; int vp, i;
+ w.source = DEREGISTERED;
+ w.phase = nextPhase; // try to wake up next waiter
+ if ((vp = (int)nc) != 0 && (qs = queues) != null &&
+ qs.length > (i = vp & SMASK) &&
+ (v = qs[i]) != null &&
+ compareAndSetCtl(nc, ((UMASK & (nc + RC_UNIT)) |
+ (nc & TC_MASK) |
+ (v.stackPred & LMASK)))) {
+ v.phase = vp;
+ U.unpark(v.owner);
}
+ break;
}
- w.parking = 0;
- LockSupport.setCurrentBlocker(null);
+ deadline = 0L; // no longer trimmable
}
}
+ w.parking = 0; // disable unpark
+ LockSupport.setCurrentBlocker(null);
}
return p;
}
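/*
 * Sketch of the idle-worker stack encoded in ctl, as exercised in
 * the trim path above (a simplified reading of this file's code,
 * dropping its extra TC_MASK belt-and-suspenders): the low half of
 * ctl names the most recently idled worker, and each worker's
 * stackPred holds the previous low half, so a release is a CAS pop
 * that also restores the release count:
 *
 *   long c = ctl; int vp; WorkQueue v;   // (int)c names the top
 *   if ((vp = (int)c) != 0 && (v = queues[vp & SMASK]) != null &&
 *       compareAndSetCtl(c, ((UMASK & (c + RC_UNIT)) |
 *                            (v.stackPred & LMASK)))) {
 *       v.phase = vp;                    // predecessor becomes top
 *       U.unpark(v.owner);               // wake the released worker
 *   }
 */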
/**
r &= ~1;
int step = (submissionsOnly) ? 2 : 1;
if ((qs = queues) != null && (n = qs.length) > 0) {
for (int i = n; i > 0; i -= step, r += step) {
if ((q = qs[r & (n - 1)]) != null &&
- (t = q.poll(this)) != null)
+ (t = q.poll()) != null)
return t;
}
}
}
return null;
* @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
* @param interruptible true if return on interrupt
* @return positive if quiescent, negative if interrupted, else 0
*/
private int externalHelpQuiesce(long nanos, boolean interruptible) {
- if (!quiescent()) {
+ if (quiescent() < 0) {
long startTime = System.nanoTime();
long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP);
for (int waits = 0;;) {
ForkJoinTask<?> t;
if (interruptible && Thread.interrupted())
return -1;
else if ((t = pollScan(false)) != null) {
waits = 0;
t.doExec();
}
- else if (quiescent())
+ else if (quiescent() >= 0)
break;
else if (System.nanoTime() - startTime > nanos)
return 0;
else if (waits == 0)
waits = MIN_SLEEP;
if ((qs = queues) == null)
break;
if ((n = qs.length) <= 0)
break;
if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
- WorkQueue w = new WorkQueue(null, id, 0, false);
+ WorkQueue w = new WorkQueue(null, id, (int)config, false);
w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
int stop = lockRunState() & STOP;
if (stop == 0 && queues == qs && qs[i] == null)
q = qs[i] = w; // else discard; retry
unlockRunState();
* if no work and no active workers
* @param enable if true, terminate when next possible
* @return runState on exit
*/
private int tryTerminate(boolean now, boolean enable) {
- int e = runState;
+ int e = runState, isShutdown;
if ((e & STOP) == 0) {
if (now) {
- int s = lockRunState();
- runState = e = (s + RS_LOCK) | STOP | SHUTDOWN;
- if ((s & STOP) == 0)
- interruptAll();
+ runState = e = (lockRunState() + RS_LOCK) | STOP | SHUTDOWN;
+ releaseAll();
}
- else {
- int isShutdown = (e & SHUTDOWN);
- if (isShutdown == 0 && enable)
- getAndBitwiseOrRunState(isShutdown = SHUTDOWN);
- if (isShutdown != 0)
- quiescent(); // may trigger STOP
- e = runState;
+ else if ((isShutdown = (e & SHUTDOWN)) != 0 || enable) {
+ if (isShutdown == 0)
+ getAndBitwiseOrRunState(SHUTDOWN);
+ if (quiescent() > 0)
+ e = runState;
}
}
- if ((e & (STOP | TERMINATED)) == STOP) { // help cancel tasks
- int r = (int)Thread.currentThread().threadId(); // stagger traversals
- WorkQueue[] qs = queues;
- int n = (qs == null) ? 0 : qs.length;
- for (int l = n; l > 0; --l, ++r) {
- int j = r & SMASK & (n - 1); WorkQueue q; ForkJoinTask<?> t;
- while ((q = qs[j]) != null && q.source != DEREGISTERED &&
- (t = q.poll(null)) != null) {
- try {
- t.cancel(false);
- } catch (Throwable ignore) {
- }
- }
+ if ((e & (STOP | TERMINATED)) == STOP) {
+ if ((ctl & RC_MASK) > 0L) { // avoid if quiescent shutdown
+ helpTerminate(now);
+ e = runState;
}
- if (((e = runState) & TERMINATED) == 0 && ctl == 0L) {
+ if ((e & TERMINATED) == 0 && ctl == 0L) {
+ e |= TERMINATED;
if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0) {
CountDownLatch done; SharedThreadContainer ctr;
if ((done = termination) != null)
done.countDown();
if ((ctr = container) != null)
ctr.close();
}
- e = runState;
}
}
return e;
}
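/*
 * Sketch of the monotone lifecycle encoded in runState, as driven by
 * this method (a summary of the code above, not new behavior):
 *
 *   // SHUTDOWN: set here when enable, or already set by shutdown()
 *   // STOP:     set when now, or via quiescent() once SHUTDOWN holds
 *   // TERMINATED: latched once STOP holds and ctl == 0L
 *   if ((runState & (STOP | TERMINATED)) == STOP && ctl == 0L)
 *       getAndBitwiseOrRunState(TERMINATED);   // one-shot latch
 */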
/**
- * Interrupts all workers
+ * Cancels tasks and interrupts workers
*/
- private void interruptAll() {
+ private void helpTerminate(boolean now) {
Thread current = Thread.currentThread();
+ int r = (int)current.threadId(); // stagger traversals
WorkQueue[] qs = queues;
int n = (qs == null) ? 0 : qs.length;
- for (int i = 1; i < n; i += 2) {
- WorkQueue q; Thread o;
- if ((q = qs[i]) != null && (o = q.owner) != null && o != current &&
- q.source != DEREGISTERED) {
- try {
- o.interrupt();
- } catch (Throwable ignore) {
+ for (int l = n; l > 0; --l, ++r) {
+ WorkQueue q; ForkJoinTask<?> t; Thread o;
+ int j = r & SMASK & (n - 1);
+ if ((q = qs[j]) != null && q.source != DEREGISTERED) {
+ while ((t = q.poll()) != null) {
+ try {
+ t.cancel(false);
+ } catch (Throwable ignore) {
+ }
+ }
+ if ((r & 1) != 0 && (o = q.owner) != null &&
+ o != current && q.source != DEREGISTERED &&
+ (now || !o.isInterrupted())) {
+ try {
+ o.interrupt();
+ } catch (Throwable ignore) {
+ }
}
}
}
}
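/*
 * Sketch of the staggered sweep above: each terminating thread starts
 * at an index derived from its own id, so concurrent helpers drain
 * different queues first. drainAndCancel is a hypothetical stand-in
 * for the poll-and-cancel loop:
 *
 *   int r = (int)Thread.currentThread().threadId();
 *   for (int l = n; l > 0; --l, ++r)
 *       drainAndCancel(qs[r & SMASK & (n - 1)]);
 */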
* threads remain inactive.
*
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
- return quiescent();
+ return quiescent() >= 0;
}
/**
* Returns an estimate of the total number of completed tasks that
* were executed by a thread other than their submitter. The
/**
* Invokes tryCompensate to create or re-activate a spare thread to
* compensate for a thread that performs a blocking operation. When the
* blocking operation is done then endCompensatedBlock must be invoked
* with the value returned by this method to re-adjust the parallelism.
+ * @return value to use in endCompensatedBlock
*/
final long beginCompensatedBlock() {
int c;
do {} while ((c = tryCompensate(ctl)) < 0);
return (c == 0) ? 0L : RC_UNIT;
}
/**
* Re-adjusts parallelism after a blocking operation completes.
+ * @param post value from beginCompensatedBlock
*/
void endCompensatedBlock(long post) {
if (post > 0L) {
getAndAddCtl(post);
}
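/*
 * Usage sketch (hedged; "pool" and blockingCall() are hypothetical
 * stand-ins): per the javadoc above, callers are expected to pair the
 * two methods so the parallelism adjustment from
 * beginCompensatedBlock is always undone, even on exception:
 *
 *   long post = pool.beginCompensatedBlock();
 *   try {
 *       blockingCall();
 *   } finally {
 *       pool.endCompensatedBlock(post);
 *   }
 */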